#include "Tpetra_Details_DistributorPlan.hpp"

#include "Teuchos_StandardParameterEntryValidators.hpp"
std::string DistributorSendTypeEnumToString (EDistributorSendType sendType)
{
  if (sendType == DISTRIBUTOR_ISEND) {
    return "Isend";
  }
  else if (sendType == DISTRIBUTOR_SEND) {
    return "Send";
  }
  else if (sendType == DISTRIBUTOR_ALLTOALL) {
    return "Alltoall";
  }
  else {
    TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument,
      "EDistributorSendType enum value " << sendType << ".");
  }
}
std::string DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
{
  switch (how) {
  case Details::DISTRIBUTOR_NOT_INITIALIZED:
    return "Not initialized yet";
  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
    return "By createFromSends";
  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
    return "By createFromRecvs";
  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
    return "By createFromSendsAndRecvs";
  case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
    return "By createReverseDistributor";
  case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
    return "By copy constructor";
  default:
    return "Invalid EDistributorHowInitialized enum value";
  }
}
DistributorPlan::DistributorPlan(Teuchos::RCP<const Teuchos::Comm<int>> comm)
  : comm_(comm),
    howInitialized_(DISTRIBUTOR_NOT_INITIALIZED),
    reversePlan_(Teuchos::null),
    sendType_(DISTRIBUTOR_SEND),
    sendMessageToSelf_(false),
    numSendsToOtherProcs_(0),
    maxSendLength_(0),
    numReceives_(0),
    totalReceiveLength_(0)
{ }
DistributorPlan::DistributorPlan(const DistributorPlan& otherPlan)
  : comm_(otherPlan.comm_),
    howInitialized_(DISTRIBUTOR_INITIALIZED_BY_COPY),
    reversePlan_(otherPlan.reversePlan_),
    sendType_(otherPlan.sendType_),
    sendMessageToSelf_(otherPlan.sendMessageToSelf_),
    numSendsToOtherProcs_(otherPlan.numSendsToOtherProcs_),
    procIdsToSendTo_(otherPlan.procIdsToSendTo_),
    startsTo_(otherPlan.startsTo_),
    lengthsTo_(otherPlan.lengthsTo_),
    maxSendLength_(otherPlan.maxSendLength_),
    indicesTo_(otherPlan.indicesTo_),
    numReceives_(otherPlan.numReceives_),
    totalReceiveLength_(otherPlan.totalReceiveLength_),
    lengthsFrom_(otherPlan.lengthsFrom_),
    procsFrom_(otherPlan.procsFrom_),
    startsFrom_(otherPlan.startsFrom_),
    indicesFrom_(otherPlan.indicesFrom_)
{ }
size_t DistributorPlan::createFromSends(const Teuchos::ArrayView<const int>& exportProcIDs) {
  using Teuchos::outArg;
  using Teuchos::REDUCE_MAX;
  using Teuchos::reduceAll;

  const char rawPrefix[] = "Tpetra::DistributorPlan::createFromSends";

  const size_t numExports = exportProcIDs.size();
  const int myProcID = comm_->getRank();
  const int numProcs = comm_->getSize();
  // Check whether any process was given an invalid destination ID.
  // Reduce the result so that every process learns about it and can throw.
  int badID = -1;
  for (size_t i = 0; i < numExports; ++i) {
    const int exportID = exportProcIDs[i];
    if (exportID >= numProcs || exportID < 0) {
      badID = myProcID;
      break;
    }
  }
  int gbl_badID;
  reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
  TEUCHOS_TEST_FOR_EXCEPTION
    (gbl_badID >= 0, std::runtime_error, rawPrefix << "Proc "
     << gbl_badID << ", perhaps among other processes, got a bad "
     "send process ID.");
  // starts[p] will count how many exports go to process p.
  Teuchos::Array<size_t> starts (numProcs + 1, 0);

  size_t numActive = 0;
  int needSendBuff = 0;

  for (size_t i = 0; i < numExports; ++i) {
    const int exportID = exportProcIDs[i];
    if (exportID >= 0) {
      ++starts[exportID];
      // A send buffer is only needed if the export IDs are not
      // already grouped by destination process.
      if (needSendBuff == 0 && starts[exportID] > 1 &&
          exportID != exportProcIDs[i-1]) {
        needSendBuff = 1;
      }
      ++numActive;
    }
  }
#if defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
  int global_needSendBuff;
  reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
                       outArg (global_needSendBuff));
  TPETRA_EFFICIENCY_WARNING(
    global_needSendBuff != 0,
    "::createFromSends: Grouping export IDs together by process rank often "
    "improves performance.");
#endif
  if (starts[myProcID] != 0) {
    sendMessageToSelf_ = true;
  }
  else {
    sendMessageToSelf_ = false;
  }
  if (! needSendBuff) {
    // Fast path: exports are already grouped by destination process,
    // so no send buffer or indicesTo_ is needed.
    numSendsToOtherProcs_ = 0;
    // Count the number of distinct destination processes.
    for (int i = 0; i < numProcs; ++i) {
      if (starts[i]) {
        ++numSendsToOtherProcs_;
      }
    }
    // An empty indicesTo_ marks that the send buffer is not used.
    indicesTo_.resize(0);
    procIdsToSendTo_.assign(numSendsToOtherProcs_, 0);
    startsTo_.assign(numSendsToOtherProcs_, 0);
    lengthsTo_.assign(numSendsToOtherProcs_, 0);

    // Find the starting index and process ID of each send.
    size_t procIndex = 0;
    for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
      while (exportProcIDs[procIndex] < 0) {
        ++procIndex; // skip negative (inactive) process IDs
      }
      startsTo_[i] = procIndex;
      int procID = exportProcIDs[procIndex];
      procIdsToSendTo_[i] = procID;
      procIndex += starts[procID];
    }
    // Sort the send arrays by destination process ID.
    if (numSendsToOtherProcs_ > 0) {
      sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
    }
    // Compute the length of each send and the maximum send length.
    for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
      int procID = procIdsToSendTo_[i];
      lengthsTo_[i] = starts[procID];
      if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
        maxSendLength_ = lengthsTo_[i];
      }
    }
  }
  else {
    // Slow path: exports are not grouped by process, so a send buffer
    // and indicesTo_ are needed.

    // The loop below starts at starts.begin()+1, so handle starts[0] here.
    if (starts[0] == 0) {
      numSendsToOtherProcs_ = 0;
    }
    else {
      numSendsToOtherProcs_ = 1;
    }
    // Count the sends while turning starts into an inclusive prefix sum.
    for (Teuchos::Array<size_t>::iterator i = starts.begin()+1,
           im1 = starts.begin(); i != starts.end(); ++i) {
      if (*i != 0) ++numSendsToOtherProcs_;
      *i += *im1;
      im1 = i;
    }
    // Shift right so starts[p] is the offset of the first export to process p.
    for (Teuchos::Array<size_t>::reverse_iterator ip1 = starts.rbegin(),
           i = starts.rbegin()+1; i != starts.rend(); ++i) {
      *ip1 = *i;
      ip1 = i;
    }
    starts[0] = 0;

    // Fill the send-buffer permutation: indicesTo_[k] is the original
    // position of the k-th entry of the grouped send buffer.
    indicesTo_.resize(numActive);
    for (size_t i = 0; i < numExports; ++i) {
      if (exportProcIDs[i] >= 0) {
        indicesTo_[starts[exportProcIDs[i]]] = i;
        ++starts[exportProcIDs[i]];
      }
    }
    // The loop above advanced starts; shift it back into place.
    for (int proc = numProcs-1; proc != 0; --proc) {
      starts[proc] = starts[proc-1];
    }
    starts[0] = 0;
    starts[numProcs] = numActive;
    procIdsToSendTo_.resize(numSendsToOtherProcs_);
    startsTo_.resize(numSendsToOtherProcs_);
    lengthsTo_.resize(numSendsToOtherProcs_);

    // Set startsTo_, lengthsTo_, and procIdsToSendTo_ for each send.
    size_t snd = 0;
    for (int proc = 0; proc < numProcs; ++proc) {
      if (starts[proc+1] != starts[proc]) {
        lengthsTo_[snd] = starts[proc+1] - starts[proc];
        startsTo_[snd] = starts[proc];
        // Record the maximum length over all off-process sends.
        if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
          maxSendLength_ = lengthsTo_[snd];
        }
        procIdsToSendTo_[snd] = proc;
        ++snd;
      }
    }
  }

  if (sendMessageToSelf_) {
    --numSendsToOtherProcs_;
  }

  // Invert the send pattern to figure out what this process receives.
  computeReceives();

  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;

  return totalReceiveLength_;
}
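
// Illustrative usage sketch (not part of this file): a hypothetical caller
// builds a plan from a list of destination ranks. The communicator setup is
// assumed for the example; only createFromSends() comes from this file.
//
//   Teuchos::RCP<const Teuchos::Comm<int> > comm =
//     Teuchos::DefaultComm<int>::getComm();
//   Tpetra::Details::DistributorPlan plan(comm);
//   Teuchos::Array<int> exportProcIDs;  // destination rank of each export
//   // ... fill exportProcIDs, ideally grouped by rank ...
//   const size_t numImports = plan.createFromSends(exportProcIDs());
//   // numImports is how many entries other processes will send this one.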
void DistributorPlan::createFromRecvs(const Teuchos::ArrayView<const int>& remoteProcIDs)
{
  // Build the forward plan as if remoteProcIDs were sends, then adopt its
  // reverse: the receive pattern is the reverse of the send pattern.
  createFromSends(remoteProcIDs);
  *this = *getReversePlan();
  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS;
}
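
// Illustrative sketch (hypothetical caller): createFromRecvs() is the mirror
// image of createFromSends(); the caller lists the ranks that own the
// entries it wants to receive.
//
//   Teuchos::Array<int> remoteProcIDs;  // rank owning each remote entry
//   // ... fill remoteProcIDs ...
//   plan.createFromRecvs(remoteProcIDs());
//   // 'plan' now describes the sends required for those entries to arrive.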
void DistributorPlan::createFromSendsAndRecvs(const Teuchos::ArrayView<const int>& exportProcIDs,
                                              const Teuchos::ArrayView<const int>& remoteProcIDs)
{
  howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;

  int myProcID = comm_->getRank ();
  int numProcs = comm_->getSize();

  const size_t numExportIDs = exportProcIDs.size();
  // starts[p] counts how many exports go to process p.
  Teuchos::Array<size_t> starts (numProcs + 1, 0);

  size_t numActive = 0;
  int needSendBuff = 0;

  for( size_t i = 0; i < numExportIDs; i++ ) {
    // A send buffer is needed if the exports are not grouped by rank.
    if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
      needSendBuff = 1;
    if( exportProcIDs[i] >= 0 ) {
      ++starts[ exportProcIDs[i] ];
      ++numActive;
    }
  }

  sendMessageToSelf_ = ( starts[myProcID] != 0 ) ? 1 : 0;
  numSendsToOtherProcs_ = 0;

  if (needSendBuff) {
    // Slow path: exports are not grouped by process; build indicesTo_.
    // The loop below starts at starts.begin()+1, so handle starts[0] here.
    if (starts[0] == 0) {
      numSendsToOtherProcs_ = 0;
    }
    else {
      numSendsToOtherProcs_ = 1;
    }
    // Count the sends while turning starts into an inclusive prefix sum.
    for (Teuchos::Array<size_t>::iterator i = starts.begin()+1,
           im1 = starts.begin(); i != starts.end(); ++i) {
      if (*i != 0) ++numSendsToOtherProcs_;
      *i += *im1;
      im1 = i;
    }
    // Shift right so starts[p] is the offset of the first export to process p.
    for (Teuchos::Array<size_t>::reverse_iterator ip1 = starts.rbegin(),
           i = starts.rbegin()+1; i != starts.rend(); ++i) {
      *ip1 = *i;
      ip1 = i;
    }
    starts[0] = 0;

    // Fill the send-buffer permutation, bucketed by destination process.
    indicesTo_.resize(numActive);
    for (size_t i = 0; i < numExportIDs; ++i) {
      if (exportProcIDs[i] >= 0) {
        indicesTo_[starts[exportProcIDs[i]]] = i;
        ++starts[exportProcIDs[i]];
      }
    }
    // The loop above advanced starts; shift it back into place.
    for (int proc = numProcs-1; proc != 0; --proc) {
      starts[proc] = starts[proc-1];
    }
    starts[0] = 0;
    starts[numProcs] = numActive;

    procIdsToSendTo_.resize(numSendsToOtherProcs_);
    startsTo_.resize(numSendsToOtherProcs_);
    lengthsTo_.resize(numSendsToOtherProcs_);

    // Set startsTo_, lengthsTo_, and procIdsToSendTo_ for each send.
    size_t snd = 0;
    for (int proc = 0; proc < numProcs; ++proc) {
      if (starts[proc+1] != starts[proc]) {
        lengthsTo_[snd] = starts[proc+1] - starts[proc];
        startsTo_[snd] = starts[proc];
        // Record the maximum length over all off-process sends.
        if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
          maxSendLength_ = lengthsTo_[snd];
        }
        procIdsToSendTo_[snd] = proc;
        ++snd;
      }
    }
  }
  else {
    // Fast path: exports are already grouped by destination process,
    // so no send buffer or indicesTo_ is needed.
    numSendsToOtherProcs_ = 0;
    // Count the number of distinct destination processes.
    for (int i = 0; i < numProcs; ++i) {
      if (starts[i]) {
        ++numSendsToOtherProcs_;
      }
    }
    // An empty indicesTo_ marks that the send buffer is not used.
    indicesTo_.resize(0);
    procIdsToSendTo_.assign(numSendsToOtherProcs_, 0);
    startsTo_.assign(numSendsToOtherProcs_, 0);
    lengthsTo_.assign(numSendsToOtherProcs_, 0);

    // Find the starting index and process ID of each send.
    size_t procIndex = 0;
    for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
      while (exportProcIDs[procIndex] < 0) {
        ++procIndex; // skip negative (inactive) process IDs
      }
      startsTo_[i] = procIndex;
      int procID = exportProcIDs[procIndex];
      procIdsToSendTo_[i] = procID;
      procIndex += starts[procID];
    }
    // Sort the send arrays by destination process ID.
    if (numSendsToOtherProcs_ > 0) {
      sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
    }
    // Compute the length of each send and the maximum send length.
    for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
      int procID = procIdsToSendTo_[i];
      lengthsTo_[i] = starts[procID];
      if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
        maxSendLength_ = lengthsTo_[i];
      }
    }
  }
  numSendsToOtherProcs_ -= sendMessageToSelf_;

  // Build the sorted, duplicate-free list of processes we receive from.
  std::vector<int> recv_list;
  recv_list.reserve(numSendsToOtherProcs_); // initial guess for the size

  int last_pid = -2; // sentinel smaller than any valid process ID
  for(int i=0; i<remoteProcIDs.size(); i++) {
    if(remoteProcIDs[i]>last_pid) {
      recv_list.push_back(remoteProcIDs[i]);
      last_pid = remoteProcIDs[i];
    }
    else if (remoteProcIDs[i]<last_pid)
      throw std::runtime_error("Tpetra::Distributor::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
  }
  numReceives_ = recv_list.size();
  if (numReceives_) {
    procsFrom_.assign(numReceives_,0);
    lengthsFrom_.assign(numReceives_,0);
    indicesFrom_.assign(numReceives_,0);
    startsFrom_.assign(numReceives_,0);
  }
  // Compute each receive's length by measuring the run of its rank.
  for(size_t i=0,j=0; i<numReceives_; ++i) {
    size_t jlast = j;
    procsFrom_[i] = recv_list[i];
    startsFrom_[i] = j;
    for( ; j<(size_t)remoteProcIDs.size() &&
           remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
    lengthsFrom_[i] = j-jlast;
  }
  totalReceiveLength_ = remoteProcIDs.size();
  indicesFrom_.clear ();
  numReceives_-=sendMessageToSelf_;
}
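
// Illustrative sketch (hypothetical caller): createFromSendsAndRecvs() skips
// the communication that createFromSends() needs to discover the receive
// pattern, but the caller must supply both sides consistently, with
// remoteProcIDs in nondecreasing rank order (enforced by the throw above).
//
//   Teuchos::Array<int> exportPIDs;  // destination rank per export
//   Teuchos::Array<int> remotePIDs;  // source rank per import, already
//                                    // sorted in nondecreasing order
//   plan.createFromSendsAndRecvs(exportPIDs(), remotePIDs());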
Teuchos::RCP<DistributorPlan> DistributorPlan::getReversePlan() const {
  if (reversePlan_.is_null()) createReversePlan();
  return reversePlan_;
}
void DistributorPlan::createReversePlan() const
{
  reversePlan_ = Teuchos::rcp(new DistributorPlan(comm_));
  reversePlan_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
  reversePlan_->sendType_ = sendType_;

  // The total send length of the reverse plan is the total receive
  // length of the forward plan, and vice versa.
  size_t totalSendLength =
    std::accumulate(lengthsTo_.begin(), lengthsTo_.end(), 0);

  // The maximum send length of the reverse plan is the maximum receive
  // length of the forward plan, over receives from other processes.
  size_t maxReceiveLength = 0;
  const int myProcID = comm_->getRank();
  for (size_t i=0; i < numReceives_; ++i) {
    if (procsFrom_[i] != myProcID) {
      if (lengthsFrom_[i] > maxReceiveLength) {
        maxReceiveLength = lengthsFrom_[i];
      }
    }
  }

  // Initialize the reverse plan by swapping the send and receive
  // data of the forward plan.
  reversePlan_->sendMessageToSelf_ = sendMessageToSelf_;
  reversePlan_->numSendsToOtherProcs_ = numReceives_;
  reversePlan_->procIdsToSendTo_ = procsFrom_;
  reversePlan_->startsTo_ = startsFrom_;
  reversePlan_->lengthsTo_ = lengthsFrom_;
  reversePlan_->maxSendLength_ = maxReceiveLength;
  reversePlan_->indicesTo_ = indicesFrom_;
  reversePlan_->numReceives_ = numSendsToOtherProcs_;
  reversePlan_->totalReceiveLength_ = totalSendLength;
  reversePlan_->lengthsFrom_ = lengthsTo_;
  reversePlan_->procsFrom_ = procIdsToSendTo_;
  reversePlan_->startsFrom_ = startsTo_;
  reversePlan_->indicesFrom_ = indicesTo_;
}
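
// The reverse plan swaps the send and receive sides wholesale, so reversing
// twice reproduces the original pattern. Illustrative sketch (hypothetical
// caller):
//
//   Teuchos::RCP<Tpetra::Details::DistributorPlan> reverse =
//     plan.getReversePlan();
//   // reverse->getReversePlan() describes the same traffic as 'plan'.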
void DistributorPlan::computeReceives()
{
  using Teuchos::Array;
  using Teuchos::ArrayRCP;
  using Teuchos::as;
  using Teuchos::CommStatus;
  using Teuchos::CommRequest;
  using Teuchos::ireceive;
  using Teuchos::RCP;
  using Teuchos::REDUCE_SUM;
  using Teuchos::receive;
  using Teuchos::reduce;
  using Teuchos::scatter;
  using Teuchos::send;
  using Teuchos::waitAll;

  const int myRank = comm_->getRank();
  const int numProcs = comm_->getSize();

  const int mpiTag = DEFAULT_MPI_TAG;
  // toProcsFromMe[p] == 1 if and only if this process sends to process p.
  Array<int> toProcsFromMe (numProcs, 0);
#ifdef HAVE_TPETRA_DEBUG
  bool counting_error = false;
#endif // HAVE_TPETRA_DEBUG
  for (size_t i = 0; i < (numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0)); ++i) {
#ifdef HAVE_TPETRA_DEBUG
    if (toProcsFromMe[procIdsToSendTo_[i]] != 0) {
      counting_error = true;
    }
#endif // HAVE_TPETRA_DEBUG
    toProcsFromMe[procIdsToSendTo_[i]] = 1;
  }
#ifdef HAVE_TPETRA_DEBUG
  SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
    "Tpetra::Distributor::computeReceives: There was an error on at least "
    "one process in counting the number of messages sent by that process to "
    "the other processes. Please report this bug to the Tpetra developers.",
    *comm_);
#endif // HAVE_TPETRA_DEBUG
  // Compute the number of receives this process needs: sum-reduce the
  // toProcsFromMe arrays to a root process, then scatter each process'
  // receive count back to it.
  const int root = 0; // root rank of the reduce and scatter
  Array<int> numRecvsOnEachProc; // significant only at the root
  if (myRank == root) {
    numRecvsOnEachProc.resize (numProcs);
  }
  int numReceivesAsInt = 0;
  reduce<int, int> (toProcsFromMe.getRawPtr (),
                    numRecvsOnEachProc.getRawPtr (),
                    numProcs, REDUCE_SUM, root, *comm_);
  scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
                     &numReceivesAsInt, 1, root, *comm_);
  numReceives_ = static_cast<size_t> (numReceivesAsInt);

  // lengthsFrom_ and procsFrom_ each have one entry per receive,
  // including the self message if there is one.
  lengthsFrom_.assign (numReceives_, 0);
  procsFrom_.assign (numReceives_, 0);
  // Post the nonblocking receives first, to avoid deadlock.
  // The self message, if any, is copied directly rather than received.
  const size_t actualNumReceives = numReceives_ - (sendMessageToSelf_ ? 1 : 0);
  Array<RCP<CommRequest<int> > > requests (actualNumReceives);
  Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
  Array<RCP<CommStatus<int> > > statuses (actualNumReceives);

#ifdef HAVE_MPI
  const int anySourceProc = MPI_ANY_SOURCE;
#else
  const int anySourceProc = -1;
#endif

  for (size_t i = 0; i < actualNumReceives; ++i) {
    // Once the receive completes, lengthsFromBuffers[i] will hold the
    // number of packets that the source process will send this process.
    lengthsFromBuffers[i].resize (1);
    lengthsFromBuffers[i][0] = as<size_t> (0);
    requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
                                         mpiTag, *comm_);
  }
  // Post the sends: tell each destination how many packets to expect.
  for (size_t i = 0; i < numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0); ++i) {
    if (procIdsToSendTo_[i] != myRank) {
      const size_t* const lengthsTo_i = &lengthsTo_[i];
      send<int, size_t> (lengthsTo_i, 1, as<int> (procIdsToSendTo_[i]), mpiTag, *comm_);
    }
    else {
      // The self message: no communication needed, just copy.
      lengthsFrom_[numReceives_-1] = lengthsTo_[i];
      procsFrom_[numReceives_-1] = myRank;
    }
  }
  // Wait on all the receives. With MPI_ANY_SOURCE, the statuses reveal
  // the actual source of each message.
  waitAll (*comm_, requests (), statuses ());
  for (size_t i = 0; i < actualNumReceives; ++i) {
    lengthsFrom_[i] = *lengthsFromBuffers[i];
    procsFrom_[i] = statuses[i]->getSourceRank ();
  }

  // Sort the source process IDs, applying the same permutation to the lengths.
  sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());

  // totalReceiveLength_ is the sum of all the receive lengths.
  totalReceiveLength_ =
    std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), 0);
  indicesFrom_.clear ();

  startsFrom_.clear ();
  startsFrom_.reserve (numReceives_);
  for (size_t i = 0, j = 0; i < numReceives_; ++i) {
    startsFrom_.push_back(j);
    j += lengthsFrom_[i];
  }

  // numReceives_ counted the self message; exclude it, to match
  // numSendsToOtherProcs_.
  if (sendMessageToSelf_) {
    --numReceives_;
  }
}
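
// Standalone miniature of the counting trick above (illustrative, assuming
// a Teuchos communicator 'comm' and its size 'numProcs'): each rank
// contributes a 0/1 flag vector "I send to rank p"; summing the vectors at
// a root and scattering the result hands every rank the number of messages
// it should expect.
//
//   Teuchos::Array<int> iSendTo(numProcs, 0);  // set flags here
//   Teuchos::Array<int> sums(numProcs, 0);     // significant at root only
//   int myNumReceives = 0;
//   Teuchos::reduce<int, int>(iSendTo.getRawPtr(), sums.getRawPtr(),
//                             numProcs, Teuchos::REDUCE_SUM, 0, *comm);
//   Teuchos::scatter<int, int>(sums.getRawPtr(), 1, &myNumReceives, 1,
//                              0, *comm);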
void DistributorPlan::setParameterList(const Teuchos::RCP<Teuchos::ParameterList>& plist)
{
  using Teuchos::FancyOStream;
  using Teuchos::getIntegralValue;
  using Teuchos::ParameterList;
  using Teuchos::parameterList;
  using Teuchos::RCP;

  if (! plist.is_null()) {
    RCP<const ParameterList> validParams = getValidParameters ();
    plist->validateParametersAndSetDefaults (*validParams);

    const Details::EDistributorSendType sendType =
      getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");

    // Now that the input list has been validated, save the results.
    sendType_ = sendType;

    // ParameterListAcceptor semantics require pointer identity of the
    // sublist passed to setParameterList(), so save the pointer.
    this->setMyParamList (plist);
  }
}
Teuchos::Array<std::string> distributorSendTypes()
{
  Teuchos::Array<std::string> sendTypes;
  sendTypes.push_back ("Isend");
  sendTypes.push_back ("Send");
  sendTypes.push_back ("Alltoall");
  return sendTypes;
}
Teuchos::RCP<const Teuchos::ParameterList>
DistributorPlan::getValidParameters() const
{
  using Teuchos::Array;
  using Teuchos::ParameterList;
  using Teuchos::parameterList;
  using Teuchos::RCP;
  using Teuchos::setStringToIntegralParameter;

  Array<std::string> sendTypes = distributorSendTypes ();
  const std::string defaultSendType ("Send");
  Array<Details::EDistributorSendType> sendTypeEnums;
  sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
  sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
  sendTypeEnums.push_back (Details::DISTRIBUTOR_ALLTOALL);

  RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");

  setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
    defaultSendType, "When using MPI, the variant of send to use in "
    "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
  plist->set ("Timer Label", "", "Label for Time Monitor output");

  return Teuchos::rcp_const_cast<const ParameterList> (plist);
}
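
// Usage sketch (hypothetical caller): selecting the MPI send variant through
// the parameter list validated above. "Isend", "Send", and "Alltoall" are
// the string values registered by getValidParameters().
//
//   Teuchos::RCP<Teuchos::ParameterList> params = Teuchos::parameterList();
//   params->set("Send type", std::string("Isend"));
//   plan.setParameterList(params);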