fei_CommUtils.hpp
Go to the documentation of this file.
/*--------------------------------------------------------------------*/
/*    Copyright 2007 Sandia Corporation.                              */
/*    Under the terms of Contract DE-AC04-94AL85000, there is a       */
/*    non-exclusive license for use of this work by or on behalf      */
/*    of the U.S. Government.  Export of this program may require     */
/*    a license from the United States Government.                    */
/*--------------------------------------------------------------------*/

#ifndef _fei_CommUtils_hpp_
#define _fei_CommUtils_hpp_

#include <fei_macros.hpp>
#include <fei_mpi.h>
#include <fei_mpiTraits.hpp>
#include <fei_chk_mpi.hpp>
#include <fei_iostream.hpp>
#include <fei_CommMap.hpp>
#include <fei_TemplateUtils.hpp>

#include <vector>
#include <set>
#include <map>

#include <fei_ErrMacros.hpp>
#undef fei_file
#define fei_file "fei_CommUtils.hpp"

namespace fei {

/** Return the rank of the local processor in the given communicator. */
int localProc(MPI_Comm comm);

/** Return the number of processors in the given communicator. */
int numProcs(MPI_Comm comm);

/** Synchronization barrier across all processors in the given communicator. */
void Barrier(MPI_Comm comm);

/** Given a list of processors to send to, fill fromProcs with the list of
    processors that will be sending to the local processor. */
int mirrorProcs(MPI_Comm comm, std::vector<int>& toProcs, std::vector<int>& fromProcs);

/** Given a communication pattern, create and initialize its mirror. */
int mirrorCommPattern(MPI_Comm comm, comm_map* inPattern, comm_map*& outPattern);

/** All-reduce a bool flag across the communicator. */
int Allreduce(MPI_Comm comm, bool localBool, bool& globalBool);

/** Exchange lists of ints with the specified send/recv processors. */
int exchangeIntData(MPI_Comm comm,
                    const std::vector<int>& sendProcs,
                    std::vector<int>& sendData,
                    const std::vector<int>& recvProcs,
                    std::vector<int>& recvData);

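/* Usage sketch (illustrative only, not part of the original header): query the
   local rank and process count, then reduce a local error flag so every
   processor learns whether any processor set its flag. Assumes 'comm' is a
   valid MPI_Comm and 'localError' is a bool set by earlier code.

     int rank = fei::localProc(comm);
     int np   = fei::numProcs(comm);

     bool anyError = false;
     fei::Allreduce(comm, localError, anyError);
     if (anyError) {
       // every processor takes the same recovery path
     }
*/
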
//----------------------------------------------------------------------------
/** Compute the global maximum of each entry in the local vector; results are
    placed in the 'global' vector on all processors. */
template<class T>
int GlobalMax(MPI_Comm comm, std::vector<T>& local, std::vector<T>& global)
{
#ifdef FEI_SER
  global = local;
#else

  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();

  try {
    global.resize(local.size());
  }
  catch(std::runtime_error& exc) {
    fei::console_out() << exc.what()<<FEI_ENDL;
    return(-1);
  }

  CHK_MPI( MPI_Allreduce(&(local[0]), &(global[0]),
                         local.size(), mpi_dtype, MPI_MAX, comm) );
#endif

  return(0);
}

//----------------------------------------------------------------------------
/** Compute the global maximum of a single value; the result is returned in
    'global' on all processors. */
template<class T>
int GlobalMax(MPI_Comm comm, T local, T& global)
{
#ifdef FEI_SER
  global = local;
#else
  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();

  CHK_MPI( MPI_Allreduce(&local, &global, 1, mpi_dtype, MPI_MAX, comm) );
#endif
  return(0);
}

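/* Usage sketch (illustrative only): find the largest local row-count across
   all processors. Assumes 'comm' is a valid MPI_Comm and 'numLocalRows' is an
   int computed earlier.

     int maxRows = 0;
     fei::GlobalMax(comm, numLocalRows, maxRows);

   The vector overload works the same way, reducing element-by-element:

     std::vector<double> localVals(3, 0.0), globalVals;
     fei::GlobalMax(comm, localVals, globalVals);
*/
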
//----------------------------------------------------------------------------
/** Compute the global minimum of each entry in the local vector; results are
    placed in the 'global' vector on all processors. */
template<class T>
int GlobalMin(MPI_Comm comm, std::vector<T>& local, std::vector<T>& global)
{
#ifdef FEI_SER
  global = local;
#else

  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();

  try {
    global.resize(local.size());
  }
  catch(std::runtime_error& exc) {
    fei::console_out() << exc.what()<<FEI_ENDL;
    return(-1);
  }

  CHK_MPI( MPI_Allreduce(&(local[0]), &(global[0]),
                         local.size(), mpi_dtype, MPI_MIN, comm) );
#endif

  return(0);
}

//----------------------------------------------------------------------------
/** Compute the global minimum of a single value; the result is returned in
    'global' on all processors. */
template<class T>
int GlobalMin(MPI_Comm comm, T local, T& global)
{
#ifdef FEI_SER
  global = local;
#else
  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();

  CHK_MPI( MPI_Allreduce(&local, &global, 1, mpi_dtype, MPI_MIN, comm) );
#endif
  return(0);
}

//----------------------------------------------------------------------------
/** Compute the global sum of each entry in the local vector; results are
    placed in the 'global' vector on all processors. */
template<class T>
int GlobalSum(MPI_Comm comm, std::vector<T>& local, std::vector<T>& global)
{
#ifdef FEI_SER
  global = local;
#else
  global.resize(local.size());

  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();

  CHK_MPI( MPI_Allreduce(&(local[0]), &(global[0]),
                         local.size(), mpi_dtype, MPI_SUM, comm) );
#endif
  return(0);
}

//----------------------------------------------------------------------------
/** Compute the global sum of a single value; the result is returned in
    'global' on all processors. */
template<class T>
int GlobalSum(MPI_Comm comm, T local, T& global)
{
#ifdef FEI_SER
  global = local;
#else
  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();

  CHK_MPI( MPI_Allreduce(&local, &global, 1, mpi_dtype, MPI_SUM, comm) );
#endif
  return(0);
}

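/* Usage sketch (illustrative only): sum locally-owned equation counts to get
   the global total. Assumes 'comm' is a valid MPI_Comm and 'numLocalEqns' is
   an int computed earlier.

     int numGlobalEqns = 0;
     fei::GlobalSum(comm, numLocalEqns, numGlobalEqns);
*/
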
/** Allgatherv: each processor contributes sendbuf; on return, recvbuf holds
    the concatenation of all processors' contributions and recvLengths[p]
    holds the number of items contributed by processor p. */
template<class T>
int Allgatherv(MPI_Comm comm,
               std::vector<T>& sendbuf,
               std::vector<int>& recvLengths,
               std::vector<T>& recvbuf)
{
#ifdef FEI_SER
  //If we're in serial mode, just copy sendbuf to recvbuf and return.

  recvbuf = sendbuf;
  recvLengths.resize(1);
  recvLengths[0] = sendbuf.size();
#else
  int numProcs = 1;
  MPI_Comm_size(comm, &numProcs);

  try {

    MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();

    std::vector<int> tmpInt(numProcs, 0);

    int len = sendbuf.size();
    int* tmpBuf = &tmpInt[0];

    recvLengths.resize(numProcs);
    int* recvLenPtr = &recvLengths[0];

    CHK_MPI( MPI_Allgather(&len, 1, MPI_INT, recvLenPtr, 1, MPI_INT, comm) );

    int displ = 0;
    for(int i=0; i<numProcs; i++) {
      tmpBuf[i] = displ;
      displ += recvLenPtr[i];
    }

    if (displ == 0) {
      recvbuf.resize(0);
      return(0);
    }

    recvbuf.resize(displ);

    T* sendbufPtr = sendbuf.size()>0 ? &sendbuf[0] : NULL;

    CHK_MPI( MPI_Allgatherv(sendbufPtr, len, mpi_dtype,
                            &recvbuf[0], &recvLengths[0], tmpBuf,
                            mpi_dtype, comm) );

  }
  catch(std::runtime_error& exc) {
    fei::console_out() << exc.what() << FEI_ENDL;
    return(-1);
  }
#endif

  return(0);
}

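/* Usage sketch (illustrative only): gather every processor's list of locally
   owned IDs onto all processors. Assumes 'comm' is a valid MPI_Comm and
   'myIDs' is a std::vector<int> filled earlier.

     std::vector<int> allIDs;
     std::vector<int> lengths;
     fei::Allgatherv(comm, myIDs, lengths, allIDs);
     // allIDs now holds proc 0's IDs, then proc 1's IDs, and so on;
     // lengths[p] gives how many came from processor p.
*/
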
//------------------------------------------------------------------------
/** Broadcast the contents of sendbuf from sourceProc to all processors.
    sendbuf must already be sized consistently on every processor. */
template<class T>
int Bcast(MPI_Comm comm, std::vector<T>& sendbuf, int sourceProc)
{
#ifndef FEI_SER
  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();

  CHK_MPI(MPI_Bcast(&sendbuf[0], sendbuf.size(), mpi_dtype,
                    sourceProc, comm) );
#endif
  return(0);
}

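/* Usage sketch (illustrative only): broadcast a parameter array from processor
   0 to all processors. Assumes 'comm' is a valid MPI_Comm; note that the
   vector must already have the same length on every processor.

     std::vector<double> params(4);
     if (fei::localProc(comm) == 0) {
       // fill params on the source processor
     }
     fei::Bcast(comm, params, 0);
*/
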
//------------------------------------------------------------------------
/** Exchange the contents of a send-CommMap for a recv-CommMap. Each entry in
    sendCommMap maps a destination processor to the data to send to it; on
    return, recvCommMap maps each source processor to the data received from
    it. The recv processors and lengths are discovered internally unless the
    corresponding 'KnownOnEntry' flags are true. */
template<typename T>
int exchangeCommMapData(MPI_Comm comm,
                        const typename CommMap<T>::Type& sendCommMap,
                        typename CommMap<T>::Type& recvCommMap,
                        bool recvProcsKnownOnEntry = false,
                        bool recvLengthsKnownOnEntry = false)
{
  if (!recvProcsKnownOnEntry) {
    recvCommMap.clear();
  }

#ifndef FEI_SER
  int tag = 11120;
  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();

  std::vector<int> sendProcs;
  fei::copyKeysToVector(sendCommMap, sendProcs);
  std::vector<int> recvProcs;

  if (recvProcsKnownOnEntry) {
    fei::copyKeysToVector(recvCommMap, recvProcs);
  }
  else {
    mirrorProcs(comm, sendProcs, recvProcs);
    for(size_t i=0; i<recvProcs.size(); ++i) {
      addItemsToCommMap<T>(recvProcs[i], 0, NULL, recvCommMap);
    }
  }

  if (!recvLengthsKnownOnEntry) {
    std::vector<int> tmpIntData(sendProcs.size());
    std::vector<int> recvLengths(recvProcs.size());

    typename CommMap<T>::Type::const_iterator
      s_iter = sendCommMap.begin(), s_end = sendCommMap.end();

    for(size_t i=0; s_iter != s_end; ++s_iter, ++i) {
      tmpIntData[i] = s_iter->second.size();
    }

    if ( exchangeIntData(comm, sendProcs, tmpIntData, recvProcs, recvLengths) != 0) {
      return(-1);
    }
    for(size_t i=0; i<recvProcs.size(); ++i) {
      std::vector<T>& rdata = recvCommMap[recvProcs[i]];
      rdata.resize(recvLengths[i]);
    }
  }

  //launch Irecv's for recv-data:
  std::vector<MPI_Request> mpiReqs;
  mpiReqs.resize(recvProcs.size());

  typename CommMap<T>::Type::iterator
    r_iter = recvCommMap.begin(), r_end = recvCommMap.end();

  size_t req_offset = 0;
  for(; r_iter != r_end; ++r_iter) {
    int rproc = r_iter->first;
    std::vector<T>& recv_vec = r_iter->second;
    int len = recv_vec.size();
    T* recv_buf = len > 0 ? &recv_vec[0] : NULL;

    CHK_MPI( MPI_Irecv(recv_buf, len, mpi_dtype, rproc,
                       tag, comm, &mpiReqs[req_offset++]) );
  }

  //send the send-data:

  typename CommMap<T>::Type::const_iterator
    s_iter = sendCommMap.begin(), s_end = sendCommMap.end();

  for(; s_iter != s_end; ++s_iter) {
    int sproc = s_iter->first;
    const std::vector<T>& send_vec = s_iter->second;
    int len = send_vec.size();
    T* send_buf = len>0 ? const_cast<T*>(&send_vec[0]) : NULL;

    CHK_MPI( MPI_Send(send_buf, len, mpi_dtype, sproc, tag, comm) );
  }

  //complete the Irecvs:
  for(size_t i=0; i<mpiReqs.size(); ++i) {
    int index;
    MPI_Status status;
    CHK_MPI( MPI_Waitany(mpiReqs.size(), &mpiReqs[0], &index, &status) );
  }

#endif
  return(0);
}

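/* Usage sketch (illustrative only): send a few ints to two neighbor
   processors and receive whatever other processors send here. Assumes 'comm'
   is a valid MPI_Comm, 'leftNeighbor'/'rightNeighbor' are valid ranks, and
   addItemsToCommMap (from fei_CommMap.hpp) is used to populate the send map.

     fei::CommMap<int>::Type sendMap, recvMap;
     int leftData[2]  = {1, 2};
     int rightData[3] = {3, 4, 5};
     fei::addItemsToCommMap<int>(leftNeighbor,  2, leftData,  sendMap);
     fei::addItemsToCommMap<int>(rightNeighbor, 3, rightData, sendMap);

     fei::exchangeCommMapData<int>(comm, sendMap, recvMap);
     // recvMap now maps each sending processor to the ints it sent here.
*/
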
//------------------------------------------------------------------------
/** Send the i-th entry of sendData to sendProcs[i], and receive one vector of
    data from each processor in recvProcs into the corresponding entry of
    recvData. recvData must have the same length as recvProcs on entry. */
template<class T>
int exchangeData(MPI_Comm comm,
                 std::vector<int>& sendProcs,
                 std::vector<std::vector<T> >& sendData,
                 std::vector<int>& recvProcs,
                 bool recvDataLengthsKnownOnEntry,
                 std::vector<std::vector<T> >& recvData)
{
  if (sendProcs.size() == 0 && recvProcs.size() == 0) return(0);
  if (sendProcs.size() != sendData.size()) return(-1);
#ifndef FEI_SER
  std::vector<MPI_Request> mpiReqs;
  mpiReqs.resize(recvProcs.size());

  int tag = 11119;
  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();

  if (!recvDataLengthsKnownOnEntry) {
    std::vector<int> tmpIntData(sendData.size());
    std::vector<int> recvLengths(recvProcs.size());
    for(unsigned i=0; i<sendData.size(); ++i) {
      tmpIntData[i] = sendData[i].size();
    }

    if ( exchangeIntData(comm, sendProcs, tmpIntData, recvProcs, recvLengths) != 0) {
      return(-1);
    }
    for(unsigned i=0; i<recvProcs.size(); ++i) {
      recvData[i].resize(recvLengths[i]);
    }
  }

  //launch Irecv's for recvData:

  size_t numRecvProcs = recvProcs.size();
  int req_offset = 0;
  int localProc = fei::localProc(comm);
  for(size_t i=0; i<recvProcs.size(); ++i) {
    if (recvProcs[i] == localProc) {--numRecvProcs; continue; }

    int len = recvData[i].size();
    std::vector<T>& recv_vec = recvData[i];
    T* recv_buf = len > 0 ? &recv_vec[0] : NULL;

    CHK_MPI( MPI_Irecv(recv_buf, len, mpi_dtype, recvProcs[i],
                       tag, comm, &mpiReqs[req_offset++]) );
  }

  //send the sendData:

  for(size_t i=0; i<sendProcs.size(); ++i) {
    if (sendProcs[i] == localProc) continue;

    std::vector<T>& send_buf = sendData[i];
    CHK_MPI( MPI_Send(&send_buf[0], sendData[i].size(), mpi_dtype,
                      sendProcs[i], tag, comm) );
  }

  //complete the Irecvs:
  for(size_t i=0; i<numRecvProcs; ++i) {
    if (recvProcs[i] == localProc) continue;
    int index;
    MPI_Status status;
    CHK_MPI( MPI_Waitany(numRecvProcs, &mpiReqs[0], &index, &status) );
  }

#endif
  return(0);
}

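/* Usage sketch (illustrative only): send a different list of doubles to each
   destination processor and receive one list from each source processor.
   Assumes 'comm' is a valid MPI_Comm and the send/recv processor lists were
   determined earlier (e.g. via fei::mirrorProcs).

     std::vector<std::vector<double> > sendData(sendProcs.size());
     // ... fill sendData[i] with the values destined for sendProcs[i] ...

     std::vector<std::vector<double> > recvData(recvProcs.size());
     fei::exchangeData(comm, sendProcs, sendData, recvProcs, false, recvData);
     // recvData[i] now holds the values received from recvProcs[i].
*/
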
//------------------------------------------------------------------------
/** Variant of the above exchangeData which takes vectors of pointers to the
    send and recv data. Both recvData and the vectors it points to must be
    allocated by the caller. */
template<class T>
int exchangeData(MPI_Comm comm,
                 std::vector<int>& sendProcs,
                 std::vector<std::vector<T>*>& sendData,
                 std::vector<int>& recvProcs,
                 bool recvLengthsKnownOnEntry,
                 std::vector<std::vector<T>*>& recvData)
{
  if (sendProcs.size() == 0 && recvProcs.size() == 0) return(0);
  if (sendProcs.size() != sendData.size()) return(-1);
#ifndef FEI_SER
  int tag = 11115;
  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
  std::vector<MPI_Request> mpiReqs;

  try {
    mpiReqs.resize(recvProcs.size());

    if (!recvLengthsKnownOnEntry) {
      std::vector<int> tmpIntData;
      tmpIntData.resize(sendData.size());
      std::vector<int> recvLens(sendData.size());
      for(unsigned i=0; i<sendData.size(); ++i) {
        tmpIntData[i] = (int)sendData[i]->size();
      }

      if (exchangeIntData(comm, sendProcs, tmpIntData, recvProcs, recvLens) != 0) {
        return(-1);
      }

      for(unsigned i=0; i<recvLens.size(); ++i) {
        recvData[i]->resize(recvLens[i]);
      }
    }
  }
  catch(std::runtime_error& exc) {
    fei::console_out() << exc.what() << FEI_ENDL;
    return(-1);
  }

  //launch Irecv's for recvData:

  size_t numRecvProcs = recvProcs.size();
  int req_offset = 0;
  int localProc = fei::localProc(comm);
  for(unsigned i=0; i<recvProcs.size(); ++i) {
    if (recvProcs[i] == localProc) {--numRecvProcs; continue;}

    size_t len = recvData[i]->size();
    std::vector<T>& rbuf = *recvData[i];

    CHK_MPI( MPI_Irecv(&rbuf[0], (int)len, mpi_dtype,
                       recvProcs[i], tag, comm, &mpiReqs[req_offset++]) );
  }

  //send the sendData:

  for(unsigned i=0; i<sendProcs.size(); ++i) {
    if (sendProcs[i] == localProc) continue;

    std::vector<T>& sbuf = *sendData[i];
    CHK_MPI( MPI_Send(&sbuf[0], (int)sbuf.size(), mpi_dtype,
                      sendProcs[i], tag, comm) );
  }

  //complete the Irecv's:
  for(unsigned i=0; i<numRecvProcs; ++i) {
    if (recvProcs[i] == localProc) continue;
    int index;
    MPI_Status status;
    CHK_MPI( MPI_Waitany((int)numRecvProcs, &mpiReqs[0], &index, &status) );
  }

#endif
  return(0);
}

//------------------------------------------------------------------------
/** Interface for representing a set of messages to be sent to, and received
    from, other processors. Implement this interface and pass it to
    fei::exchange(), which drives the communication. */
template<class T>
class MessageHandler {
public:
  virtual ~MessageHandler(){}

  /** Get the list of processors to which messages will be sent. */
  virtual std::vector<int>& getSendProcs() = 0;

  /** Get the list of processors from which messages will be received. */
  virtual std::vector<int>& getRecvProcs() = 0;

  /** Get the length of the message to be sent to the specified processor. */
  virtual int getSendMessageLength(int destProc, int& messageLength) = 0;

  /** Fill 'message' with the data to be sent to the specified processor. */
  virtual int getSendMessage(int destProc, std::vector<T>& message) = 0;

  /** Process a message that has been received from the specified processor. */
  virtual int processRecvMessage(int srcProc, std::vector<T>& message) = 0;
};//class MessageHandler

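/* Implementation sketch (illustrative only, not part of the original header):
   a minimal MessageHandler that sends one int to each processor in a
   hypothetical 'neighbors' list and simply prints whatever arrives.

     class MySimpleHandler : public fei::MessageHandler<int> {
     public:
       MySimpleHandler(const std::vector<int>& neighbors)
         : sendProcs_(neighbors), recvProcs_(neighbors) {}
       virtual ~MySimpleHandler(){}

       virtual std::vector<int>& getSendProcs() { return sendProcs_; }
       virtual std::vector<int>& getRecvProcs() { return recvProcs_; }

       virtual int getSendMessageLength(int destProc, int& messageLength)
       { messageLength = 1; return 0; }

       virtual int getSendMessage(int destProc, std::vector<int>& message)
       { message.resize(1); message[0] = destProc; return 0; }

       virtual int processRecvMessage(int srcProc, std::vector<int>& message)
       { fei::console_out() << "got "<<message[0]<<" from "<<srcProc<<FEI_ENDL; return 0; }

     private:
       std::vector<int> sendProcs_, recvProcs_;
     };
*/
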
//------------------------------------------------------------------------
/** Perform the communication described by the given MessageHandler: obtain
    the send messages from the handler, exchange them with the corresponding
    receive processors, and hand each received message back to the handler. */
template<class T>
int exchange(MPI_Comm comm, MessageHandler<T>* msgHandler)
{
#ifdef FEI_SER
  (void)msgHandler;
#else
  int numProcs = fei::numProcs(comm);
  if (numProcs < 2) return(0);

  std::vector<int>& sendProcs = msgHandler->getSendProcs();
  int numSendProcs = sendProcs.size();
  std::vector<int>& recvProcs = msgHandler->getRecvProcs();
  int numRecvProcs = recvProcs.size();
  int i;

  if (numSendProcs < 1 && numRecvProcs < 1) {
    return(0);
  }

  std::vector<int> sendMsgLengths(numSendProcs);

  for(i=0; i<numSendProcs; ++i) {
    CHK_ERR( msgHandler->getSendMessageLength(sendProcs[i], sendMsgLengths[i]) );
  }

  std::vector<std::vector<T> > recvMsgs(numRecvProcs);

  std::vector<std::vector<T> > sendMsgs(numSendProcs);
  for(i=0; i<numSendProcs; ++i) {
    CHK_ERR( msgHandler->getSendMessage(sendProcs[i], sendMsgs[i]) );
  }

  CHK_ERR( exchangeData(comm, sendProcs, sendMsgs,
                        recvProcs, false, recvMsgs) );

  for(i=0; i<numRecvProcs; ++i) {
    std::vector<T>& recvMsg = recvMsgs[i];
    CHK_ERR( msgHandler->processRecvMessage(recvProcs[i], recvMsg ) );
  }
#endif

  return(0);
}

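/* Usage sketch (illustrative only), continuing the hypothetical
   MySimpleHandler example above:

     std::vector<int> neighbors;        // filled with neighbor ranks earlier
     MySimpleHandler handler(neighbors);
     int err = fei::exchange<int>(comm, &handler);
     // handler.processRecvMessage() has now been called once per neighbor.
*/
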

} //namespace fei

#endif // _fei_CommUtils_hpp_