Tpetra parallel linear algebra Version of the Day
Tpetra_Details_DistributorPlan.cpp
1// ***********************************************************************
2//
3// Tpetra: Templated Linear Algebra Services Package
4// Copyright (2008) Sandia Corporation
5//
6// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
7// the U.S. Government retains certain rights in this software.
8//
9// Redistribution and use in source and binary forms, with or without
10// modification, are permitted provided that the following conditions are
11// met:
12//
13// 1. Redistributions of source code must retain the above copyright
14// notice, this list of conditions and the following disclaimer.
15//
16// 2. Redistributions in binary form must reproduce the above copyright
17// notice, this list of conditions and the following disclaimer in the
18// documentation and/or other materials provided with the distribution.
19//
20// 3. Neither the name of the Corporation nor the names of the
21// contributors may be used to endorse or promote products derived from
22// this software without specific prior written permission.
23//
24// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
25// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
27// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
28// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
30// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
31// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
32// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
33// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
34// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35//
36// ************************************************************************
37// @HEADER
38
39#include "Tpetra_Details_DistributorPlan.hpp"
40
41#include "Teuchos_StandardParameterEntryValidators.hpp"
42#include "Tpetra_Util.hpp"
43#include "Tpetra_Details_Behavior.hpp"
44#include <numeric>
45
46namespace Tpetra {
47namespace Details {
48
49std::string
50DistributorSendTypeEnumToString (EDistributorSendType sendType)
51{
52 if (sendType == DISTRIBUTOR_ISEND) {
53 return "Isend";
54 }
55 else if (sendType == DISTRIBUTOR_SEND) {
56 return "Send";
57 }
58 else if (sendType == DISTRIBUTOR_ALLTOALL) {
59 return "Alltoall";
60 }
61 else {
62 TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
63 "EDistributorSendType enum value " << sendType << ".");
64 }
65}
66
67std::string
68DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
69{
70 switch (how) {
71 case Details::DISTRIBUTOR_NOT_INITIALIZED:
72 return "Not initialized yet";
73 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
74 return "By createFromSends";
75 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
76 return "By createFromRecvs";
77 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
78 return "By createFromSendsAndRecvs";
79 case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
80 return "By createReverseDistributor";
81 case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
82 return "By copy constructor";
83 default:
84 return "INVALID";
85 }
86}
87
88DistributorPlan::DistributorPlan(Teuchos::RCP<const Teuchos::Comm<int>> comm)
89 : comm_(comm),
90 howInitialized_(DISTRIBUTOR_NOT_INITIALIZED),
91 reversePlan_(Teuchos::null),
92 sendType_(DISTRIBUTOR_SEND),
93 sendMessageToSelf_(false),
94 numSendsToOtherProcs_(0),
95 maxSendLength_(0),
96 numReceives_(0),
97 totalReceiveLength_(0)
98{ }
99
100DistributorPlan::DistributorPlan(const DistributorPlan& otherPlan)
101 : comm_(otherPlan.comm_),
102 howInitialized_(DISTRIBUTOR_INITIALIZED_BY_COPY),
103 reversePlan_(otherPlan.reversePlan_),
104 sendType_(otherPlan.sendType_),
105 sendMessageToSelf_(otherPlan.sendMessageToSelf_),
106 numSendsToOtherProcs_(otherPlan.numSendsToOtherProcs_),
107 procIdsToSendTo_(otherPlan.procIdsToSendTo_),
108 startsTo_(otherPlan.startsTo_),
109 lengthsTo_(otherPlan.lengthsTo_),
110 maxSendLength_(otherPlan.maxSendLength_),
111 indicesTo_(otherPlan.indicesTo_),
112 numReceives_(otherPlan.numReceives_),
113 totalReceiveLength_(otherPlan.totalReceiveLength_),
114 lengthsFrom_(otherPlan.lengthsFrom_),
115 procsFrom_(otherPlan.procsFrom_),
116 startsFrom_(otherPlan.startsFrom_),
117 indicesFrom_(otherPlan.indicesFrom_)
118{ }
119
120size_t DistributorPlan::createFromSends(const Teuchos::ArrayView<const int>& exportProcIDs) {
121 using Teuchos::outArg;
122 using Teuchos::REDUCE_MAX;
123 using Teuchos::reduceAll;
124 using std::endl;
125 const char rawPrefix[] = "Tpetra::DistributorPlan::createFromSends";
126
127 const size_t numExports = exportProcIDs.size();
128 const int myProcID = comm_->getRank();
129 const int numProcs = comm_->getSize();
130 const bool debug = Details::Behavior::debug("Distributor");
131
132 // exportProcIDs tells us the communication pattern for this
133 // distributor. It dictates the way that the export data will be
134 // interpreted in doPosts(). We want to perform at most one
135 // send per process in doPosts; this is for two reasons:
136 // * minimize latency / overhead in the comm routines (nice)
137 // * match the number of receives and sends between processes
138 // (necessary)
139 //
140 // Teuchos::Comm requires that the data for a send are contiguous
141 // in a send buffer. Therefore, if the data in the send buffer
142 // for doPosts() are not contiguous, they will need to be copied
143 // into a contiguous buffer. The user has specified this
144 // noncontiguous pattern and we can't do anything about it.
145 // However, if they do not provide an efficient pattern, we will
146 // warn them if one of the following compile-time options has been
147 // set:
148 // * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
149 //
150 // If the data are contiguous, then we can post the sends in situ
151 // (i.e., without needing to copy them into a send buffer).
152 //
153 // Determine contiguity. There are a number of ways to do this:
154 // * If the export IDs are sorted, then all exports to a
155 // particular proc must be contiguous. This is what Epetra does.
156 // * If the export ID of the current export already has been
157 // listed, then the previous listing should correspond to the
158 // same export. This tests contiguity, but not sortedness.
159 //
160 // Both of these tests require O(n), where n is the number of
161 // exports. However, the latter will positively identify a greater
162 // portion of contiguous patterns. We use the latter method.
163 //
164 // Check to see if values are grouped by procs without gaps
165 // If so, indices_to -> 0.
166
167 if (debug) {
168 // Test whether any process in the communicator got an invalid
169 // process ID. If badID != -1 on this process, then it equals
170 // this process' rank. The max of all badID over all processes
171 // is the max rank which has an invalid process ID.
172 int badID = -1;
173 for (size_t i = 0; i < numExports; ++i) {
174 const int exportID = exportProcIDs[i];
175 if (exportID >= numProcs || exportID < 0) {
176 badID = myProcID;
177 break;
178 }
179 }
180 int gbl_badID;
181 reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
182 TEUCHOS_TEST_FOR_EXCEPTION
183 (gbl_badID >= 0, std::runtime_error, rawPrefix << "Proc "
184 << gbl_badID << ", perhaps among other processes, got a bad "
185 "send process ID.");
186 }
187
188 // Set up data structures for quick traversal of arrays.
189 // This contains the number of sends for each process ID.
190 //
191 // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
192 // that create an array of length the number of processes in the
193 // communicator (plus one). Given how this code uses this array,
194 // it should be straightforward to replace it with a hash table or
195 // some other more space-efficient data structure. In practice,
196 // most of the entries of starts should be zero for a sufficiently
197 // large process count, unless the communication pattern is dense.
198 // Note that it's important to be able to iterate through keys (i
199 // for which starts[i] is nonzero) in increasing order.
200 Teuchos::Array<size_t> starts (numProcs + 1, 0);
201
202 // numActive is the number of sends that are not Null
203 size_t numActive = 0;
204 int needSendBuff = 0; // Boolean
205
206 for (size_t i = 0; i < numExports; ++i) {
207 const int exportID = exportProcIDs[i];
208 if (exportID >= 0) {
209 // exportID is a valid process ID. Increment the number of
210 // messages this process will send to that process.
211 ++starts[exportID];
212
213 // If we're sending more than one message to process exportID,
214 // then it is possible that the data are not contiguous.
215 // Check by seeing if the previous process ID in the list
216 // (exportProcIDs[i-1]) is the same. It's safe to use i-1,
217 // because if starts[exportID] > 1, then i must be > 1 (since
218 // the starts array was filled with zeros initially).
219
220 // null entries break continuity.
221 // e.g., [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
222 if (needSendBuff == 0 && starts[exportID] > 1 &&
223 exportID != exportProcIDs[i-1]) {
224 needSendBuff = 1;
225 }
226 ++numActive;
227 }
228 }
229
230#if defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
231 {
232 int global_needSendBuff;
233 reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
234 outArg (global_needSendBuff));
235 TPETRA_EFFICIENCY_WARNING(
236 global_needSendBuff != 0,
237 "::createFromSends: Grouping export IDs together by process rank often "
238 "improves performance.");
239 }
240#endif
241
242 // Determine from the caller's data whether or not the current
243 // process should send (a) message(s) to itself.
244 if (starts[myProcID] != 0) {
245 sendMessageToSelf_ = true;
246 }
247 else {
248 sendMessageToSelf_ = false;
249 }
250
251 if (! needSendBuff) {
252 // grouped by proc, no send buffer or indicesTo_ needed
253 numSendsToOtherProcs_ = 0;
254 // Count total number of sends, i.e., total number of procs to
255 // which we are sending. This includes myself, if applicable.
256 for (int i = 0; i < numProcs; ++i) {
257 if (starts[i]) {
258 ++numSendsToOtherProcs_;
259 }
260 }
261
262 // Not only do we not need these, but we must clear them, as
263 // empty status of indicesTo is a flag used later.
264 indicesTo_.resize(0);
265 // Size these to numSendsToOtherProcs_; note, at the moment, numSendsToOtherProcs_
266 // includes self sends. Set their values to zeros.
267 procIdsToSendTo_.assign(numSendsToOtherProcs_,0);
268 startsTo_.assign(numSendsToOtherProcs_,0);
269 lengthsTo_.assign(numSendsToOtherProcs_,0);
270
271 // set startsTo to the offset for each send (i.e., each proc ID)
272 // set procsTo to the proc ID for each send
273 // when interpreting this code, remember that we are assuming contiguity;
274 // that is why procIndex skips through the ranks
275 {
276 size_t procIndex = 0;
277 for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
278 while (exportProcIDs[procIndex] < 0) {
279 ++procIndex; // skip all negative proc IDs
280 }
281 startsTo_[i] = procIndex;
282 int procID = exportProcIDs[procIndex];
283 procIdsToSendTo_[i] = procID;
284 procIndex += starts[procID];
285 }
286 }
287 // sort the startsTo and proc IDs together, in ascending order, according
288 // to proc IDs
289 if (numSendsToOtherProcs_ > 0) {
290 sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
291 }
292 // compute the maximum send length
293 maxSendLength_ = 0;
294 for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
295 int procID = procIdsToSendTo_[i];
296 lengthsTo_[i] = starts[procID];
297 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
298 maxSendLength_ = lengthsTo_[i];
299 }
300 }
301 }
302 else {
303 // not grouped by proc, need send buffer and indicesTo_
304
305 // starts[i] is the number of sends to proc i
306 // numActive equals number of sends total, \sum_i starts[i]
307
308 // this loop starts at starts[1], so explicitly check starts[0]
309 if (starts[0] == 0 ) {
310 numSendsToOtherProcs_ = 0;
311 }
312 else {
313 numSendsToOtherProcs_ = 1;
314 }
315 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
316 im1=starts.begin();
317 i != starts.end(); ++i)
318 {
319 if (*i != 0) ++numSendsToOtherProcs_;
320 *i += *im1;
321 im1 = i;
322 }
323 // starts[i] now contains the number of exports to procs 0 through i
324
325 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
326 i=starts.rbegin()+1;
327 i != starts.rend(); ++i)
328 {
329 *ip1 = *i;
330 ip1 = i;
331 }
332 starts[0] = 0;
333 // starts[i] now contains the number of exports to procs 0 through
334 // i-1, i.e., all procs before proc i
335
336 indicesTo_.resize(numActive);
337
338 for (size_t i = 0; i < numExports; ++i) {
339 if (exportProcIDs[i] >= 0) {
340 // record the offset to the sendBuffer for this export
341 indicesTo_[starts[exportProcIDs[i]]] = i;
342 // now increment the offset for this proc
343 ++starts[exportProcIDs[i]];
344 }
345 }
346 // our send buffer will contain the export data for each of the procs
347 // we communicate with, in order by proc id
348 // sendBuffer = {proc_0_data, proc_1_data, ..., proc_np-1_data}
349 // indicesTo now maps each export to the location in our send buffer
350 // associated with the export
351 // data for export i located at sendBuffer[indicesTo[i]]
352 //
353 // starts[i] once again contains the number of exports to
354 // procs 0 through i
355 for (int proc = numProcs-1; proc != 0; --proc) {
356 starts[proc] = starts[proc-1];
357 }
358 starts.front() = 0;
359 starts[numProcs] = numActive;
360 //
361 // starts[proc] once again contains the number of exports to
362 // procs 0 through proc-1
363 // i.e., the start of my data in the sendBuffer
364
365 // this contains invalid data for procs we don't care about; that is okay
366 procIdsToSendTo_.resize(numSendsToOtherProcs_);
367 startsTo_.resize(numSendsToOtherProcs_);
368 lengthsTo_.resize(numSendsToOtherProcs_);
369
370 // for each group of sends/exports, record the destination proc,
371 // the length, and the offset for this send into the
372 // send buffer (startsTo_)
373 maxSendLength_ = 0;
374 size_t snd = 0;
375 for (int proc = 0; proc < numProcs; ++proc ) {
376 if (starts[proc+1] != starts[proc]) {
377 lengthsTo_[snd] = starts[proc+1] - starts[proc];
378 startsTo_[snd] = starts[proc];
379 // record max length for all off-proc sends
380 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
381 maxSendLength_ = lengthsTo_[snd];
382 }
383 procIdsToSendTo_[snd] = proc;
384 ++snd;
385 }
386 }
387 }
388
389 if (sendMessageToSelf_) {
390 --numSendsToOtherProcs_;
391 }
392
393 // Invert map to see what msgs are received and what length
394 computeReceives();
395
396 // createFromRecvs() calls createFromSends(), but will set
397 // howInitialized_ again after calling createFromSends().
398 howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
399
400 return totalReceiveLength_;
401}
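
To make the grouped-versus-scattered distinction above concrete, here is a minimal caller-side sketch. It is not part of this file: the helper name is made up, and it assumes the public interface shown above plus a communicator with at least two ranks.

#include "Tpetra_Details_DistributorPlan.hpp"
#include "Teuchos_DefaultComm.hpp"
#include "Teuchos_Array.hpp"
#include <cstddef>

// Hypothetical helper, for illustration only.
std::size_t examplePlanFromSends ()
{
  Teuchos::RCP<const Teuchos::Comm<int>> comm =
    Teuchos::DefaultComm<int>::getComm ();
  Tpetra::Details::DistributorPlan plan (comm);

  // Export IDs grouped by destination rank: {0, 0, 0, 1, 1}.  Because
  // each rank's IDs are contiguous, needSendBuff stays 0 above and the
  // plan needs neither indicesTo_ nor an intermediate send buffer.
  Teuchos::Array<int> exportProcIDs (5, 0);
  exportProcIDs[3] = 1;
  exportProcIDs[4] = 1;

  // Returns the total number of packets this rank will receive.
  return plan.createFromSends (exportProcIDs ());
}
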
402
403void DistributorPlan::createFromRecvs(const Teuchos::ArrayView<const int>& remoteProcIDs)
404{
405 createFromSends(remoteProcIDs);
406
407 *this = *getReversePlan();
408
409 howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS;
410}
411
412void DistributorPlan::createFromSendsAndRecvs(const Teuchos::ArrayView<const int>& exportProcIDs,
413 const Teuchos::ArrayView<const int>& remoteProcIDs)
414{
415 // Note that exportProcIDs and remoteProcIDs _must_ be lists with one
416 // entry per GID. If export/remoteProcIDs are taken from the
417 // getProcs{From|To} lists extracted from a previous Distributor, the
418 // result will be wrong, because those lists have a unique entry per
419 // process ID. A version of this that takes lengthsTo and lengthsFrom
420 // should be written. (A usage sketch follows this function.)
421
422 howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
423
424
425 int myProcID = comm_->getRank ();
426 int numProcs = comm_->getSize();
427
428 const size_t numExportIDs = exportProcIDs.size();
429 Teuchos::Array<size_t> starts (numProcs + 1, 0);
430
431 size_t numActive = 0;
432 int needSendBuff = 0; // Boolean
433
434 for(size_t i = 0; i < numExportIDs; i++ )
435 {
436 if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
437 needSendBuff = 1;
438 if( exportProcIDs[i] >= 0 )
439 {
440 ++starts[ exportProcIDs[i] ];
441 ++numActive;
442 }
443 }
444
445 sendMessageToSelf_ = ( starts[myProcID] != 0 ) ? 1 : 0;
446
447 numSendsToOtherProcs_ = 0;
448
449 if( needSendBuff ) // not grouped by proc; need a send buffer and indicesTo_
450 {
451 if (starts[0] == 0 ) {
452 numSendsToOtherProcs_ = 0;
453 }
454 else {
455 numSendsToOtherProcs_ = 1;
456 }
457 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
458 im1=starts.begin();
459 i != starts.end(); ++i)
460 {
461 if (*i != 0) ++numSendsToOtherProcs_;
462 *i += *im1;
463 im1 = i;
464 }
465 // starts[i] now contains the number of exports to procs 0 through i
466
467 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
468 i=starts.rbegin()+1;
469 i != starts.rend(); ++i)
470 {
471 *ip1 = *i;
472 ip1 = i;
473 }
474 starts[0] = 0;
475 // starts[i] now contains the number of exports to procs 0 through
476 // i-1, i.e., all procs before proc i
477
478 indicesTo_.resize(numActive);
479
480 for (size_t i = 0; i < numExportIDs; ++i) {
481 if (exportProcIDs[i] >= 0) {
482 // record the offset to the sendBuffer for this export
483 indicesTo_[starts[exportProcIDs[i]]] = i;
484 // now increment the offset for this proc
485 ++starts[exportProcIDs[i]];
486 }
487 }
488 for (int proc = numProcs-1; proc != 0; --proc) {
489 starts[proc] = starts[proc-1];
490 }
491 starts.front() = 0;
492 starts[numProcs] = numActive;
493 procIdsToSendTo_.resize(numSendsToOtherProcs_);
494 startsTo_.resize(numSendsToOtherProcs_);
495 lengthsTo_.resize(numSendsToOtherProcs_);
496 maxSendLength_ = 0;
497 size_t snd = 0;
498 for (int proc = 0; proc < numProcs; ++proc ) {
499 if (starts[proc+1] != starts[proc]) {
500 lengthsTo_[snd] = starts[proc+1] - starts[proc];
501 startsTo_[snd] = starts[proc];
502 // record max length for all off-proc sends
503 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
504 maxSendLength_ = lengthsTo_[snd];
505 }
506 procIdsToSendTo_[snd] = proc;
507 ++snd;
508 }
509 }
510 }
511 else {
512 // grouped by proc, no send buffer or indicesTo_ needed
513 numSendsToOtherProcs_ = 0;
514 // Count total number of sends, i.e., total number of procs to
515 // which we are sending. This includes myself, if applicable.
516 for (int i = 0; i < numProcs; ++i) {
517 if (starts[i]) {
518 ++numSendsToOtherProcs_;
519 }
520 }
521
522 // Not only do we not need these, but we must clear them, as
523 // empty status of indicesTo is a flag used later.
524 indicesTo_.resize(0);
525 // Size these to numSendsToOtherProcs_; note, at the moment, numSendsToOtherProcs_
526 // includes self sends. Set their values to zeros.
527 procIdsToSendTo_.assign(numSendsToOtherProcs_,0);
528 startsTo_.assign(numSendsToOtherProcs_,0);
529 lengthsTo_.assign(numSendsToOtherProcs_,0);
530
531 // set startsTo to the offset for each send (i.e., each proc ID)
532 // set procsTo to the proc ID for each send
533 // when interpreting this code, remember that we are assuming contiguity;
534 // that is why procIndex skips through the ranks
535 {
536 size_t procIndex = 0;
537 for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
538 while (exportProcIDs[procIndex] < 0) {
539 ++procIndex; // skip all negative proc IDs
540 }
541 startsTo_[i] = procIndex;
542 int procID = exportProcIDs[procIndex];
543 procIdsToSendTo_[i] = procID;
544 procIndex += starts[procID];
545 }
546 }
547 // sort the startsTo and proc IDs together, in ascending order, according
548 // to proc IDs
549 if (numSendsToOtherProcs_ > 0) {
550 sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
551 }
552 // compute the maximum send length
553 maxSendLength_ = 0;
554 for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
555 int procID = procIdsToSendTo_[i];
556 lengthsTo_[i] = starts[procID];
557 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
558 maxSendLength_ = lengthsTo_[i];
559 }
560 }
561 }
562
563
564 numSendsToOtherProcs_ -= sendMessageToSelf_;
565 std::vector<int> recv_list;
566 recv_list.reserve(numSendsToOtherProcs_); //reserve an initial guess for size needed
567
568 int last_pid=-2;
569 for(int i=0; i<remoteProcIDs.size(); i++) {
570 if(remoteProcIDs[i]>last_pid) {
571 recv_list.push_back(remoteProcIDs[i]);
572 last_pid = remoteProcIDs[i];
573 }
574 else if (remoteProcIDs[i]<last_pid)
575 throw std::runtime_error("Tpetra::Distributor::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
576 }
577 numReceives_ = recv_list.size();
578 if(numReceives_) {
579 procsFrom_.assign(numReceives_,0);
580 lengthsFrom_.assign(numReceives_,0);
581 indicesFrom_.assign(numReceives_,0);
582 startsFrom_.assign(numReceives_,0);
583 }
584 for(size_t i=0,j=0; i<numReceives_; ++i) {
585 int jlast=j;
586 procsFrom_[i] = recv_list[i];
587 startsFrom_[i] = j;
588 for( ; j<(size_t)remoteProcIDs.size() &&
589 remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
590 lengthsFrom_[i] = j-jlast;
591 }
592 totalReceiveLength_ = remoteProcIDs.size();
593 indicesFrom_.clear ();
594 numReceives_-=sendMessageToSelf_;
595}
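
The preconditions spelled out at the top of this function (one entry per GID, remote process IDs sorted by owning rank) look like this from the caller's side. This is a hypothetical sketch only; the helper name and the assumption of at least three ranks are illustrative.

#include "Tpetra_Details_DistributorPlan.hpp"
#include "Teuchos_DefaultComm.hpp"
#include "Teuchos_Array.hpp"

// Hypothetical helper, for illustration only.
void examplePlanFromSendsAndRecvs ()
{
  Teuchos::RCP<const Teuchos::Comm<int>> comm =
    Teuchos::DefaultComm<int>::getComm ();
  Tpetra::Details::DistributorPlan plan (comm);

  // One entry per exported GID, grouped by destination rank.
  Teuchos::Array<int> exportProcIDs (4, 1);   // four GIDs, all going to rank 1

  // One entry per remote GID, sorted by owning rank ({0, 2, 2} here).
  // An unsorted list such as {2, 0, 2} would trigger the
  // std::runtime_error thrown above.
  Teuchos::Array<int> remoteProcIDs (3, 0);
  remoteProcIDs[1] = 2;
  remoteProcIDs[2] = 2;

  plan.createFromSendsAndRecvs (exportProcIDs (), remoteProcIDs ());
}
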
596
597Teuchos::RCP<DistributorPlan> DistributorPlan::getReversePlan() const {
598 if (reversePlan_.is_null()) createReversePlan();
599 return reversePlan_;
600}
601
602void DistributorPlan::createReversePlan() const
603{
604 reversePlan_ = Teuchos::rcp(new DistributorPlan(comm_));
605 reversePlan_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
606 reversePlan_->sendType_ = sendType_;
607
608 // The total length of all the sends of this DistributorPlan. We
609 // calculate it because it's the total length of all the receives
610 // of the reverse DistributorPlan.
611 size_t totalSendLength =
612 std::accumulate(lengthsTo_.begin(), lengthsTo_.end(), 0);
613
614 // The maximum length of any of the receives of this DistributorPlan.
615 // We calculate it because it's the maximum length of any of the
616 // sends of the reverse DistributorPlan.
617 size_t maxReceiveLength = 0;
618 const int myProcID = comm_->getRank();
619 for (size_t i=0; i < numReceives_; ++i) {
620 if (procsFrom_[i] != myProcID) {
621 // Don't count receives for messages sent by myself to myself.
622 if (lengthsFrom_[i] > maxReceiveLength) {
623 maxReceiveLength = lengthsFrom_[i];
624 }
625 }
626 }
627
628 reversePlan_->sendMessageToSelf_ = sendMessageToSelf_;
629 reversePlan_->numSendsToOtherProcs_ = numReceives_;
630 reversePlan_->procIdsToSendTo_ = procsFrom_;
631 reversePlan_->startsTo_ = startsFrom_;
632 reversePlan_->lengthsTo_ = lengthsFrom_;
633 reversePlan_->maxSendLength_ = maxReceiveLength;
634 reversePlan_->indicesTo_ = indicesFrom_;
635 reversePlan_->numReceives_ = numSendsToOtherProcs_;
636 reversePlan_->totalReceiveLength_ = totalSendLength;
637 reversePlan_->lengthsFrom_ = lengthsTo_;
638 reversePlan_->procsFrom_ = procIdsToSendTo_;
639 reversePlan_->startsFrom_ = startsTo_;
640 reversePlan_->indicesFrom_ = indicesTo_;
641}
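
As the assignments above show, the reverse plan is the forward plan with its send and receive sides exchanged, built lazily and cached. A hypothetical caller (the helper name is an assumption) might use it like this:

#include "Tpetra_Details_DistributorPlan.hpp"

// Hypothetical helper: reuse a forward (export) plan for the reverse
// (import) direction without rebuilding it from the process lists.
Teuchos::RCP<Tpetra::Details::DistributorPlan>
exampleReversePlan (const Tpetra::Details::DistributorPlan& forwardPlan)
{
  return forwardPlan.getReversePlan ();
}
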
642
643void DistributorPlan::computeReceives()
644{
645 using Teuchos::Array;
646 using Teuchos::ArrayRCP;
647 using Teuchos::as;
648 using Teuchos::CommStatus;
649 using Teuchos::CommRequest;
650 using Teuchos::ireceive;
651 using Teuchos::RCP;
652 using Teuchos::rcp;
653 using Teuchos::REDUCE_SUM;
654 using Teuchos::receive;
655 using Teuchos::reduce;
656 using Teuchos::scatter;
657 using Teuchos::send;
658 using Teuchos::waitAll;
659
660 const int myRank = comm_->getRank();
661 const int numProcs = comm_->getSize();
662
663 const int mpiTag = DEFAULT_MPI_TAG;
664
665 // toProcsFromMe[i] == the number of messages sent by this process
666 // to process i. The data in numSendsToOtherProcs_, procIdsToSendTo_, and lengthsTo_
667 // concern the contiguous sends. Therefore, each process will be
668 // listed in procIdsToSendTo_ at most once, and so toProcsFromMe[i] will
669 // either be 0 or 1.
670 {
671 Array<int> toProcsFromMe (numProcs, 0);
672#ifdef HAVE_TPETRA_DEBUG
673 bool counting_error = false;
674#endif // HAVE_TPETRA_DEBUG
675 for (size_t i = 0; i < (numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0)); ++i) {
676#ifdef HAVE_TPETRA_DEBUG
677 if (toProcsFromMe[procIdsToSendTo_[i]] != 0) {
678 counting_error = true;
679 }
680#endif // HAVE_TPETRA_DEBUG
681 toProcsFromMe[procIdsToSendTo_[i]] = 1;
682 }
683#ifdef HAVE_TPETRA_DEBUG
684 // Note that SHARED_TEST_FOR_EXCEPTION does a global reduction
685 SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
686 "Tpetra::Distributor::computeReceives: There was an error on at least "
687 "one process in counting the number of messages sent by that process to "
688 "the other processes. Please report this bug to the Tpetra developers.",
689 *comm_);
690#endif // HAVE_TPETRA_DEBUG
691
692 // Compute the number of receives that this process needs to
693 // post. The number of receives includes any self sends (i.e.,
694 // messages sent by this process to itself).
695 //
696 // (We will use numReceives_ below to post exactly that
697 // number of receives, with MPI_ANY_SOURCE as the sending rank.
698 // This will tell us from which processes this process expects
699 // to receive, and how many packets of data we expect to receive
700 // from each process.)
701 //
702 // toProcsFromMe[i] is the number of messages sent by this
703 // process to process i. Compute the sum (elementwise) of all
704 // the toProcsFromMe arrays on all processes in the
705 // communicator. If the array x is that sum, then if this
706 // process has rank j, x[j] is the number of messages sent
707 // to process j, that is, the number of receives on process j
708 // (including any messages sent by process j to itself).
709 //
710 // Yes, this requires storing and operating on an array of
711 // length P, where P is the number of processes in the
712 // communicator. Epetra does this too. Avoiding this O(P)
713 // memory bottleneck would require some research.
714 //
715 // mfh 09 Jan 2012, 15 Jul 2015: There are three ways to
716 // implement this O(P) memory algorithm.
717 //
718 // 1. Use MPI_Reduce and MPI_Scatter: reduce on the root
719 // process (0) from toProcsFromMe, to numRecvsOnEachProc.
720 // Then, scatter the latter, so that each process p gets
721 // numRecvsOnEachProc[p].
722 //
723 // 2. Like #1, but use MPI_Reduce_scatter instead of
724 // MPI_Reduce and MPI_Scatter. MPI_Reduce_scatter might be
725 // optimized to reduce the number of messages, but
726 // MPI_Reduce_scatter is more general than we need (it
727 // allows the equivalent of MPI_Scatterv). See Bug 6336.
728 //
729 // 3. Do an all-reduce on toProcsFromMe, and let my process
730 // (with rank myRank) get numReceives_ from
731 // toProcsFromMe[myRank]. The HPCCG miniapp uses the
732 // all-reduce method.
733 //
734 // Approaches 1 and 3 have the same critical path length.
735 // However, #3 moves more data. This is because the final
736 // result is just one integer, but #3 moves a whole array of
737 // results to all the processes. This is why we use Approach 1
738 // here.
739 //
740 // mfh 12 Apr 2013: See discussion in createFromSends() about
741 // how we could use this communication to propagate an error
742 // flag for "free" in a release build.
743
744 const int root = 0; // rank of root process of the reduction
745 Array<int> numRecvsOnEachProc; // temp; only needed on root
746 if (myRank == root) {
747 numRecvsOnEachProc.resize (numProcs);
748 }
749 int numReceivesAsInt = 0; // output
750 reduce<int, int> (toProcsFromMe.getRawPtr (),
751 numRecvsOnEachProc.getRawPtr (),
752 numProcs, REDUCE_SUM, root, *comm_);
753 scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
754 &numReceivesAsInt, 1, root, *comm_);
755 numReceives_ = static_cast<size_t> (numReceivesAsInt);
756 }
757
758 // Now we know numReceives_, which is this process' number of
759 // receives. Allocate the lengthsFrom_ and procsFrom_ arrays
760 // with this number of entries.
761 lengthsFrom_.assign (numReceives_, 0);
762 procsFrom_.assign (numReceives_, 0);
763
764 //
765 // Ask (via nonblocking receive) each process from which we are
766 // receiving how many packets we should expect from it in the
767 // communication pattern.
768 //
769
770 // At this point, numReceives_ includes any self message that
771 // there may be. At the end of this routine, we'll subtract off
772 // the self message (if there is one) from numReceives_. In this
773 // routine, we don't need to receive a message from ourselves in
774 // order to figure out our lengthsFrom_ and source process ID; we
775 // can just ask ourselves directly. Thus, the actual number of
776 // nonblocking receives we post here does not include the self
777 // message.
778 const size_t actualNumReceives = numReceives_ - (sendMessageToSelf_ ? 1 : 0);
779
780 // Teuchos' wrapper for nonblocking receives requires receive
781 // buffers that it knows won't go away. This is why we use RCPs,
782 // one RCP per nonblocking receive request. They get allocated in
783 // the loop below.
784 Array<RCP<CommRequest<int> > > requests (actualNumReceives);
785 Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
786 Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
787
788 // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
789 // (receive data from any process).
790#ifdef HAVE_MPI
791 const int anySourceProc = MPI_ANY_SOURCE;
792#else
793 const int anySourceProc = -1;
794#endif
795
796 // Post the (nonblocking) receives.
797 for (size_t i = 0; i < actualNumReceives; ++i) {
798 // Once the receive completes, we can ask the corresponding
799 // CommStatus object (output by wait()) for the sending process'
800 // ID (which we'll assign to procsFrom_[i] -- don't forget to
801 // do that!).
802 lengthsFromBuffers[i].resize (1);
803 lengthsFromBuffers[i][0] = as<size_t> (0);
804 requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
805 mpiTag, *comm_);
806 }
807
808 // Post the sends: Tell each process to which we are sending how
809 // many packets it should expect from us in the communication
810 // pattern. We could use nonblocking sends here, as long as we do
811 // a waitAll() on all the sends and receives at once.
812 //
813 // We assume that numSendsToOtherProcs_ and sendMessageToSelf_ have already been
814 // set. The value of numSendsToOtherProcs_ (my process' number of sends) does
815 // not include any message that it might send to itself.
816 for (size_t i = 0; i < numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0); ++i) {
817 if (procIdsToSendTo_[i] != myRank) {
818 // Send a message to procIdsToSendTo_[i], telling that process that
819 // this communication pattern will send that process
820 // lengthsTo_[i] blocks of packets.
821 const size_t* const lengthsTo_i = &lengthsTo_[i];
822 send<int, size_t> (lengthsTo_i, 1, as<int> (procIdsToSendTo_[i]), mpiTag, *comm_);
823 }
824 else {
825 // We don't need a send in the self-message case. If this
826 // process will send a message to itself in the communication
827 // pattern, then the last element of lengthsFrom_ and
828 // procsFrom_ corresponds to the self-message. Of course
829 // this process knows how long the message is, and the process
830 // ID is its own process ID.
831 lengthsFrom_[numReceives_-1] = lengthsTo_[i];
832 procsFrom_[numReceives_-1] = myRank;
833 }
834 }
835
836 //
837 // Wait on all the receives. When they arrive, check the status
838 // output of wait() for the receiving process ID, unpack the
839 // request buffers into lengthsFrom_, and set procsFrom_ from the
840 // status.
841 //
842 waitAll (*comm_, requests (), statuses ());
843 for (size_t i = 0; i < actualNumReceives; ++i) {
844 lengthsFrom_[i] = *lengthsFromBuffers[i];
845 procsFrom_[i] = statuses[i]->getSourceRank ();
846 }
847
848 // Sort the procsFrom_ array, and apply the same permutation to
849 // lengthsFrom_. This ensures that procsFrom_[i] and
850 // lengthsFrom_[i] refer to the same thing.
851 sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
852
853 // Compute indicesFrom_
854 totalReceiveLength_ =
855 std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), 0);
856 indicesFrom_.clear ();
857
858 startsFrom_.clear ();
859 startsFrom_.reserve (numReceives_);
860 for (size_t i = 0, j = 0; i < numReceives_; ++i) {
861 startsFrom_.push_back(j);
862 j += lengthsFrom_[i];
863 }
864
865 if (sendMessageToSelf_) {
866 --numReceives_;
867 }
868}
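
The reduce-and-scatter arithmetic described in the long comment above can be checked by hand. The following serial sketch (no MPI; the three-rank send pattern is made up) computes the same column sums that approach #1 obtains with Teuchos::reduce followed by Teuchos::scatter.

#include <cstddef>

// Made-up pattern: rank 0 sends to {1, 2}, rank 1 sends to {0, 1} (a self
// message), rank 2 sends to {1}.  Summing the per-rank toProcsFromMe
// arrays elementwise gives, in entry p, the number of receives rank p
// must post (self messages included); scattering that sum hands each
// rank its own entry.
std::size_t exampleNumReceives (const int myRank)
{
  const int toProcsFromMe[3][3] = {
    {0, 1, 1},   // rank 0
    {1, 1, 0},   // rank 1
    {0, 1, 0}    // rank 2
  };
  int numReceives = 0;
  for (int sender = 0; sender < 3; ++sender) {
    numReceives += toProcsFromMe[sender][myRank];   // column sum
  }
  return static_cast<std::size_t> (numReceives);    // {1, 3, 1} for ranks {0, 1, 2}
}
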
869
870void DistributorPlan::setParameterList(const Teuchos::RCP<Teuchos::ParameterList>& plist)
871{
872 using Teuchos::FancyOStream;
873 using Teuchos::getIntegralValue;
874 using Teuchos::ParameterList;
875 using Teuchos::parameterList;
876 using Teuchos::RCP;
877 using std::endl;
878
879 if (! plist.is_null()) {
880 RCP<const ParameterList> validParams = getValidParameters ();
881 plist->validateParametersAndSetDefaults (*validParams);
882
883 const Details::EDistributorSendType sendType =
884 getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
885
886 // Now that we've validated the input list, save the results.
887 sendType_ = sendType;
888
889 // ParameterListAcceptor semantics require pointer identity of the
890 // sublist passed to setParameterList(), so we save the pointer.
891 this->setMyParamList (plist);
892 }
893}
894
895Teuchos::Array<std::string> distributorSendTypes()
896{
897 Teuchos::Array<std::string> sendTypes;
898 sendTypes.push_back ("Isend");
899 sendTypes.push_back ("Send");
900 sendTypes.push_back ("Alltoall");
901 return sendTypes;
902}
903
904Teuchos::RCP<const Teuchos::ParameterList>
905DistributorPlan::getValidParameters() const
906{
907 using Teuchos::Array;
908 using Teuchos::ParameterList;
909 using Teuchos::parameterList;
910 using Teuchos::RCP;
911 using Teuchos::setStringToIntegralParameter;
912
913 Array<std::string> sendTypes = distributorSendTypes ();
914 const std::string defaultSendType ("Send");
915 Array<Details::EDistributorSendType> sendTypeEnums;
916 sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
917 sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
918 sendTypeEnums.push_back (Details::DISTRIBUTOR_ALLTOALL);
919
920 RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
921
922 setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
923 defaultSendType, "When using MPI, the variant of send to use in "
924 "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
925 plist->set ("Timer Label","","Label for Time Monitor output");
926
927 return Teuchos::rcp_const_cast<const ParameterList> (plist);
928}
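
The parameters defined above can be set from a caller roughly as follows. This is a minimal sketch; the helper name is an assumption, and only the "Send type" key is set.

#include "Tpetra_Details_DistributorPlan.hpp"
#include "Teuchos_ParameterList.hpp"

// Hypothetical helper, for illustration only.
void exampleSetSendType (Tpetra::Details::DistributorPlan& plan)
{
  // "Send type" must be one of the strings returned by
  // distributorSendTypes(): "Isend", "Send", or "Alltoall".
  Teuchos::RCP<Teuchos::ParameterList> plist =
    Teuchos::parameterList ("Tpetra::Distributor");
  plist->set ("Send type", "Isend");
  plan.setParameterList (plist);   // validates the list and stores the send type
}
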
929
930}
931}
Tpetra_Details_Behavior.hpp: Declaration of Tpetra::Details::Behavior, a class that describes Tpetra's behavior.
Tpetra_Util.hpp: Stand-alone utility functions and macros.
#define TPETRA_EFFICIENCY_WARNING(throw_exception_test, msg): Print or throw an efficiency warning.
#define SHARED_TEST_FOR_EXCEPTION(throw_exception_test, Exception, msg, comm): Test for exception, with reduction over the given communicator.
static bool debug(): Whether Tpetra is in debug mode.
Tpetra::Details: Implementation details of Tpetra.
std::string DistributorSendTypeEnumToString(EDistributorSendType sendType): Convert an EDistributorSendType enum value to a string.
EDistributorSendType: The type of MPI send that Distributor should use.
EDistributorHowInitialized: Enum indicating how and whether a Distributor was initialized.
std::string DistributorHowInitializedEnumToString(EDistributorHowInitialized how): Convert an EDistributorHowInitialized enum value to a string.
Tpetra: Namespace Tpetra contains the class and methods constituting the Tpetra library.
Teuchos::Array< std::string > distributorSendTypes(): Valid values for Distributor's "Send type" parameter.
void sort2(const IT1 &first1, const IT1 &last1, const IT2 &first2): Sort the first array, and apply the resulting permutation to the second array.