Zoltan2
Zoltan2_Directory_Comm.cpp
1/*
2 * @HEADER
3 *
4 * ***********************************************************************
5 *
6 * Zoltan2 Directory for Load-balancing, Partitioning, Ordering and Coloring
7 * Copyright 2012 Sandia Corporation
8 *
9 * Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
10 * the U.S. Government retains certain rights in this software.
11 *
12 * Redistribution and use in source and binary forms, with or without
13 * modification, are permitted provided that the following conditions are
14 * met:
15 *
16 * 1. Redistributions of source code must retain the above copyright
17 * notice, this list of conditions and the following disclaimer.
18 *
19 * 2. Redistributions in binary form must reproduce the above copyright
20 * notice, this list of conditions and the following disclaimer in the
21 * documentation and/or other materials provided with the distribution.
22 *
23 * 3. Neither the name of the Corporation nor the names of the
24 * contributors may be used to endorse or promote products derived from
25 * this software without specific prior written permission.
26 *
27 * THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
28 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
29 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
30 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
31 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
32 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
33 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
34 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
35 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
36 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
37 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 *
39 * Questions? Contact Karen Devine kddevin@sandia.gov
40 * Erik Boman egboman@sandia.gov
41 *
42 * ***********************************************************************
43 *
44 * @HEADER
45 */
46
48#include <stdexcept>
49#include <memory>
50
51namespace Zoltan2 {
52
53void Zoltan2_Directory_Plan::getInvertedValues(Zoltan2_Directory_Plan *from) {
54 total_recv_size = 0;
55 for (int i = 0; i < from->nsends + from->self_msg; i++) {
56 total_recv_size += from->lengths_to[i];
57 }
58
59 max_send_size = 0;
60 for (int i = 0; i < from->nrecvs; i++) {
61 if (from->lengths_from[i] > max_send_size) {
62 max_send_size = from->lengths_from[i];
63 }
64 }
65
66 nvals = from->nvals_recv;
67 nvals_recv = from->nvals;
68 lengths_to = from->lengths_from;
69 procs_to = from->procs_from;
70 indices_to = from->indices_from;
71
72 starts_to = from->starts_from;
73 lengths_from = from->lengths_to;
74 procs_from = from->procs_to;
75 indices_from = from->indices_to;
76
77 starts_from = from->starts_to;
78 nrecvs = from->nsends;
79 nsends = from->nrecvs;
80 self_msg = from->self_msg;
81 comm = from->comm;
82}
83
84
85void Zoltan2_Directory_Plan::print(const std::string& headerMessage) const {
86
87 #define PRINT_VECTOR(v) \
88 if(v != Teuchos::null) { \
89 std::cout << " " << #v << " "; \
90 for(Teuchos::ArrayRCP<int>::size_type n = 0; n < v.size(); ++n) { \
91 std::cout << v[n] << " "; \
92 } \
93 std::cout << std::endl; \
94 }
95
96 #define PRINT_VAL(val) std::cout << " " << #val << ": " << val << std::endl;
97
98 for(int proc = 0; proc < comm->getSize(); ++proc) {
99 comm->barrier();
100 if(proc == comm->getRank()) {
101
102 std::cout << "Rank " << proc << " " << headerMessage << std::endl;
103
119
128 }
129 }
130 comm->barrier();
131}
132
133Zoltan2_Directory_Comm::Zoltan2_Directory_Comm(
134 int nvals, /* number of values I currently own */
135 const Teuchos::ArrayRCP<int> &assign, /* processor assignment for all my values */
136 Teuchos::RCP<const Teuchos::Comm<int> > comm, /* communicator */
137 int tag) : /* message tag I can use */
138 comm_(comm),
139 plan_forward(NULL)
140{
141 if (comm == Teuchos::null){
142 throw std::logic_error("Invalid communicator: MPI_COMM_NULL.");
143 }
144
145 int my_proc = comm->getRank(); /* my processor tag in communicator */
146 int nprocs = comm->getSize(); /* number of processors in communicator */
147
148 /* First check to see if items are grouped by processor with no gaps. */
149 /* If so, indices_to should be NULL (= identity) */
150
151 /* Make data structures that will allow me to traverse arrays quickly. */
152 /* Begin by determining number of objects I'm sending to each processor. */
153 Teuchos::ArrayRCP<int> starts(new int[nprocs + 1], 0, nprocs + 1, true);
154 for(int n = 0; n < starts.size(); ++n) {
155 starts[n] = 0;
156 }
157
158 /* Note: Negative assign value means ignore item. */
159 /* Non-trailing negatives mean data not packed so need send_buf. */
160 /* Could (but don't) allow negatives between processor blocks w/o buf. */
161 int nactive = 0; /* number of values to remap */
162
163 int no_send_buff = 1; /* is data nicely grouped by processor? */
164
165 int prev_proc = nprocs; /* processor on previous loop pass */
166
167 for (int i = 0; i < nvals; i++) {
168 int proc = assign[i];
169 if (no_send_buff && proc != prev_proc) { /* Checks if blocked by proc */
170 if (proc >= 0 && (starts[proc] || prev_proc < 0)) {
171 no_send_buff = 0;
172 }
173 else {
174 prev_proc = proc;
175 }
176 }
177 if (proc >= 0) {
178 ++starts[proc];
179 ++nactive;
180 }
181 }
182
183 int self_msg = (starts[my_proc] != 0); /* do I have data for myself? */
184
185 Teuchos::ArrayRCP<int> lengths_to; /* lengths I'll send to */
186 Teuchos::ArrayRCP<int> procs_to; /* processors I'll send to */
187 Teuchos::ArrayRCP<int> starts_to; /* where in list my sends begin */
188 Teuchos::ArrayRCP<int> indices_to; /* local_id values I'll be sending */
189
190 int max_send_size = 0; /* size of longest message I send */
191 int nsends = 0; /* # procs I'll send to (including self) */
192 int nrecvs = 0; /* # procs I'll recv from (including self) */
193
194 if (no_send_buff) {
195 /* Grouped by processor. Array indices_to can be NULL (= identity) */
196 nsends = 0;
197 for (int i = 0; i < nprocs; i++) {
198 if (starts[i] != 0) ++nsends;
199 }
200
201 lengths_to.resize(nsends);
202 starts_to.resize(nsends);
203 procs_to.resize(nsends);
204
205 int index = 0; /* index into list of objects */
206 /* Note that procs_to is in the order the data was passed in. */
207 for (int i = 0; i < nsends; i++) {
208 starts_to[i] = index;
209 int proc = assign[index];
210 procs_to[i] = proc;
211 index += starts[proc];
212 }
213
214 /* Now sort the outgoing procs. */
215 /* This keeps recvs deterministic if I ever invert communication */
216 /* It also allows for better balance of traffic in comm_do */
217 sort_ints(procs_to, starts_to);
218
219 max_send_size = 0;
220 for (int i = 0; i < nsends; i++) {
221 int proc = procs_to[i];
222 lengths_to[i] = starts[proc];
223 if (proc != my_proc && lengths_to[i] > max_send_size) {
224 max_send_size = lengths_to[i];
225 }
226 }
227 }
228 else { /* Not grouped by processor. More complex data structures. */
229 /* Sum starts values to be offsets into indices_to array. */
230 nsends = (starts[0] != 0);
231 for (int i = 1; i < nprocs; i++) {
232 if (starts[i] != 0)
233 ++nsends;
234 starts[i] += starts[i - 1];
235 }
236
237 for (int i = nprocs - 1; i; i--) {
238 starts[i] = starts[i - 1];
239 }
240
241 starts[0] = 0;
242
243 indices_to = (nactive > 0) ?
244 Teuchos::arcp(new int[nactive], 0, nactive, true) : Teuchos::null;
245
246 for (int i = 0; i < nvals; i++) {
247 int proc = assign[i];
248 if (proc >= 0) {
249 indices_to[starts[proc]] = i;
250 ++starts[proc];
251 }
252 }
253
254 /* Indices_to array now has the data in clumps for each processor. */
255 /* Now reconstruct starts array to index into indices_to. */
256 for (int i = nprocs - 1; i; i--) {
257 starts[i] = starts[i - 1];
258 }
259 starts[0] = 0;
260 starts[nprocs] = nactive;
261
262 /* Construct lengths_to, starts_to and procs_to arrays. */
263 /* Note: If indices_to is needed, procs are in increasing order */
264 lengths_to.resize(nsends);
265 starts_to.resize(nsends);
266 procs_to.resize(nsends);
267
268 int j = 0;
269 max_send_size = 0;
270 for (int i = 0; i < nprocs; i++) {
271 if (starts[i + 1] != starts[i]) {
272 starts_to[j] = starts[i];
273 lengths_to[j] = starts[i + 1] - starts[i];
274 if (i != my_proc && lengths_to[j] > max_send_size) {
275 max_send_size = lengths_to[j];
276 }
277 procs_to[j] = i;
278 j++;
279 }
280 }
281 }
282
283 /* Now change nsends to count only non-self messages */
284 nsends -= self_msg;
285
286 /* Determine how many messages & what length I'll receive. */
287 Teuchos::ArrayRCP<int> lengths_from; /* lengths of messages I'll receive */
288 Teuchos::ArrayRCP<int> procs_from; /* processors I'll receive from */
289 int out_of_mem = 0; // TODO refactor this bit
290
291 int comm_flag = invert_map(lengths_to, procs_to, nsends, self_msg,
292 lengths_from, procs_from, &nrecvs, my_proc, nprocs,
293 out_of_mem, tag, comm);
294
295 /* pointers for where to put recv data */
296 Teuchos::ArrayRCP<int> starts_from(
297 new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
298 int j = 0;
299 for (int i = 0; i < nrecvs + self_msg; i++) {
300 starts_from[i] = j;
301 j += lengths_from[i];
302 }
303
304 if (comm_flag != 0) {
305 throw std::logic_error("Failed to construct Zoltan2_Directory_Comm");
306 }
307
308 int total_recv_size = 0; /* total size of messages I recv */
309 for (int i = 0; i < nrecvs + self_msg; i++) {
310 total_recv_size += lengths_from[i];
311 }
312
313 plan_forward = new Zoltan2_Directory_Plan;
314 plan_forward->lengths_to = lengths_to;
315 plan_forward->starts_to = starts_to;
316 plan_forward->procs_to = procs_to;
317 plan_forward->indices_to = indices_to;
318 plan_forward->lengths_from = lengths_from;
319 plan_forward->starts_from = starts_from;
320 plan_forward->procs_from = procs_from;
321 plan_forward->nvals = nvals;
322 plan_forward->nvals_recv = total_recv_size;
323 plan_forward->nrecvs = nrecvs;
324 plan_forward->nsends = nsends;
325 plan_forward->self_msg = self_msg;
326 plan_forward->max_send_size = max_send_size;
327 plan_forward->total_recv_size = total_recv_size;
328 plan_forward->maxed_recvs = 0;
329 plan_forward->comm = comm;
330
331 if (MPI_RECV_LIMIT > 0) {
332
333 throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (1)"); // needs unit testing
334
335 /* If we have a limit to the number of posted receives we are allowed,
336 ** and our plan has exceeded that, then switch to an MPI_Alltoallv so
337 ** that we will have fewer receives posted when we do the communication.
338 */
339 int global_nrecvs;
340 Teuchos::reduceAll(*comm, Teuchos::REDUCE_MAX, 1, &nrecvs, &global_nrecvs);
341 if (global_nrecvs > MPI_RECV_LIMIT){
342 plan_forward->maxed_recvs = 1;
343 }
344 }
345
346 if (plan_forward->maxed_recvs == 0) {
347 // See notes in header for MPI_Request
348 plan_forward->request.resize(plan_forward->nrecvs);
349 }
350
351 nrec = total_recv_size;
352}
353
354Zoltan2_Directory_Comm::~Zoltan2_Directory_Comm()
355{
356 delete plan_forward;
357}
358
359int Zoltan2_Directory_Comm::invert_map(
360 const Teuchos::ArrayRCP<int> &lengths_to, /* number of items I'm sending */
361 const Teuchos::ArrayRCP<int> &procs_to, /* procs I send to */
362 int nsends, /* number of messages I'll send */
363 int self_msg, /* do I copy data to myself? */
364 Teuchos::ArrayRCP<int> &lengths_from, /* number of items I'm receiving */
365 Teuchos::ArrayRCP<int> &procs_from, /* procs I recv lengths from */
366 int *pnrecvs, /* number of messages I receive */
367 int my_proc, /* my processor number */
368 int nprocs, /* total number of processors */
369 int /* out_of_mem */, /* tell everyone I'm out of memory? */
370 int tag, /* message tag I can use */
371 Teuchos::RCP<const Teuchos::Comm<int> > comm) /* communicator */
372{
373 Teuchos::ArrayRCP<int> msg_count(new int[nprocs], 0, nprocs, true);
374 Teuchos::ArrayRCP<int> counts(new int[nprocs], 0, nprocs, true);
375 for(int i = 0; i < nprocs; ++i) {
376 msg_count[i] = 0;
377 counts[i] = 1;
378 }
379
380 for (int i = 0; i < nsends + self_msg; i++) {
381 msg_count[procs_to[i]] = 1;
382 }
383
384 /*
385 * KDDKDD: Replaced MPI_Reduce_scatter with MPI_Reduce and MPI_Scatter
386 * KDDKDD: to avoid reported problems with MPICH 1.5.2.1.
387 * KDDKDD: Some sort of MPI_TYPE_INDEXED error.
388 * KDDKDD: Bug fix suggested by Clark Dohrmann and Rob Hoekstra.
389 * KDDKDD: July 20, 2004
390
391 MPI_Reduce_scatter((void *) msg_count, (void *) &nrecvs, counts, MPI_INT,
392 MPI_SUM, comm);
393 */
394 Teuchos::reduceAll<int>(*comm, Teuchos::REDUCE_SUM, nprocs,
395 msg_count.getRawPtr(), counts.getRawPtr());
396
397 int nrecvs = 0; /* number of messages I'll receive */
398
399 Teuchos::scatter<int, int>(&(counts[0]), 1, &nrecvs, 1, 0, *comm);
400
401 int max_nrecvs = 0;
402 if (my_proc == 0) {
403 for (int i=0; i < nprocs; i++) {
404 if (counts[i] > max_nrecvs) {
405 max_nrecvs = counts[i];
406 }
407 }
408 }
409
410 Teuchos::broadcast(*comm, 0, &max_nrecvs);
411
412 if(nrecvs > 0) {
413 lengths_from.resize(nrecvs); /* number of items I'm receiving */
414 procs_from.resize(nrecvs); /* processors I'll receive from */
415 }
416
417 if (MPI_RECV_LIMIT == 0 || max_nrecvs <= MPI_RECV_LIMIT) {
418 // See notes in header for MPI_Request
419 Teuchos::ArrayRCP<Teuchos::RCP<Teuchos::CommRequest<int> > > requests(nrecvs);
420
421 /* Note: I'm counting on having a unique tag or some of my incoming */
422 /* messages might get confused with others. */
423 for (int i=0; i < nrecvs; i++) {
424#ifdef HAVE_MPI // Teuchos::ireceive not implemented for Serial - Serial is just for debugging
425 Teuchos::ArrayRCP<int> single_elem(&lengths_from[i], 0, 1, false);
426 requests[i] = Teuchos::ireceive(single_elem, MPI_ANY_SOURCE, tag, *comm);
427#endif
428 }
429
430 for (int i=0; i < nsends+self_msg; i++) {
431#ifdef HAVE_MPI // Teuchos::send not implemented for Serial - Serial is just for debugging
432 Teuchos::send(&lengths_to[i], 1, procs_to[i], tag, *comm);
433#endif
434 }
435
436 for (int i=0; i < nrecvs; i++) {
437#ifdef HAVE_MPI
438 procs_from[i] = requests[i]->wait()->getSourceRank();
439#else
440 // above Teuchos MPI calls not supported for Serial so manually do the transfer.
441 // We don't really need Serial for this class but it helps with debugging to have a serial test that can run.
442 lengths_from[i] = lengths_to[i];
443#endif
444 }
445
446 }
447 else { /* some large HPC machines have a limit on number of posted receives */
448 Teuchos::ArrayRCP<int> sendbuf(new int[nprocs], 0, nprocs, true);
449 Teuchos::ArrayRCP<int> recvbuf(new int[nprocs], 0, nprocs, true);
450
451 for (int i=0; i < nsends + self_msg; i++) {
452 sendbuf[procs_to[i]] = lengths_to[i];
453 }
454
455 throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (2)"); // needs unit testing
456 // Did not refactor this - need Teuchos form but this is not tested code.
457 // MPI_Alltoall(&(sendbuf[0]), 1, MPI_INT, &(recvbuf[0]), 1, MPI_INT, getRawComm());
458
459 for (int i=0, j=0; i < nprocs; i++) {
460 if (recvbuf[i] > 0){
461 lengths_from[j] = recvbuf[i];
462 procs_from[j] = i;
463 if (++j == nrecvs) {
464 break;
465 }
466 }
467 }
468 }
469
470 /* Sort recv lists to keep execution deterministic (e.g. for debugging) */
471 sort_ints(procs_from, lengths_from);
472
473 *pnrecvs = nrecvs - self_msg; /* Only return number of true messages */
474
475 return 0;
476}
477
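// A minimal sketch, in isolation, of the counting pattern invert_map() uses above:
// every rank marks the ranks it sends to with a 0/1 flag, an element-wise
// REDUCE_SUM over those flags gives the number of senders targeting each rank,
// and a scatter from rank 0 hands each rank its own total (mirroring the
// reduceAll + scatter replacement for MPI_Reduce_scatter). The helper name and
// std::vector usage are illustrative only; <vector> and Teuchos_CommHelpers.hpp
// are assumed to be available.
static int count_my_receivers(
  const std::vector<int> &sends_to,               /* ranks I will send a message to */
  Teuchos::RCP<const Teuchos::Comm<int> > comm)
{
  const int nprocs = comm->getSize();
  std::vector<int> flags(nprocs, 0);              /* 1 if I send to that rank */
  std::vector<int> totals(nprocs, 0);             /* senders per rank, after reduce */
  for (size_t i = 0; i < sends_to.size(); ++i) {
    flags[sends_to[i]] = 1;
  }
  Teuchos::reduceAll<int>(*comm, Teuchos::REDUCE_SUM, nprocs,
    flags.data(), totals.data());
  int nrecvs = 0;                                 /* messages this rank will receive */
  Teuchos::scatter<int, int>(totals.data(), 1, &nrecvs, 1, 0, *comm);
  return nrecvs;
}
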
478int Zoltan2_Directory_Comm::sort_ints(
479 Teuchos::ArrayRCP<int> &vals_sort, /* values to be sorted */
480 Teuchos::ArrayRCP<int> &vals_other) /* other array to be reordered w/ sort */
481{
482 // TODO: Check - perhaps we can skip all of these for efficiency
483 if (vals_sort == Teuchos::null || vals_sort.size() == 0) {
484 return 1;
485 }
486 if (vals_other == Teuchos::null || vals_other.size() == 0) {
487 return 1;
488 }
489 if (vals_sort == Teuchos::null || vals_sort.size() == 1) {
490 return 0; /* fastest way to sort 1 item is to return */
491 }
492
493 /* find largest value (sort sometimes used for non processor lists) */
494 int already_sorted = 1; /* flag indicating whether vals_sort is
495 already sorted; can exit early and skip
496 memory allocations if it is. */
497 int top = vals_sort[0]; /* largest integer to sort, smallest is assumed 0 */
498 for (Teuchos::ArrayRCP<int>::size_type i = 1; i < vals_sort.size(); i++) {
499 if (vals_sort[i-1] > vals_sort[i]) {
500 already_sorted = 0;
501 }
502 if (top < vals_sort[i]) {
503 top = vals_sort[i];
504 }
505 }
506
507 if (already_sorted) {
508 return 0;
509 }
510
511 Teuchos::ArrayRCP<int> store(new int[top+2], 0, top+2, true);
512 for(int n = 0; n < store.size(); ++n) {
513 store[n] = 0;
514 }
515
516 Teuchos::ArrayRCP<int> copy_sort(new int[vals_sort.size()], 0, vals_sort.size(), true);
517 for(Teuchos::ArrayRCP<int>::size_type n = 0; n < copy_sort.size(); ++n) {
518 copy_sort[n] = vals_sort[n]; // TODO - use deepCopy method?
519 }
520
521 Teuchos::ArrayRCP<int> copy_other(new int[vals_other.size()], 0, vals_other.size(), true);
522 for(Teuchos::ArrayRCP<int>::size_type n = 0; n < copy_other.size(); ++n) {
523 copy_other[n] = vals_other[n]; // TODO - use deepCopy method?
524 }
525
526 // TODO: May want to modernize this ptr handling - however I didn't want
527 // to introduce inefficiencies so for now have kept the original structure
528 int *p = &(store[1]);
529 for (Teuchos::ArrayRCP<int>::size_type i = 0; i < vals_sort.size(); i++) {
530 p[copy_sort[i]]++; /* count number of occurrences */
531 }
532
533 for (int i = 1; i < top+1; i++) {
534 p[i] += p[i-1]; /* compute partial sums */
535 }
536 /* assert: p[top] = nvals */
537 p = &(store[0]); /* effectively shifts down by one */
538 for (Teuchos::ArrayRCP<int>::size_type i = 0; i < vals_sort.size(); i++) {
539 vals_sort[p[copy_sort[i]]] = copy_sort[i];
540 vals_other[p[copy_sort[i]]] = copy_other[i];
541 ++p[copy_sort[i]];
542 }
543
544 return 0;
545}
546
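// A minimal standalone sketch of the paired counting ("bucket") sort that
// sort_ints() above implements: sort small non-negative keys and carry a
// companion array along in the same order. For example, keys {2,0,1} with
// companions {20,0,10} become {0,1,2} and {0,10,20}. The helper name and
// std::vector usage are illustrative only; <vector> is assumed.
static void counting_sort_pairs(std::vector<int> &keys, std::vector<int> &other)
{
  if (keys.size() < 2) return;
  int top = 0;                                    /* largest key, smallest assumed 0 */
  for (size_t i = 0; i < keys.size(); ++i) {
    if (keys[i] > top) top = keys[i];
  }
  std::vector<int> count(top + 2, 0);             /* count[k+1] = occurrences of key k */
  for (size_t i = 0; i < keys.size(); ++i) {
    ++count[keys[i] + 1];
  }
  for (int k = 1; k <= top + 1; ++k) {
    count[k] += count[k - 1];                     /* prefix sums -> first slot per key */
  }
  std::vector<int> k_copy(keys), o_copy(other);
  for (size_t i = 0; i < keys.size(); ++i) {
    int pos = count[k_copy[i]]++;                 /* stable placement */
    keys[pos] = k_copy[i];
    other[pos] = o_copy[i];
  }
}
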
547int Zoltan2_Directory_Comm::do_forward(
548 int tag, /* message tag for communicating */
549 const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
550 int nbytes, /* msg size */
551 Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
552{
553 int status = 0;
554
555 if (!plan_forward->maxed_recvs) {
556 status = do_post (plan_forward, tag, send_data, nbytes, recv_data);
557 if (status == 0) {
558 status = do_wait (plan_forward, tag, send_data, nbytes, recv_data);
559 }
560 }
561 else {
562 status = do_all_to_all(plan_forward, send_data, nbytes, recv_data);
563 }
564
565 return status;
566}
567
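// A minimal usage sketch for the forward path, assuming fixed-size records of
// nbytes chars each. The tag values, payloads, and the getNRec() accessor name
// (assumed here to return the nrec count set in the constructor) are
// illustrative and should be checked against the class header; <cstring> is
// assumed for memcpy.
static void example_forward(Teuchos::RCP<const Teuchos::Comm<int> > comm)
{
  const int nbytes = sizeof(int);                 /* each record is one int */
  const int nvals = 2;                            /* I currently own two records */
  Teuchos::ArrayRCP<int> assign(new int[nvals], 0, nvals, true);
  assign[0] = 0;                                  /* record 0 goes to rank 0 */
  assign[1] = comm->getSize() - 1;                /* record 1 goes to the last rank */

  Zoltan2_Directory_Comm directoryComm(nvals, assign, comm, 12345 /* tag */);

  Teuchos::ArrayRCP<char> send(new char[nvals * nbytes], 0, nvals * nbytes, true);
  int payload0 = 10, payload1 = 20;
  memcpy(&send[0], &payload0, nbytes);
  memcpy(&send[nbytes], &payload1, nbytes);

  const int nrecords = directoryComm.getNRec();   /* records arriving at this rank */
  Teuchos::ArrayRCP<char> recv;
  if (nrecords > 0) {                             /* avoid new char[0], as noted below */
    recv = Teuchos::arcp(new char[nrecords * nbytes], 0, nrecords * nbytes, true);
  }
  directoryComm.do_forward(12346 /* another tag */, send, nbytes, recv);
}
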
568int Zoltan2_Directory_Comm::do_post(
569 Zoltan2_Directory_Plan *plan, /* communication data structure */
570 int tag, /* message tag for communicating */
571 const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
572 int nbytes, /* msg size */
573 Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
574{
575 /* Check input parameters */
576 if (!plan) {
577 throw std::logic_error("Communication plan = NULL");
578 }
579
580 /* If not point to point, currently we do synchronous communications */
581 if (plan->maxed_recvs) {
582 throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (3)"); // needs unit testing
583 return do_all_to_all(plan, send_data, nbytes, recv_data);
584 }
585
586 int my_proc = plan->comm->getRank(); /* processor ID */
587 if ((plan->nsends + plan->self_msg) && !send_data.size()) {
588 throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (4)"); // needs unit testing
589 size_t sum = 0;
590 if (plan->sizes_to.size()) { /* Not an error if all sizes_to == 0 */
591 for (int i = 0; i < (plan->nsends + plan->self_msg); i++) {
592 sum += plan->sizes_to[i];
593 }
594 }
595 if (!plan->sizes_to.size() || (plan->sizes_to.size() && sum)) {
596 throw std::logic_error("nsends not zero, but send_data = NULL");
597 }
598 }
599 if ((plan->nrecvs + plan->self_msg) && recv_data == Teuchos::null) {
600 throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (5)"); // needs unit testing
601 size_t sum = 0;
602 if (plan->sizes_from != Teuchos::null) /* Not an error if all sizes_from == 0 */
603 for (int i = 0; i < (plan->nrecvs + plan->self_msg); i++)
604 sum += plan->sizes_from[i];
605 if (!plan->sizes_from.size() || (plan->sizes_from.size() && sum)) {
606 throw std::logic_error("nrecvs not zero, but recv_data = NULL");
607 }
608 }
609
610 /* Post irecvs */
611 if (plan->indices_from == Teuchos::null) {
612 /* Data can go directly into user space. */
613 plan->recv_buff = recv_data;
614 }
615 else { /* Need to buffer receive to reorder */
616 size_t rsize = (size_t) (plan->total_recv_size) * (size_t) nbytes;
617 plan->recv_buff = Teuchos::arcp(new char[rsize], 0, rsize, true);
618 }
619
620 size_t self_recv_address = 0; /* where in recv_data self info starts */
621 if (!plan->using_sizes) { /* All data the same size */
622 int k = 0;
623 for (int i = 0; i < plan->nrecvs + plan->self_msg; i++) {
624 if (plan->procs_from[i] != my_proc) {
625 Teuchos::ArrayRCP<char> subview(
626 &plan->getRecvBuff().getRawPtr()[plan->starts_from[i] * nbytes],
627 0, plan->lengths_from[i] * nbytes, false);
628 plan->request[k] = Teuchos::ireceive(subview, plan->procs_from[i], tag, *comm_);
629 k++;
630 }
631 else {
632 self_recv_address = (size_t)(plan->starts_from[i]) * (size_t)nbytes;
633 }
634 }
635 }
636 else { /* Data of varying sizes */
637 int k = 0;
638 for (int i = 0; i < plan->nrecvs + plan->self_msg; i++) {
639 if (plan->procs_from[i] != my_proc) {
640 if (plan->sizes_from[i]) {
641 Teuchos::ArrayRCP<char> subview(
642 &plan->getRecvBuff().getRawPtr()[(size_t)(plan->starts_from_ptr[i])
643 * (size_t)nbytes],
644 0, plan->sizes_from[i] * nbytes, false);
645 plan->request[k] = Teuchos::ireceive(subview, plan->procs_from[i], tag, *comm_);
646 }
647 else {
648 plan->request[k] = Teuchos::null;
649 }
650 k++;
651 }
652 else {
653 self_recv_address =
654 (size_t)(plan->starts_from_ptr[i]) * (size_t)nbytes;
655 }
656 }
657 }
658
659 Teuchos::ArrayRCP<char> send_buff;
660 if(plan->indices_to != Teuchos::null) {
661 send_buff = Teuchos::arcp(new char[plan->max_send_size * nbytes], 0, plan->max_send_size * nbytes, true);
662 }
663
664 /* Barrier to ensure irecvs are posted before doing any sends. */
665 /* Simultaneously see if anyone out of memory */
666 int out_of_mem = 0;
667 // WARNING - do not delete this without a proper barrier added as replacement.
668 // I'm refactoring memory handling, so we probably won't use out_of_mem
669 // in the new version, but we must still preserve a barrier here or we get
670 // intermittent failures.
671 // I'll keep the memory reduce for now since we may end up with memory
672 // handling anyway.
673 int global_out_of_mem;
674 Teuchos::reduceAll(*plan->comm, Teuchos::REDUCE_SUM, 1, &out_of_mem,
675 &global_out_of_mem);
676
677 /* Send out data */
678
679 /* Scan through procs_to list to start w/ higher numbered procs */
680 /* This should balance message traffic. */
681
682 int nblocks = plan->nsends + plan->self_msg; /* # procs needing my data */
683 int proc_index = 0; /* loop counter over procs to send to */
684 while (proc_index < nblocks && plan->procs_to[proc_index] < my_proc) {
685 proc_index++;
686 }
687 if (proc_index == nblocks) {
688 proc_index = 0;
689 }
690
691 if (!plan->using_sizes) { /* Data all of same size */
692 if (plan->indices_to == Teuchos::null) { /* data already blocked by processor. */
693 int self_num = 0; /* where in send list my_proc appears */
694 for (int i = proc_index, j = 0; j < nblocks; j++) {
695 if (plan->procs_to[i] != my_proc) {
696 Teuchos::ArrayRCP<char> subview(
697 &send_data[plan->starts_to[i] * nbytes],
698 0, plan->lengths_to[i] * nbytes, false);
699 Teuchos::readySend(subview.getRawPtr(), static_cast<int>(subview.size()), plan->procs_to[i], tag, *comm_);
700 }
701 else {
702 self_num = i;
703 }
704 if (++i == nblocks) {
705 i = 0;
706 }
707 }
708
709 if (plan->self_msg) { /* Copy data to self. */
710 /* I use array+offset instead of &(array[offset]) because of
711 a bug with PGI v9 */
712 /* I use memmove because I'm not sure the pointers don't
713 overlap. */
714
715 // TODO: Refactor to make C++ and remove getRawPtr, etc
716 memmove(
717 plan->getRecvBuff().getRawPtr()+self_recv_address,
718 send_data.getRawPtr()+(size_t)(plan->starts_to[self_num])*(size_t)nbytes,
719 (size_t) (plan->lengths_to[self_num]) * (size_t) nbytes);
720 }
721 }
722 else { /* Not blocked by processor. Need to buffer. */
723 int self_index = 0; /* send offset for data I'm keeping */
724 int self_num = 0; /* where in send list my_proc appears */
725 for (int i = proc_index, jj = 0; jj < nblocks; jj++) {
726 if (plan->procs_to[i] != my_proc) {
727 /* Need to pack message first. */
728 size_t offset = 0; /* offset into array I'm copying into */
729 int j = plan->starts_to[i];
730 for (int k = 0; k < plan->lengths_to[i]; k++) {
731 memcpy(&send_buff[offset],
732 &send_data[(size_t)(plan->indices_to[j++]) * (size_t)nbytes], nbytes);
733 offset += nbytes;
734 }
735 Teuchos::readySend(&send_buff[0], plan->lengths_to[i] * nbytes,
736 plan->procs_to[i], tag, *comm_);
737 }
738 else {
739 self_num = i;
740 self_index = plan->starts_to[i];
741 }
742 if (++i == nblocks)
743 i = 0;
744 }
745
746 if (plan->self_msg) { /* Copy data to self. */
747 for (int k = 0; k < plan->lengths_to[self_num]; k++) {
748 memcpy(&(plan->getRecvBuff())[self_recv_address],
749 &send_data[
750 (size_t)(plan->indices_to[self_index++]) * (size_t)nbytes],
751 nbytes);
752 self_recv_address += nbytes;
753 }
754 }
755 }
756 }
757 else { /* Data of differing sizes */
758 if (plan->indices_to == Teuchos::null) { /* data already blocked by processor. */
759 int self_num = 0; /* where in send list my_proc appears */
760 for (int i = proc_index, j = 0; j < nblocks; j++) {
761 if (plan->procs_to[i] != my_proc) {
762 if (plan->sizes_to[i]) {
763 Teuchos::readySend(&send_data[plan->starts_to_ptr[i] * nbytes], plan->sizes_to[i] * nbytes,
764 plan->procs_to[i], tag, *comm_);
765 }
766 }
767 else
768 self_num = i;
769
770 if (++i == nblocks)
771 i = 0;
772 }
773 if (plan->self_msg) { /* Copy data to self. */
774 if (plan->sizes_to[self_num]) {
775 char* lrecv = &plan->getRecvBuff().getRawPtr()[self_recv_address];
776 const char* lsend =
777 &send_data.getRawPtr()[(size_t)(plan->starts_to_ptr[self_num]) * (size_t)nbytes];
778 int sindex = plan->sizes_to[self_num], idx;
779 for (idx=0; idx<nbytes; idx++) {
780 memcpy(lrecv, lsend, sindex);
781 lrecv += sindex;
782 lsend += sindex;
783 }
784 }
785 }
786 }
787 else { /* Not blocked by processor. Need to buffer. */
788 int self_num = 0; /* where in send list my_proc appears */
789 for (int i = proc_index, jj = 0; jj < nblocks; jj++) {
790 if (plan->procs_to[i] != my_proc) {
791 size_t offset = 0; /* offset into array I'm copying into */
792 int j = plan->starts_to[i];
793 for (int k = 0; k < plan->lengths_to[i]; k++) {
794 if (plan->sizes[plan->indices_to[j]]) {
795 memcpy(&send_buff[offset],
796 &send_data[(size_t)(plan->indices_to_ptr[j]) * (size_t)nbytes],
797 (size_t)(plan->sizes[plan->indices_to[j]]) * (size_t)nbytes);
798 offset +=
799 (size_t)(plan->sizes[plan->indices_to[j]]) * (size_t)nbytes;
800 }
801 j++;
802 }
803 if (plan->sizes_to[i]) {
804 Teuchos::readySend(&send_buff[0], plan->sizes_to[i] * nbytes,
805 plan->procs_to[i], tag, *comm_);
806 }
807 }
808 else
809 self_num = i;
810
811 if (++i == nblocks)
812 i = 0;
813 }
814 if (plan->self_msg) { /* Copy data to self. */
815 if (plan->sizes_to[self_num]) {
816 int j = plan->starts_to[self_num];
817 for (int k = 0; k < plan->lengths_to[self_num]; k++) {
818 int kk = plan->indices_to_ptr[j];
819 char* lrecv = &(plan->getRecvBuff())[self_recv_address];
820 size_t send_idx = (size_t)kk * (size_t)nbytes;
821 const char* lsend = &send_data[send_idx];
822 int sindex = plan->sizes[plan->indices_to[j]], idx;
823 for (idx=0; idx<nbytes; idx++) {
824 memcpy(lrecv, lsend, sindex);
825 lrecv += sindex;
826 lsend += sindex;
827 }
828 self_recv_address += (size_t)(plan->sizes[plan->indices_to[j]])
829 * (size_t) nbytes;
830 j++;
831 }
832 }
833 }
834 }
835 }
836
837 return 0;
838}
839
840int Zoltan2_Directory_Comm::do_wait(
841 Zoltan2_Directory_Plan *plan, /* communication data structure */
842 int /* tag */, /* message tag for communicating */
843 const Teuchos::ArrayRCP<char> &/* send_data */, /* array of data I currently own */
844 int nbytes, /* msg size */
845 Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
846{
847 /* If not point to point, currently we do synchronous communications */
848 if (plan->maxed_recvs){
849 /* Do nothing */
850 return 0;
851 }
852
853 int my_proc = plan->comm->getRank(); /* processor ID */
854
855 /* Wait for messages to arrive & unpack them if necessary. */
856 /* Note: since request is in plan, could wait in later routine. */
857 if (plan->indices_from == Teuchos::null) { /* No copying required */
858 if (plan->nrecvs > 0) {
859 Teuchos::waitAll(*comm_, plan->request());
860 }
861 }
862 else { /* Need to copy into recv_data. */
863 int self_num; /* where in send list my_proc appears */
864 size_t offsetDst = 0;
865 if (plan->self_msg) { /* Unpack own data before waiting */
866 for (self_num = 0; self_num < plan->nrecvs + plan->self_msg;
867 self_num++) {
868 if (plan->procs_from[self_num] == my_proc) {
869 break;
870 }
871 }
872
873 if(plan->sizes_from.size()) {
874 // NEW METHOD for variable sized data
875 // This will NOT put the data in order but during the directory read
876 // of the buffer which follows, this data gets sorted anyways since each
877 // element has an index to tell where it belongs. I am not sure yet if
878 // we must sort here, or if it's fine to allow the sort to happen through
879 // the index value. This needs further discussion.
880 memcpy(&recv_data[offsetDst * (size_t)nbytes],
881 &(plan->getRecvBuff())[plan->starts_from_ptr[self_num] * (size_t)nbytes],
882 plan->sizes_from[self_num] * (size_t)nbytes);
883 offsetDst += plan->sizes_from[self_num];
884 }
885 else {
886 int k = plan->starts_from[self_num];
887 for (int j = plan->lengths_from[self_num]; j; j--) {
888 memcpy(&recv_data[(size_t)(plan->indices_from[k]) * (size_t)nbytes],
889 &(plan->getRecvBuff())[(size_t)k * (size_t)nbytes], nbytes);
890 k++;
891 }
892 }
893 }
894 else {
895 self_num = plan->nrecvs;
896 }
897
898 for (int jj = 0; jj < plan->nrecvs; jj++) {
899 // TODO: Refactored directory to use Teuchos comm but we have no Teuchos::waitAny
900 // Short term fix is we just call wait() on each request in serial.
901 // When we add Teuchos::waitAny we can replace it for the old version
902 // which is commented out below.
903 plan->request[jj]->wait();
904 int index = jj;
905
906 // Old form with MPI_Waitany
907 // MPI_Status status; /* return from Waitany */
908 // int index;
909 // MPI_Waitany(plan->nrecvs, &plan->request[0], &index, &status);
910
911 if (index >= self_num) {
912 index++;
913 }
914
915 if(plan->sizes_from.size()) {
916 // NEW METHOD for variable sized data
917 // This will NOT put the data in order but during the directory read
918 // of the buffer which follows, this data gets sorted anyways since each
919 // element has an index to tell where it belongs. I am not sure yet if
920 // we must sort here, or if it's fine to allow the sort to happen through
921 // the index value. This needs further discussion.
922 memcpy(&recv_data[offsetDst * (size_t)nbytes],
923 &plan->getRecvBuff().getRawPtr()[plan->starts_from_ptr[index] * (size_t)nbytes],
924 plan->sizes_from[index] * (size_t)nbytes);
925 offsetDst += plan->sizes_from[index];
926 }
927 else {
928 int k = plan->starts_from[index];
929 for (int j = plan->lengths_from[index]; j; j--) {
930 memcpy(&recv_data.getRawPtr()[(size_t)(plan->indices_from[k]) * (size_t)nbytes],
931 &plan->getRecvBuff().getRawPtr()[(size_t)k * (size_t)nbytes], nbytes);
932 k++;
933 }
934 }
935 }
936 }
937
938 return 0;
939}
940
941/* Do_Post would require posting more receives than allowed on this platform.
942* We use MPI_AlltoAllv instead, which is probably implemented such that each
943* process does one receive at a time.
944*/
945
946int Zoltan2_Directory_Comm::do_all_to_all(
947 Zoltan2_Directory_Plan *plan, /* communication data structure */
948 const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
949 int nbytes, /* msg size */
950 Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
951{
952 throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (6)"); // needs unit testing
953
954 int sm = (plan->self_msg > 0) ? 1 : 0;
955
956 int nSendMsgs = plan->nsends + sm;
957 int nRecvMsgs = plan->nrecvs + sm;
958
959 int nSendItems = 0;
960 for (int i=0; i <nSendMsgs; i++) {
961 nSendItems += plan->lengths_to[i];
962 }
963 int nRecvItems = 0;
964 for (int i=0; i <nRecvMsgs; i++) {
965 nRecvItems += plan->lengths_from[i];
966 }
967
968 int nprocs = plan->comm->getSize();
969
970 Teuchos::ArrayRCP<int> outbufCounts(new int[nprocs], 0, nprocs, true);
971 Teuchos::ArrayRCP<int> outbufOffsets(new int[nprocs], 0, nprocs, true);
972 Teuchos::ArrayRCP<int> inbufCounts(new int[nprocs], 0, nprocs, true);
973 Teuchos::ArrayRCP<int> inbufOffsets(new int[nprocs], 0, nprocs, true);
974
975 /* The *_to fields of the plan refer to the items in the send_data buffer,
976 * and how to pull out the correct items for each receiver. The
977 * *_from fields of the plan refer to the recv_data buffer. Items
978 * arrive in process rank order, and these fields tell us where to
979 * put them in the recv_data buffer.
980 */
981
982 /* CREATE SEND BUFFER */
983
984 int sorted = 0;
985 if (plan->indices_to == Teuchos::null){
986 sorted = 1;
987 for (int i=1; i< nSendMsgs; i++){
988 if (plan->starts_to[i] < plan->starts_to[i-1]){
989 sorted = 0;
990 break;
991 }
992 }
993 }
994
995 Teuchos::ArrayRCP<char> outbuf;
996 Teuchos::ArrayRCP<char> inbuf;
997 Teuchos::ArrayRCP<char> buf;
998
999 if (plan->sizes_to.size()){
1000 /*
1001 * Each message contains items for a process, and each item may be
1002 * a different size.
1003 */
1004
1005 int outbufLen = 0;
1006 for (int i = 0; i < nSendMsgs; i++){
1007 outbufLen += plan->sizes_to[i];
1008 }
1009
1010 if (plan->indices_to != Teuchos::null) {
1011 /*
1012 * items are not grouped by message
1013 */
1014 buf.resize(outbufLen*nbytes);
1015 outbuf.resize(outbufLen*nbytes);
1016 char * pBufPtr = &(outbuf[0]);
1017 int i = 0;
1018 int k = 0;
1019 for (int p = 0; p < nprocs; p++) {
1020
1021 int length = 0;
1022
1023 if (i < nSendMsgs){
1024 if (plan->procs_to[i] == p){ /* procs_to is sorted */
1025
1026 for (int j=0; j < plan->lengths_to[i]; j++,k++){
1027 int itemSize = plan->sizes[plan->indices_to[k]] * nbytes;
1028 int offset = plan->indices_to_ptr[k] * nbytes;
1029
1030 memcpy(pBufPtr, &(send_data[0]) + offset, itemSize);
1031
1032 pBufPtr += itemSize;
1033 length += itemSize;
1034 }
1035 i++;
1036 }
1037 }
1038
1039 outbufCounts[p] = length;
1040 if (p){
1041 outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1042 }
1043 }
1044 }
1045 else{
1046 /*
1047 * items are stored contiguously for each message
1048 */
1049
1050 if (!sorted || (plan->nvals > nSendItems) ){
1051 buf.resize(outbufLen*nbytes);
1052 outbuf.resize(outbufLen*nbytes);
1053 }
1054 else{
1055 /* All items in send_data are being sent, and they are sorted
1056 * in process rank order.
1057 */
1058 // TODO: Optimize - original just set the ptr...
1059 for(int n = 0; n < outbufLen*nbytes; ++n) {
1060 outbuf[n] = send_data[n];
1061 }
1062 }
1063
1064 char * pBufPtr = &(outbuf[0]);
1065
1066 int i = 0;
1067 for (int p = 0; p < nprocs; p++) {
1068
1069 int length = 0;
1070
1071 if (i < nSendMsgs){
1072 if (plan->procs_to[i] == p){ /* procs_to is sorted */
1073 length = plan->sizes_to[i] * nbytes;
1074 int offset = plan->starts_to_ptr[i] * nbytes;
1075
1076 if ((!sorted || (plan->nvals > nSendItems)) && length){
1077 memcpy(pBufPtr, &(send_data[0]) + offset, length);
1078 pBufPtr += length;
1079 }
1080 i++;
1081 }
1082 }
1083
1084 outbufCounts[p] = length;
1085 if (p){
1086 outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1087 }
1088 }
1089 }
1090 }
1091 else if (plan->indices_to != Teuchos::null) {
1092 /*
1093 * item sizes are constant, however the items belonging in a given
1094 * message may not be contiguous in send_data
1095 */
1096 buf.resize(nSendItems*nbytes);
1097 outbuf.resize(nSendItems*nbytes);
1098 char * pBufPtr = &(outbuf[0]);
1099 int i = 0;
1100 int k = 0;
1101 for (int p = 0; p < nprocs; p++){
1102
1103 int length = 0;
1104
1105 if (i < nSendMsgs){
1106 if (plan->procs_to[i] == p){ /* procs_to is sorted */
1107 for (int j=0; j < plan->lengths_to[i]; j++,k++) {
1108 int offset = plan->indices_to[k] * nbytes;
1109 memcpy(pBufPtr, &(send_data[0]) + offset, nbytes);
1110 pBufPtr += nbytes;
1111 }
1112 length = plan->lengths_to[i] * nbytes;
1113 i++;
1114 }
1115 }
1116
1117 outbufCounts[p] = length;
1118 if (p){
1119 outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1120 }
1121 }
1122 }
1123 else{
1124
1125 /* item sizes are constant, and items belonging to a
1126 * given message are always stored contiguously in send_data
1127 */
1128
1129 if (!sorted || (plan->nvals > nSendItems)){
1130 buf.resize(nSendItems*nbytes);
1131 outbuf.resize(nSendItems*nbytes);
1132 }
1133 else{
1134 /* send_data is sorted by process, and we don't skip
1135 * any of the data in the buffer, so we can use send_data
1136 * in the alltoall call
1137 */
1138 // TODO: Optimize - original just set ptr
1139 outbuf = send_data;
1140 }
1141
1142 char * pBufPtr = &(outbuf[0]);
1143
1144 int i = 0;
1145 for (int p=0; p < nprocs; p++) {
1146
1147 int length = 0;
1148
1149 if (i < nSendMsgs){
1150 if (plan->procs_to[i] == p){ /* procs_to is sorted */
1151 int offset = plan->starts_to[i] * nbytes;
1152 length = plan->lengths_to[i] * nbytes;
1153
1154 if ((!sorted || (plan->nvals > nSendItems)) && length){
1155 memcpy(pBufPtr, &(send_data[0]) + offset, length);
1156 pBufPtr += length;
1157 }
1158 i++;
1159 }
1160 }
1161
1162 outbufCounts[p] = length;
1163 if (p){
1164 outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1165 }
1166 }
1167 }
1168
1169 /* CREATE RECEIVE BUFFER */
1170
1171 sorted = 0;
1172 int i;
1173 if (plan->indices_from == Teuchos::null) {
1174 sorted = 1;
1175 for (i=1; i< nRecvMsgs; i++) {
1176 if (plan->starts_from[i] < plan->starts_from[i-1]){
1177 sorted = 0;
1178 break;
1179 }
1180 }
1181 }
1182
1183 if (sorted){
1184 /* Caller already expects received data to be ordered by
1185 * the sending process rank.
1186 */
1187
1188 // TODO: Optimize - original just set ptr
1189 outbuf = send_data;
1190 inbuf = recv_data;
1191 }
1192 else {
1193 inbuf.resize(plan->total_recv_size * nbytes);
1194 }
1195 i = 0; /* reset the receive-message cursor; the sorted check above may have advanced it */
1196 for (int p = 0; p < nprocs; p++) {
1197 int length = 0;
1198
1199 if (i < nRecvMsgs){
1200 if (plan->procs_from[i] == p){
1201
1202 if (!plan->using_sizes){
1203 length = plan->lengths_from[i] * nbytes;
1204 }
1205 else{
1206 length = plan->sizes_from[i] * nbytes;
1207 }
1208 i++;
1209 }
1210 }
1211
1212 inbufCounts[p] = length;
1213 if (p){
1214 inbufOffsets[p] = inbufOffsets[p-1] + inbufCounts[p-1];
1215 }
1216 }
1217
1218 /* EXCHANGE DATA */
1219 // Did not refactor this - need Teuchos form but not tested code
1220 // MPI_Alltoall(&(sendbuf[0]), 1, MPI_INT, &(recvbuf[0]), 1, MPI_INT,
1221 // getRawComm());
1222
1223 /* WRITE RECEIVED DATA INTO USER'S BUFFER WHERE IT'S EXPECTED */
1224
1225 if (!sorted){
1226
1227 char * pBufPtr = &(inbuf[0]);
1228
1229 if (!plan->using_sizes){
1230
1231 /* each item in each message is nbytes long */
1232
1233 if (plan->indices_from == Teuchos::null) {
1234 for (i=0; i < nRecvMsgs; i++){
1235 int offset = plan->starts_from[i] * nbytes;
1236 int length = plan->lengths_from[i] * nbytes;
1237 memcpy(&(recv_data[0]) + offset, pBufPtr, length);
1238 pBufPtr += length;
1239 }
1240 }
1241 else{
1242 int k = 0;
1243 for (i=0; i < nRecvMsgs; i++) {
1244
1245 for (int j=0; j < plan->lengths_from[i]; j++,k++){
1246 int offset = plan->indices_from[k] * nbytes;
1247 memcpy(&(recv_data[0]) + offset, pBufPtr, nbytes);
1248 pBufPtr += nbytes;
1249 }
1250 }
1251 }
1252 }
1253 else{ /* (sizes!=NULL) && (indices_from!=NULL) not allowed by Zoltan_Comm_Resize */
1254
1255 /* items can be different sizes */
1256
1257 for (i=0; i < nRecvMsgs; i++){
1258 int offset = plan->starts_from_ptr[i] * nbytes;
1259 int length = plan->sizes_from[i] * nbytes;
1260 memcpy(&(recv_data[0]) + offset, pBufPtr, length);
1261 pBufPtr += length;
1262 }
1263 }
1264 }
1265
1266 return 0;
1267}
1268
1269int Zoltan2_Directory_Comm::do_reverse(
1270 int tag, /* message tag for communicating */
1271 const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
1272 int nbytes, /* msg size */
1273 const Teuchos::ArrayRCP<int> &sizes,
1274 Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after reverse comm */
1275{
1276 /* create plan->plan_reverse
1277 */
1278 int status = create_reverse_plan(tag, sizes);
1279
1280 // NEW METHOD
1281 // Set up recv_data with the proper size
1282 // This information is only available after the above create_reverse_plan is
1283 // called so we can setup the return data with proper buffer size now.
1284 // However should we do this here?
1285 size_t new_size = plan_forward->plan_reverse->total_recv_size*nbytes;
1286 if(new_size > 0) {
1287 // have to be careful with new[0] which will for example turn a
1288 // arrayRCP.getRawPtr() from a valid NULL read to a debug assert fail.
1289 recv_data = Teuchos::arcp(new char[new_size], 0, new_size, true);
1290 }
1291
1292 if (status == 0) {
1293
1294 if (plan_forward->plan_reverse->maxed_recvs) {
1295
1296 throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (7)"); // needs unit testing
1297
1298 /* use MPI_Alltoallv to implement plan->plan_reverse, because comm_do_post
1299 * would post more receives than allowed on this machine
1300 */
1301
1302 status = do_all_to_all(plan_forward->plan_reverse, send_data,
1303 nbytes, recv_data);
1304 }
1305 else {
1306 /* use post/wait which is faster when each sends to few
1307 */
1308 status = do_post(plan_forward->plan_reverse, tag, send_data,
1309 nbytes, recv_data);
1310
1311 if (status == 0) {
1312 status = do_wait (plan_forward->plan_reverse, tag, send_data,
1313 nbytes, recv_data);
1314 }
1315 }
1316 }
1317
1318 free_reverse_plan(plan_forward);
1319
1320 return status;
1321}
1322
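// A minimal usage sketch for the reverse path, assuming every rank answers a
// prior forward exchange with one fixed-size reply per record it received.
// Passing an empty sizes array means all replies are nbytes long; do_reverse()
// sizes reply_recv itself from the reverse plan it builds. Names here are
// illustrative only.
static void example_reverse(Zoltan2_Directory_Comm &directoryComm,
  const Teuchos::ArrayRCP<char> &replies,         /* one nbytes reply per received record */
  int nbytes)
{
  Teuchos::ArrayRCP<int> no_sizes;                /* empty: fixed-size replies on all ranks */
  Teuchos::ArrayRCP<char> reply_recv;             /* allocated inside do_reverse() */
  directoryComm.do_reverse(22345 /* tag */, replies, nbytes, no_sizes, reply_recv);
}
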
1323void Zoltan2_Directory_Comm::free_reverse_plan(Zoltan2_Directory_Plan *plan)
1324{
1325 if(!plan) {
1326 throw std::logic_error("Plan is NULL!");
1327 }
1328 delete plan->plan_reverse;
1329 plan->plan_reverse = NULL;
1330}
1331
1332int Zoltan2_Directory_Comm::create_reverse_plan(
1333 int tag,
1334 const Teuchos::ArrayRCP<int> &sizes)/* variable size of objects (if not size 0) */
1335{
1336 /* Check input parameters */
1337 if (!plan_forward){
1338 throw std::logic_error("memory error");
1339 }
1340
1341 /* Let Zoltan_Comm_Do check the remaining parameters. */
1342 plan_forward->plan_reverse = new Zoltan2_Directory_Plan;
1343 plan_forward->plan_reverse->getInvertedValues(plan_forward);
1344
1345 if (MPI_RECV_LIMIT > 0){
1346 /* If we have a limit to the number of posted receives we are allowed,
1347 ** and our plan has exceeded that, then switch to an MPI_Alltoallv so
1348 ** that we will have fewer receives posted when we do the communication.
1349 */
1350 int global_nsends;
1351 Teuchos::reduceAll<int>(*plan_forward->comm, Teuchos::REDUCE_SUM, 1,
1352 &plan_forward->nsends, &global_nsends);
1353 if (global_nsends > MPI_RECV_LIMIT){
1354 plan_forward->plan_reverse->maxed_recvs = 1;
1355 }
1356 }
1357
1358 if (plan_forward->plan_reverse->maxed_recvs == 0) {
1359 // See notes in header for MPI_Request
1360 // plan_forward->plan_reverse->request = Teuchos::arcp(new MPI_Request[plan_forward->plan_reverse->nrecvs], 0, plan_forward->plan_reverse->nrecvs, true);
1361 // plan_forward->plan_reverse->status = Teuchos::arcp(new MPI_Status[plan_forward->plan_reverse->nrecvs], 0, plan_forward->plan_reverse->nrecvs, true);
1362 plan_forward->plan_reverse->request.resize(plan_forward->plan_reverse->nrecvs);
1363 }
1364
1365 int sum_recv_sizes;
1366 int comm_flag = resize( plan_forward->plan_reverse,
1367 sizes, tag, &sum_recv_sizes);
1368
1369 if (comm_flag != 0) {
1370 return(comm_flag);
1371 }
1372
1373 if (sum_recv_sizes != plan_forward->plan_reverse->total_recv_size){
1374 /* Sanity check */
1375 return 1;
1376 }
1377
1378 return 0;
1379}
1380
1381int Zoltan2_Directory_Comm::resize(
1382 const Teuchos::ArrayRCP<int> &sizes, /* size of each item I'm sending */
1383 int tag, /* message tag I can use */
1384 int *sum_recv_sizes) /* sum of the sizes of the items I'll receive */
1385{
1386 return resize(plan_forward, sizes, tag, sum_recv_sizes);
1387}
1388
1389int Zoltan2_Directory_Comm::resize(
1390 Zoltan2_Directory_Plan *plan, /* communication plan object */
1391 const Teuchos::ArrayRCP<int> &sizes, /* size of each item I'm sending */
1392 int tag, /* message tag I can use */
1393 int *sum_recv_sizes) /* sum of the sizes of the items I'll receive */
1394{
1395 /* If sizes vary, then I need to compute and communicate message lengths */
1396 /* First check if sizes array is NULL on all procs. */
1397 int my_proc = plan->comm->getRank(); /* my processor ID */
1398 int has_sizes = (sizes.size() != 0);
1399 int var_sizes; /* items have variable sizes? */
1400
1401 Teuchos::reduceAll(*comm_, Teuchos::REDUCE_BOR, 1, &has_sizes, &var_sizes);
1402
1403 if (var_sizes && plan->indices_from != Teuchos::null) {
1404 // NEW METHOD
1405 // Allow this to run now - the below implementation is working but perhaps
1406 // not done correctly for other usage cases I have not considered yet.
1407 // throw std::logic_error("Non-blocked, variable-sized recvs not supported");
1408 }
1409
1410 int nsends = plan->nsends; /* number of msgs I'll send */
1411 int nrecvs = plan->nrecvs; /* number of msgs I'll recv */
1412 int self_msg = plan->self_msg;
1413
1414 Teuchos::ArrayRCP<int> sizes_to;
1415 Teuchos::ArrayRCP<int> sizes_from;
1416 Teuchos::ArrayRCP<int> starts_to_ptr;
1417 Teuchos::ArrayRCP<int> starts_from_ptr;
1418 Teuchos::ArrayRCP<int> indices_to_ptr;
1419 Teuchos::ArrayRCP<int> indices_from_ptr;
1420
1421 if (!var_sizes) { /* Easy case. Size = length */
1422 plan->total_recv_size = 0;
1423 for (int i = 0; i < nrecvs + self_msg; i++) {
1424 plan->total_recv_size += plan->lengths_from[i];
1425 }
1426
1427 plan->max_send_size = 0;
1428 for (int i = 0; i < nsends + self_msg; i++) {
1429 if (plan->procs_to[i] != my_proc &&
1430 plan->lengths_to[i] > plan->max_send_size) {
1431 plan->max_send_size = plan->lengths_to[i];
1432 }
1433 }
1434 }
1435 else { /* Need to actually compute message sizes */
1436
1437 // TODO Investigate purpose of the +1 in the old code. Is that used?
1438
1439 // OLD CODE
1440 // plan->sizes.resize(plan->nvals + 1);
1441 // for (int i = 0; i < plan->nvals; i++) {
1442 // plan->sizes[i] = sizes[i];
1443 // }
1444
1445 // NEW CODE
1446 plan->sizes = sizes; // can we just copy?
1447 plan->using_sizes = true;
1448
1449 if(nsends + self_msg > 0) {
1450 sizes_to = Teuchos::arcp(
1451 new int[nsends + self_msg], 0, nsends + self_msg, true);
1452 for(int n = 0; n < sizes_to.size(); ++n) {
1453 sizes_to[n] = 0;
1454 }
1455 }
1456 if(nrecvs + self_msg > 0) {
1457 sizes_from = Teuchos::arcp(
1458 new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1459 }
1460
1461 /* Several cases:
1462 1. indices_to == NULL
1463 => starts_to != NULL, need to allocate, set starts_to_ptr
1464 2. indices_to != NULL (=> starts_to == NULL)
1465 need to allocate, set indices_to_ptr
1466 3,4. mirror cases for _from
1467 */
1468 if(nsends + self_msg > 0) {
1469 starts_to_ptr = Teuchos::arcp(
1470 new int[nsends + self_msg], 0, nsends + self_msg, true);
1471 }
1472 if (plan->indices_to == Teuchos::null) {
1473 /* Simpler case; sends already blocked by processor */
1474 Teuchos::ArrayRCP<int> index;
1475 Teuchos::ArrayRCP<int> sort_val;
1476 if(nsends + self_msg > 0) {
1477 index = Teuchos::arcp(
1478 new int[nsends + self_msg], 0, nsends + self_msg, true);
1479 sort_val = Teuchos::arcp(
1480 new int[nsends + self_msg], 0, nsends + self_msg, true);
1481 }
1482 for (int i = 0; i < nsends + self_msg; i++) {
1483 int j = plan->starts_to[i];
1484
1485 for (int k = 0; k < plan->lengths_to[i]; k++) {
1486 sizes_to[i] += sizes[j++];
1487 }
1488 if (sizes_to[i] > plan->max_send_size &&
1489 plan->procs_to[i] != my_proc)
1490 plan->max_send_size = sizes_to[i];
1491 }
1492 for (int i = 0; i < nsends + self_msg; i++) {
1493 sort_val[i] = plan->starts_to[i];
1494 index[i] = i;
1495 }
1496 sort_ints(sort_val, index);
1497 int sum = 0;
1498 for (int i = 0; i < nsends + self_msg; i++) {
1499 starts_to_ptr[index[i]] = sum;
1500 sum += sizes_to[index[i]];
1501 }
1502 }
1503 else { /* Harder case, sends not blocked */
1504 Teuchos::ArrayRCP<int> offset;
1505 if(plan->nvals > 0) {
1506 offset = Teuchos::arcp(new int[plan->nvals], 0, plan->nvals, true);
1507 }
1508 indices_to_ptr.resize(plan->nvals);
1509
1510 /* Compute address for every item in send array */
1511 int sum = 0;
1512 for (int i = 0; i < plan->nvals; i++) {
1513 offset[i] = sum;
1514 sum += sizes[i];
1515 }
1516
1517 sum = 0;
1518 plan->max_send_size = 0;
1519 for (int i = 0; i < nsends + self_msg; i++) {
1520 starts_to_ptr[i] = sum;
1521 int j = plan->starts_to[i];
1522 for (int k = 0; k < plan->lengths_to[i]; k++) {
1523 indices_to_ptr[j] = offset[plan->indices_to[j]];
1524 sizes_to[i] += sizes[plan->indices_to[j++]];
1525 }
1526 if (sizes_to[i] > plan->max_send_size &&
1527 plan->procs_to[i] != my_proc)
1528 plan->max_send_size = sizes_to[i];
1529 sum += sizes_to[i];
1530 }
1531 }
1532
1533 /* Note: This routine only gets message sizes, not object sizes. */
1534 /* Anything requiring item sizes requires more code */
1535 /* Should such functionality reside here? */
1536 exchange_sizes(sizes_to, plan->procs_to, nsends, self_msg,
1537 sizes_from, plan->procs_from, nrecvs,
1538 &plan->total_recv_size, my_proc, tag, plan->comm);
1539
1540 if(nrecvs + self_msg > 0) {
1541 starts_from_ptr = Teuchos::arcp(
1542 new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1543 }
1544
1545 if (plan->indices_from == Teuchos::null) {
1546 /* Simpler case; recvs already blocked by processor */
1547 Teuchos::ArrayRCP<int> index;
1548 Teuchos::ArrayRCP<int> sort_val;
1549 if(nrecvs + self_msg > 0) {
1550 index = Teuchos::arcp(
1551 new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1552 sort_val = Teuchos::arcp<int>(
1553 new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1554 }
1555
1556 for (int i = 0; i < nrecvs + self_msg; i++) {
1557 sort_val[i] = plan->starts_from[i];
1558 index[i] = i;
1559 }
1560 sort_ints(sort_val, index);
1561
1562 int sum = 0;
1563 for (int i = 0; i < nrecvs + self_msg; i++) {
1564 starts_from_ptr[index[i]] = sum;
1565 sum += sizes_from[index[i]];
1566 }
1567 }
1568 else {
1569 // OLD COMMENT left here for reference but to be deleted
1570 /* Harder case, recvs not blocked */
1571 /* Not currently supported */
1572 /* Can't do w/o individual item sizes */
1573 /* Currently checked for at top of file */
1574
1575 // NEW METHOD
1576 // Note this is currently just a duplicate of above block which is working
1577 // since I've set up do_wait to just copy the entire block. However I am
1578 // not sure yet how to organize this implementation and suspect this may
1579 // not be correct anyways - though it seems to work. After do_wait copies
1580 // the data (out of order) the directory will handle resorting it since
1581 // each element has index to indicate where it goes in the array. In the
1582 // original do_wait implementation it seemed it was setup to place the
1583 // elements in order, even though they would be sorted later, so that
1584 // part I need to discuss further
1585 Teuchos::ArrayRCP<int> index;
1586 Teuchos::ArrayRCP<int> sort_val;
1587 if(nrecvs + self_msg > 0) {
1588 index = Teuchos::arcp(
1589 new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1590 sort_val = Teuchos::arcp(
1591 new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1592 }
1593
1594 for (int i = 0; i < nrecvs + self_msg; i++) {
1595 sort_val[i] = plan->starts_from[i];
1596 index[i] = i;
1597 }
1598 sort_ints(sort_val, index);
1599
1600 int sum = 0;
1601 for (int i = 0; i < nrecvs + self_msg; i++) {
1602 starts_from_ptr[index[i]] = sum;
1603 sum += sizes_from[index[i]];
1604 }
1605 }
1606 }
1607 plan->sizes_to = sizes_to;
1608 plan->sizes_from = sizes_from;
1609 plan->starts_to_ptr = starts_to_ptr;
1610 plan->starts_from_ptr = starts_from_ptr;
1611 plan->indices_to_ptr = indices_to_ptr;
1612 plan->indices_from_ptr = indices_from_ptr;
1613
1614 if (sum_recv_sizes) {
1615 *sum_recv_sizes = plan->total_recv_size;
1616 }
1617
1618 return 0;
1619}
1620
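// A minimal sketch of the bookkeeping resize() above performs for variable-sized
// items: a prefix sum over per-item sizes yields each item's offset in the
// packed buffer (in the same units as sizes; do_post() later scales by nbytes),
// and the running total is the packed length. The helper name and std::vector
// usage are illustrative only.
static void prefix_sum_item_offsets(const std::vector<int> &sizes,
  std::vector<int> &item_offset, int &total)
{
  item_offset.resize(sizes.size());
  total = 0;
  for (size_t i = 0; i < sizes.size(); ++i) {
    item_offset[i] = total;                       /* where item i starts */
    total += sizes[i];                            /* running packed length */
  }
}
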
1621int Zoltan2_Directory_Comm::exchange_sizes(
1622 const Teuchos::ArrayRCP<int> &sizes_to, /* value I need to exchange (size of true msg) */
1623 const Teuchos::ArrayRCP<int> &procs_to, /* procs I send to */
1624 int nsends, /* number of messages I'll send */
1625 int self_msg, /* do I copy data to myself? */
1626 Teuchos::ArrayRCP<int> &sizes_from, /* (returned) size of all my receives */
1627 const Teuchos::ArrayRCP<int> &procs_from, /* procs I recv from */
1628 int nrecvs, /* number of messages I receive */
1629 int *total_recv_size, /* (returned) sum of all incoming sizes */
1630 int my_proc, /* my processor number */
1631 int tag, /* message tag I can use */
1632 Teuchos::RCP<const Teuchos::Comm<int> > /* comm */) { /* communicator */
1633
1634 /* If sizes vary, then I need to communicate message lengths */
1635 int self_index_to = -1; /* location of self in procs_to */
1636 for (int i = 0; i < nsends + self_msg; i++) {
1637 if (procs_to[i] != my_proc) {
1638#ifdef HAVE_MPI // Teuchos::send not implemented for Serial - Serial is just for debugging
1639 Teuchos::send(*comm_, 1, &sizes_to[i], procs_to[i]);
1640#endif
1641 }
1642 else {
1643 self_index_to = i;
1644 }
1645 }
1646
1647 *total_recv_size = 0;
1648
1649 for (int i = 0; i < nrecvs + self_msg; i++) {
1650 if (procs_from[i] != my_proc) {
1651#ifdef HAVE_MPI // Teuchos::receive not implemented for Serial - Serial is just for debugging
1652 Teuchos::receive(*comm_, procs_from[i], 1, &sizes_from[i]);
1653#endif
1654 }
1655 else {
1656 sizes_from[i] = sizes_to[self_index_to];
1657 }
1658 *total_recv_size += sizes_from[i];
1659 }
1660 return 0;
1661}
1662
1663} // end namespace Zoltan2