Tpetra_Details_DistributorPlan.cpp
// ***********************************************************************
//
//          Tpetra: Templated Linear Algebra Services Package
//                 Copyright (2008) Sandia Corporation
//
// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
// the U.S. Government retains certain rights in this software.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the Corporation nor the names of the
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// ************************************************************************
// @HEADER

#include "Tpetra_Details_DistributorPlan.hpp"

#include "Teuchos_StandardParameterEntryValidators.hpp"
#include "Tpetra_Util.hpp"
#include <numeric>

namespace Tpetra {
namespace Details {

std::string
DistributorSendTypeEnumToString (EDistributorSendType sendType)
{
  if (sendType == DISTRIBUTOR_ISEND) {
    return "Isend";
  }
  else if (sendType == DISTRIBUTOR_RSEND) {
    return "Rsend";
  }
  else if (sendType == DISTRIBUTOR_SEND) {
    return "Send";
  }
  else if (sendType == DISTRIBUTOR_SSEND) {
    return "Ssend";
  }
  else {
    TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
      "EDistributorSendType enum value " << sendType << ".");
  }
}

std::string
DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
{
  switch (how) {
  case Details::DISTRIBUTOR_NOT_INITIALIZED:
    return "Not initialized yet";
  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
    return "By createFromSends";
  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
    return "By createFromRecvs";
  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
    return "By createFromSendsAndRecvs";
  case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
    return "By createReverseDistributor";
  case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
    return "By copy constructor";
  default:
    return "INVALID";
  }
}

DistributorPlan::DistributorPlan(Teuchos::RCP<const Teuchos::Comm<int>> comm)
  : comm_(comm),
    howInitialized_(DISTRIBUTOR_NOT_INITIALIZED),
    reversePlan_(Teuchos::null),
    sendType_(DISTRIBUTOR_SEND),
    barrierBetweenRecvSend_(barrierBetween_default),
    useDistinctTags_(useDistinctTags_default),
    sendMessageToSelf_(false),
    numSendsToOtherProcs_(0),
    maxSendLength_(0),
    numReceives_(0),
    totalReceiveLength_(0)
{ }

DistributorPlan::DistributorPlan(const DistributorPlan& otherPlan)
  : comm_(otherPlan.comm_),
    howInitialized_(DISTRIBUTOR_INITIALIZED_BY_COPY),
    reversePlan_(otherPlan.reversePlan_),
    sendType_(otherPlan.sendType_),
    barrierBetweenRecvSend_(otherPlan.barrierBetweenRecvSend_),
    useDistinctTags_(otherPlan.useDistinctTags_),
    sendMessageToSelf_(otherPlan.sendMessageToSelf_),
    numSendsToOtherProcs_(otherPlan.numSendsToOtherProcs_),
    procIdsToSendTo_(otherPlan.procIdsToSendTo_),
    startsTo_(otherPlan.startsTo_),
    lengthsTo_(otherPlan.lengthsTo_),
    maxSendLength_(otherPlan.maxSendLength_),
    indicesTo_(otherPlan.indicesTo_),
    numReceives_(otherPlan.numReceives_),
    totalReceiveLength_(otherPlan.totalReceiveLength_),
    lengthsFrom_(otherPlan.lengthsFrom_),
    procsFrom_(otherPlan.procsFrom_),
    startsFrom_(otherPlan.startsFrom_),
    indicesFrom_(otherPlan.indicesFrom_)
{ }

int DistributorPlan::getTag(const int pathTag) const {
  return useDistinctTags_ ? pathTag : comm_->getTag();
}

size_t DistributorPlan::createFromSends(const Teuchos::ArrayView<const int>& exportProcIDs) {
  using Teuchos::outArg;
  using Teuchos::REDUCE_MAX;
  using Teuchos::reduceAll;
  using std::endl;

  const size_t numExports = exportProcIDs.size();
  const int myProcID = comm_->getRank();
  const int numProcs = comm_->getSize();

  // exportProcIDs tells us the communication pattern for this
  // distributor. It dictates the way that the export data will be
  // interpreted in doPosts(). We want to perform at most one
  // send per process in doPosts; this is for two reasons:
  //   * minimize latency / overhead in the comm routines (nice)
  //   * match the number of receives and sends between processes
  //     (necessary)
  //
  // Teuchos::Comm requires that the data for a send are contiguous
  // in a send buffer. Therefore, if the data in the send buffer
  // for doPosts() are not contiguous, they will need to be copied
  // into a contiguous buffer. The user has specified this
  // noncontiguous pattern and we can't do anything about it.
  // However, if they do not provide an efficient pattern, we will
  // warn them if one of the following compile-time options has been
  // set:
  //   * HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS
  //   * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
  //
  // If the data are contiguous, then we can post the sends in situ
  // (i.e., without needing to copy them into a send buffer).
  //
  // Determine contiguity. There are a number of ways to do this:
  //   * If the export IDs are sorted, then all exports to a
  //     particular proc must be contiguous. This is what Epetra does.
  //   * If the export ID of the current export already has been
  //     listed, then the previous listing should correspond to the
  //     same export. This tests contiguity, but not sortedness.
  //
  // Both of these tests require O(n), where n is the number of
  // exports. However, the latter will positively identify a greater
  // portion of contiguous patterns. We use the latter method.
  //
  // Check to see if values are grouped by procs without gaps.
  // If so, indicesTo_ -> 0.
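  //
  // Illustrative example (not from the original source):
  // exportProcIDs = [0, 0, 2, 2, 3] is grouped by proc, so the sends
  // can be posted in place; [0, 2, 0, 3] is not grouped, so a send
  // buffer (and indicesTo_) would be needed.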

  // Set up data structures for quick traversal of arrays.
  // This contains the number of sends for each process ID.
  //
  // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
  // that create an array of length the number of processes in the
  // communicator (plus one). Given how this code uses this array,
  // it should be straightforward to replace it with a hash table or
  // some other more space-efficient data structure. In practice,
  // most of the entries of starts should be zero for a sufficiently
  // large process count, unless the communication pattern is dense.
  // Note that it's important to be able to iterate through keys (i
  // for which starts[i] is nonzero) in increasing order.
  Teuchos::Array<size_t> starts (numProcs + 1, 0);

  // numActive is the number of sends that are not Null
  size_t numActive = 0;
  int needSendBuff = 0; // Boolean

  for (size_t i = 0; i < numExports; ++i) {
    const int exportID = exportProcIDs[i];
    if (exportID >= 0) {
      // exportID is a valid process ID. Increment the number of
      // messages this process will send to that process.
      ++starts[exportID];

      // If we're sending more than one message to process exportID,
      // then it is possible that the data are not contiguous.
      // Check by seeing if the previous process ID in the list
      // (exportProcIDs[i-1]) is the same. It's safe to use i-1,
      // because if starts[exportID] > 1, then i must be > 1 (since
      // the starts array was filled with zeros initially).

      // null entries break continuity.
      // e.g., [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
      if (needSendBuff == 0 && starts[exportID] > 1 &&
          exportID != exportProcIDs[i-1]) {
        needSendBuff = 1;
      }
      ++numActive;
    }
  }

#if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
  {
    int global_needSendBuff;
    reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
                         outArg (global_needSendBuff));
    TPETRA_EFFICIENCY_WARNING(
      global_needSendBuff != 0, std::runtime_error,
      "::createFromSends: Grouping export IDs together by process rank often "
      "improves performance.");
  }
#endif

  // Determine from the caller's data whether or not the current
  // process should send (a) message(s) to itself.
  if (starts[myProcID] != 0) {
    sendMessageToSelf_ = true;
  }
  else {
    sendMessageToSelf_ = false;
  }

  if (! needSendBuff) {
    // grouped by proc, no send buffer or indicesTo_ needed
    numSendsToOtherProcs_ = 0;
    // Count total number of sends, i.e., total number of procs to
    // which we are sending. This includes myself, if applicable.
    for (int i = 0; i < numProcs; ++i) {
      if (starts[i]) {
        ++numSendsToOtherProcs_;
      }
    }

    // Not only do we not need these, but we must clear them, as
    // empty status of indicesTo is a flag used later.
    indicesTo_.resize(0);
    // Size these to numSendsToOtherProcs_; note, at the moment, numSendsToOtherProcs_
    // includes self sends. Set their values to zeros.
    procIdsToSendTo_.assign(numSendsToOtherProcs_,0);
    startsTo_.assign(numSendsToOtherProcs_,0);
    lengthsTo_.assign(numSendsToOtherProcs_,0);

    // set startsTo to the offset for each send (i.e., each proc ID)
    // set procsTo to the proc ID for each send
    // in interpreting this code, remember that we are assuming contiguity
    // that is why index skips through the ranks
    {
      size_t index = 0, procIndex = 0;
      for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
        while (exportProcIDs[procIndex] < 0) {
          ++procIndex; // skip all negative proc IDs
        }
        startsTo_[i] = procIndex;
        int procID = exportProcIDs[procIndex];
        procIdsToSendTo_[i] = procID;
        index += starts[procID];
        procIndex += starts[procID];
      }
    }
    // sort the startsTo and proc IDs together, in ascending order, according
    // to proc IDs
    if (numSendsToOtherProcs_ > 0) {
      sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
    }
    // compute the maximum send length
    maxSendLength_ = 0;
    for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
      int procID = procIdsToSendTo_[i];
      lengthsTo_[i] = starts[procID];
      if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
        maxSendLength_ = lengthsTo_[i];
      }
    }
  }
  else {
    // not grouped by proc, need send buffer and indicesTo_

    // starts[i] is the number of sends to proc i
    // numActive equals number of sends total, \sum_i starts[i]

    // this loop starts at starts[1], so explicitly check starts[0]
    if (starts[0] == 0 ) {
      numSendsToOtherProcs_ = 0;
    }
    else {
      numSendsToOtherProcs_ = 1;
    }
    for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
                                          im1=starts.begin();
         i != starts.end(); ++i)
    {
      if (*i != 0) ++numSendsToOtherProcs_;
      *i += *im1;
      im1 = i;
    }
    // starts[i] now contains the number of exports to procs 0 through i

    for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
                                                  i=starts.rbegin()+1;
         i != starts.rend(); ++i)
    {
      *ip1 = *i;
      ip1 = i;
    }
    starts[0] = 0;
    // starts[i] now contains the number of exports to procs 0 through
    // i-1, i.e., all procs before proc i
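    //
    // Illustrative example (not from the original source): with
    // numProcs = 3 and per-proc send counts [2, 0, 3], the two loops
    // above leave starts = [0, 2, 2, 5], i.e., proc p's block of the
    // send buffer begins at offset starts[p].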

    indicesTo_.resize(numActive);

    for (size_t i = 0; i < numExports; ++i) {
      if (exportProcIDs[i] >= 0) {
        // record the offset to the sendBuffer for this export
        indicesTo_[starts[exportProcIDs[i]]] = i;
        // now increment the offset for this proc
        ++starts[exportProcIDs[i]];
      }
    }
    // our send buffer will contain the export data for each of the procs
    // we communicate with, in order by proc id
    // sendBuffer = {proc_0_data, proc_1_data, ..., proc_np-1_data}
    // indicesTo now maps each export to the location in our send buffer
    // associated with the export
    // data for export i located at sendBuffer[indicesTo[i]]
    //
    // starts[i] once again contains the number of exports to
    // procs 0 through i
    for (int proc = numProcs-1; proc != 0; --proc) {
      starts[proc] = starts[proc-1];
    }
    starts.front() = 0;
    starts[numProcs] = numActive;
    //
    // starts[proc] once again contains the number of exports to
    // procs 0 through proc-1
    // i.e., the start of my data in the sendBuffer

    // this contains invalid data at procs we don't care about, that is okay
    procIdsToSendTo_.resize(numSendsToOtherProcs_);
    startsTo_.resize(numSendsToOtherProcs_);
    lengthsTo_.resize(numSendsToOtherProcs_);

    // for each group of sends/exports, record the destination proc,
    // the length, and the offset for this send into the
    // send buffer (startsTo_)
    maxSendLength_ = 0;
    size_t snd = 0;
    for (int proc = 0; proc < numProcs; ++proc ) {
      if (starts[proc+1] != starts[proc]) {
        lengthsTo_[snd] = starts[proc+1] - starts[proc];
        startsTo_[snd] = starts[proc];
        // record max length for all off-proc sends
        if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
          maxSendLength_ = lengthsTo_[snd];
        }
        procIdsToSendTo_[snd] = proc;
        ++snd;
      }
    }
  }

  if (sendMessageToSelf_) {
    --numSendsToOtherProcs_;
  }

  // Invert map to see what msgs are received and what length
  computeReceives();

  // createFromRecvs() calls createFromSends(), but will set
  // howInitialized_ again after calling createFromSends().
  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;

  return totalReceiveLength_;
}

void DistributorPlan::createFromRecvs(const Teuchos::ArrayView<const int>& remoteProcIDs)
{
  createFromSends(remoteProcIDs);

  *this = *getReversePlan();

  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS;
}

void DistributorPlan::createFromSendsAndRecvs(const Teuchos::ArrayView<const int>& exportProcIDs,
                                              const Teuchos::ArrayView<const int>& remoteProcIDs)
{
  // Note: exportProcIDs and remoteProcIDs _must_ be lists that have
  // an entry for each GID. If the export/remote proc IDs are taken from
  // the getProcs{From|To} lists extracted from a previous distributor,
  // the result will be wrong, because those lists have a unique entry
  // for each process ID. A version of this with lengthsTo and lengthsFrom
  // should be made.
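  //
  // Illustrative example (not from the original source): if this
  // process receives 4 remote GIDs owned by procs {1, 1, 3, 3}, then
  // remoteProcIDs must be [1, 1, 3, 3] (one entry per GID, sorted by
  // proc), not the unique list [1, 3] that a getProcs{From|To} list
  // would give.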

  howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;

  int myProcID = comm_->getRank ();
  int numProcs = comm_->getSize();

  const size_t numExportIDs = exportProcIDs.size();
  Teuchos::Array<size_t> starts (numProcs + 1, 0);

  size_t numActive = 0;
  int needSendBuff = 0; // Boolean

  for(size_t i = 0; i < numExportIDs; i++ )
  {
    if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
      needSendBuff = 1;
    if( exportProcIDs[i] >= 0 )
    {
      ++starts[ exportProcIDs[i] ];
      ++numActive;
    }
  }

  sendMessageToSelf_ = ( starts[myProcID] != 0 ) ? 1 : 0;

  numSendsToOtherProcs_ = 0;

  if( needSendBuff ) // not grouped by proc; need send buffer and indicesTo_
  {
    if (starts[0] == 0 ) {
      numSendsToOtherProcs_ = 0;
    }
    else {
      numSendsToOtherProcs_ = 1;
    }
    for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
                                          im1=starts.begin();
         i != starts.end(); ++i)
    {
      if (*i != 0) ++numSendsToOtherProcs_;
      *i += *im1;
      im1 = i;
    }
    // starts[i] now contains the number of exports to procs 0 through i

    for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
                                                  i=starts.rbegin()+1;
         i != starts.rend(); ++i)
    {
      *ip1 = *i;
      ip1 = i;
    }
    starts[0] = 0;
    // starts[i] now contains the number of exports to procs 0 through
    // i-1, i.e., all procs before proc i

    indicesTo_.resize(numActive);

    for (size_t i = 0; i < numExportIDs; ++i) {
      if (exportProcIDs[i] >= 0) {
        // record the offset to the sendBuffer for this export
        indicesTo_[starts[exportProcIDs[i]]] = i;
        // now increment the offset for this proc
        ++starts[exportProcIDs[i]];
      }
    }
    for (int proc = numProcs-1; proc != 0; --proc) {
      starts[proc] = starts[proc-1];
    }
    starts.front() = 0;
    starts[numProcs] = numActive;
    procIdsToSendTo_.resize(numSendsToOtherProcs_);
    startsTo_.resize(numSendsToOtherProcs_);
    lengthsTo_.resize(numSendsToOtherProcs_);
    maxSendLength_ = 0;
    size_t snd = 0;
    for (int proc = 0; proc < numProcs; ++proc ) {
      if (starts[proc+1] != starts[proc]) {
        lengthsTo_[snd] = starts[proc+1] - starts[proc];
        startsTo_[snd] = starts[proc];
        // record max length for all off-proc sends
        if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
          maxSendLength_ = lengthsTo_[snd];
        }
        procIdsToSendTo_[snd] = proc;
        ++snd;
      }
    }
  }
  else {
    // grouped by proc, no send buffer or indicesTo_ needed
    numSendsToOtherProcs_ = 0;
    // Count total number of sends, i.e., total number of procs to
    // which we are sending. This includes myself, if applicable.
    for (int i = 0; i < numProcs; ++i) {
      if (starts[i]) {
        ++numSendsToOtherProcs_;
      }
    }

    // Not only do we not need these, but we must clear them, as
    // empty status of indicesTo is a flag used later.
    indicesTo_.resize(0);
    // Size these to numSendsToOtherProcs_; note, at the moment, numSendsToOtherProcs_
    // includes self sends. Set their values to zeros.
    procIdsToSendTo_.assign(numSendsToOtherProcs_,0);
    startsTo_.assign(numSendsToOtherProcs_,0);
    lengthsTo_.assign(numSendsToOtherProcs_,0);

    // set startsTo to the offset for each send (i.e., each proc ID)
    // set procsTo to the proc ID for each send
    // in interpreting this code, remember that we are assuming contiguity
    // that is why index skips through the ranks
    {
      size_t index = 0, procIndex = 0;
      for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
        while (exportProcIDs[procIndex] < 0) {
          ++procIndex; // skip all negative proc IDs
        }
        startsTo_[i] = procIndex;
        int procID = exportProcIDs[procIndex];
        procIdsToSendTo_[i] = procID;
        index += starts[procID];
        procIndex += starts[procID];
      }
    }
    // sort the startsTo and proc IDs together, in ascending order, according
    // to proc IDs
    if (numSendsToOtherProcs_ > 0) {
      sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
    }
    // compute the maximum send length
    maxSendLength_ = 0;
    for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
      int procID = procIdsToSendTo_[i];
      lengthsTo_[i] = starts[procID];
      if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
        maxSendLength_ = lengthsTo_[i];
      }
    }
  }

  numSendsToOtherProcs_ -= sendMessageToSelf_;
  std::vector<int> recv_list;
  recv_list.reserve(numSendsToOtherProcs_); // reserve an initial guess for size needed

  int last_pid = -2;
  for(int i=0; i<remoteProcIDs.size(); i++) {
    if(remoteProcIDs[i]>last_pid) {
      recv_list.push_back(remoteProcIDs[i]);
      last_pid = remoteProcIDs[i];
    }
    else if (remoteProcIDs[i]<last_pid)
      throw std::runtime_error("Tpetra::Distributor::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
  }
  numReceives_ = recv_list.size();
  if(numReceives_) {
    procsFrom_.assign(numReceives_,0);
    lengthsFrom_.assign(numReceives_,0);
    indicesFrom_.assign(numReceives_,0);
    startsFrom_.assign(numReceives_,0);
  }
  for(size_t i=0,j=0; i<numReceives_; ++i) {
    int jlast = j;
    procsFrom_[i] = recv_list[i];
    startsFrom_[i] = j;
    for( ; j<(size_t)remoteProcIDs.size() &&
           remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
    lengthsFrom_[i] = j-jlast;
  }
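  // Illustrative example (not from the original source):
  // remoteProcIDs = [1, 1, 3, 3, 3] yields procsFrom_ = [1, 3],
  // startsFrom_ = [0, 2], lengthsFrom_ = [2, 3], and (below)
  // totalReceiveLength_ = 5.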
  totalReceiveLength_ = remoteProcIDs.size();
  indicesFrom_.clear ();
  numReceives_ -= sendMessageToSelf_;
}

Teuchos::RCP<DistributorPlan> DistributorPlan::getReversePlan() const {
  if (reversePlan_.is_null()) createReversePlan();
  return reversePlan_;
}

void DistributorPlan::createReversePlan() const
{
  reversePlan_ = Teuchos::rcp(new DistributorPlan(comm_));
  reversePlan_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
  reversePlan_->sendType_ = sendType_;
  reversePlan_->barrierBetweenRecvSend_ = barrierBetweenRecvSend_;

  // The total length of all the sends of this DistributorPlan. We
  // calculate it because it's the total length of all the receives
  // of the reverse DistributorPlan.
  size_t totalSendLength =
    std::accumulate(lengthsTo_.begin(), lengthsTo_.end(), 0);

  // The maximum length of any of the receives of this DistributorPlan.
  // We calculate it because it's the maximum length of any of the
  // sends of the reverse DistributorPlan.
  size_t maxReceiveLength = 0;
  const int myProcID = comm_->getRank();
  for (size_t i=0; i < numReceives_; ++i) {
    if (procsFrom_[i] != myProcID) {
      // Don't count receives for messages sent by myself to myself.
      if (lengthsFrom_[i] > maxReceiveLength) {
        maxReceiveLength = lengthsFrom_[i];
      }
    }
  }

  reversePlan_->sendMessageToSelf_ = sendMessageToSelf_;
  reversePlan_->numSendsToOtherProcs_ = numReceives_;
  reversePlan_->procIdsToSendTo_ = procsFrom_;
  reversePlan_->startsTo_ = startsFrom_;
  reversePlan_->lengthsTo_ = lengthsFrom_;
  reversePlan_->maxSendLength_ = maxReceiveLength;
  reversePlan_->indicesTo_ = indicesFrom_;
  reversePlan_->numReceives_ = numSendsToOtherProcs_;
  reversePlan_->totalReceiveLength_ = totalSendLength;
  reversePlan_->lengthsFrom_ = lengthsTo_;
  reversePlan_->procsFrom_ = procIdsToSendTo_;
  reversePlan_->startsFrom_ = startsTo_;
  reversePlan_->indicesFrom_ = indicesTo_;
  reversePlan_->useDistinctTags_ = useDistinctTags_;
}

void DistributorPlan::computeReceives()
{
  using Teuchos::Array;
  using Teuchos::ArrayRCP;
  using Teuchos::as;
  using Teuchos::CommStatus;
  using Teuchos::CommRequest;
  using Teuchos::ireceive;
  using Teuchos::RCP;
  using Teuchos::rcp;
  using Teuchos::REDUCE_SUM;
  using Teuchos::receive;
  using Teuchos::reduce;
  using Teuchos::scatter;
  using Teuchos::send;
  using Teuchos::waitAll;

  const int myRank = comm_->getRank();
  const int numProcs = comm_->getSize();

  // MPI tag for nonblocking receives and blocking sends in this method.
  const int pathTag = 2;
  const int tag = getTag(pathTag);

  // toProcsFromMe[i] == the number of messages sent by this process
  // to process i. The data in numSendsToOtherProcs_, procIdsToSendTo_, and lengthsTo_
  // concern the contiguous sends. Therefore, each process will be
  // listed in procIdsToSendTo_ at most once, and so toProcsFromMe[i] will
  // either be 0 or 1.
  {
    Array<int> toProcsFromMe (numProcs, 0);
#ifdef HAVE_TEUCHOS_DEBUG
    bool counting_error = false;
#endif // HAVE_TEUCHOS_DEBUG
    for (size_t i = 0; i < (numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0)); ++i) {
#ifdef HAVE_TEUCHOS_DEBUG
      if (toProcsFromMe[procIdsToSendTo_[i]] != 0) {
        counting_error = true;
      }
#endif // HAVE_TEUCHOS_DEBUG
      toProcsFromMe[procIdsToSendTo_[i]] = 1;
    }
#ifdef HAVE_TEUCHOS_DEBUG
    SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
      "Tpetra::Distributor::computeReceives: There was an error on at least "
      "one process in counting the number of messages sent by that process to "
      "the other processes. Please report this bug to the Tpetra developers.",
      *comm_);
#endif // HAVE_TEUCHOS_DEBUG

    // Compute the number of receives that this process needs to
    // post. The number of receives includes any self sends (i.e.,
    // messages sent by this process to itself).
    //
    // (We will use numReceives_ below to post exactly that
    // number of receives, with MPI_ANY_SOURCE as the sending rank.
    // This will tell us from which processes this process expects
    // to receive, and how many packets of data we expect to receive
    // from each process.)
    //
    // toProcsFromMe[i] is the number of messages sent by this
    // process to process i. Compute the sum (elementwise) of all
    // the toProcsFromMe arrays on all processes in the
    // communicator. If the array x is that sum, then if this
    // process has rank j, x[j] is the number of messages sent
    // to process j, that is, the number of receives on process j
    // (including any messages sent by process j to itself).
    //
    // Yes, this requires storing and operating on an array of
    // length P, where P is the number of processes in the
    // communicator. Epetra does this too. Avoiding this O(P)
    // memory bottleneck would require some research.
    //
    // mfh 09 Jan 2012, 15 Jul 2015: There are three ways to
    // implement this O(P) memory algorithm.
    //
    //   1. Use MPI_Reduce and MPI_Scatter: reduce on the root
    //      process (0) from toProcsFromMe, to numRecvsOnEachProc.
    //      Then, scatter the latter, so that each process p gets
    //      numRecvsOnEachProc[p].
    //
    //   2. Like #1, but use MPI_Reduce_scatter instead of
    //      MPI_Reduce and MPI_Scatter. MPI_Reduce_scatter might be
    //      optimized to reduce the number of messages, but
    //      MPI_Reduce_scatter is more general than we need (it
    //      allows the equivalent of MPI_Scatterv). See Bug 6336.
    //
    //   3. Do an all-reduce on toProcsFromMe, and let my process
    //      (with rank myRank) get numReceives_ from
    //      toProcsFromMe[myRank]. The HPCCG miniapp uses the
    //      all-reduce method.
    //
    // Approaches 1 and 3 have the same critical path length.
    // However, #3 moves more data. This is because the final
    // result is just one integer, but #3 moves a whole array of
    // results to all the processes. This is why we use Approach 1
    // here.
    //
    // mfh 12 Apr 2013: See discussion in createFromSends() about
    // how we could use this communication to propagate an error
    // flag for "free" in a release build.

    const int root = 0; // rank of root process of the reduction
    Array<int> numRecvsOnEachProc; // temp; only needed on root
    if (myRank == root) {
      numRecvsOnEachProc.resize (numProcs);
    }
    int numReceivesAsInt = 0; // output
    reduce<int, int> (toProcsFromMe.getRawPtr (),
                      numRecvsOnEachProc.getRawPtr (),
                      numProcs, REDUCE_SUM, root, *comm_);
    scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
                       &numReceivesAsInt, 1, root, *comm_);
    numReceives_ = static_cast<size_t> (numReceivesAsInt);
  }
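
  // Illustrative example (not from the original source): if two other
  // procs each send this proc one message and this proc also sends to
  // itself, numReceives_ is 3 at this point; the self message is
  // subtracted off at the end of this method.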

  // Now we know numReceives_, which is this process' number of
  // receives. Allocate the lengthsFrom_ and procsFrom_ arrays
  // with this number of entries.
  lengthsFrom_.assign (numReceives_, 0);
  procsFrom_.assign (numReceives_, 0);

  //
  // Ask (via nonblocking receive) each process from which we are
  // receiving how many packets we should expect from it in the
  // communication pattern.
  //

  // At this point, numReceives_ includes any self message that
  // there may be. At the end of this routine, we'll subtract off
  // the self message (if there is one) from numReceives_. In this
  // routine, we don't need to receive a message from ourselves in
  // order to figure out our lengthsFrom_ and source process ID; we
  // can just ask ourselves directly. Thus, the actual number of
  // nonblocking receives we post here does not include the self
  // message.
  const size_t actualNumReceives = numReceives_ - (sendMessageToSelf_ ? 1 : 0);

  // Teuchos' wrapper for nonblocking receives requires receive
  // buffers that it knows won't go away. This is why we use RCPs,
  // one RCP per nonblocking receive request. They get allocated in
  // the loop below.
  Array<RCP<CommRequest<int> > > requests (actualNumReceives);
  Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
  Array<RCP<CommStatus<int> > > statuses (actualNumReceives);

  // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
  // (receive data from any process).
#ifdef HAVE_MPI
  const int anySourceProc = MPI_ANY_SOURCE;
#else
  const int anySourceProc = -1;
#endif

  // Post the (nonblocking) receives.
  for (size_t i = 0; i < actualNumReceives; ++i) {
    // Once the receive completes, we can ask the corresponding
    // CommStatus object (output by wait()) for the sending process'
    // ID (which we'll assign to procsFrom_[i] -- don't forget to
    // do that!).
    lengthsFromBuffers[i].resize (1);
    lengthsFromBuffers[i][0] = as<size_t> (0);
    requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
                                         tag, *comm_);
  }

  // Post the sends: Tell each process to which we are sending how
  // many packets it should expect from us in the communication
  // pattern. We could use nonblocking sends here, as long as we do
  // a waitAll() on all the sends and receives at once.
  //
  // We assume that numSendsToOtherProcs_ and sendMessageToSelf_ have already been
  // set. The value of numSendsToOtherProcs_ (my process' number of sends) does
  // not include any message that it might send to itself.
  for (size_t i = 0; i < numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0); ++i) {
    if (procIdsToSendTo_[i] != myRank) {
      // Send a message to procIdsToSendTo_[i], telling that process that
      // this communication pattern will send that process
      // lengthsTo_[i] blocks of packets.
      const size_t* const lengthsTo_i = &lengthsTo_[i];
      send<int, size_t> (lengthsTo_i, 1, as<int> (procIdsToSendTo_[i]), tag, *comm_);
    }
    else {
      // We don't need a send in the self-message case. If this
      // process will send a message to itself in the communication
      // pattern, then the last element of lengthsFrom_ and
      // procsFrom_ corresponds to the self-message. Of course
      // this process knows how long the message is, and the process
      // ID is its own process ID.
      lengthsFrom_[numReceives_-1] = lengthsTo_[i];
      procsFrom_[numReceives_-1] = myRank;
    }
  }

  //
  // Wait on all the receives. When they arrive, check the status
  // output of wait() for the sending process' ID, unpack the
  // request buffers into lengthsFrom_, and set procsFrom_ from the
  // status.
  //
  waitAll (*comm_, requests (), statuses ());
  for (size_t i = 0; i < actualNumReceives; ++i) {
    lengthsFrom_[i] = *lengthsFromBuffers[i];
    procsFrom_[i] = statuses[i]->getSourceRank ();
  }

  // Sort the procsFrom_ array, and apply the same permutation to
  // lengthsFrom_. This ensures that procsFrom_[i] and
  // lengthsFrom_[i] refer to the same thing.
  sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());

  // Compute indicesFrom_
  totalReceiveLength_ =
    std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), 0);
  indicesFrom_.clear ();

  startsFrom_.clear ();
  startsFrom_.reserve (numReceives_);
  for (size_t i = 0, j = 0; i < numReceives_; ++i) {
    startsFrom_.push_back(j);
    j += lengthsFrom_[i];
  }

  if (sendMessageToSelf_) {
    --numReceives_;
  }
}

void DistributorPlan::setParameterList(const Teuchos::RCP<Teuchos::ParameterList>& plist)
{
  using Teuchos::FancyOStream;
  using Teuchos::getIntegralValue;
  using Teuchos::ParameterList;
  using Teuchos::parameterList;
  using Teuchos::RCP;
  using std::endl;

  if (! plist.is_null()) {
    RCP<const ParameterList> validParams = getValidParameters ();
    plist->validateParametersAndSetDefaults (*validParams);

    const bool barrierBetween =
      plist->get<bool> ("Barrier between receives and sends");
    const Details::EDistributorSendType sendType =
      getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
    const bool useDistinctTags = plist->get<bool> ("Use distinct tags");
    {
      // mfh 03 May 2016: We keep this option only for backwards
      // compatibility, but it must always be true. See discussion of
      // Github Issue #227.
      const bool enable_cuda_rdma =
        plist->get<bool> ("Enable MPI CUDA RDMA support");
      TEUCHOS_TEST_FOR_EXCEPTION
        (! enable_cuda_rdma, std::invalid_argument, "Tpetra::Distributor::"
         "setParameterList: " << "You specified \"Enable MPI CUDA RDMA "
         "support\" = false. This is no longer valid. You don't need to "
         "specify this option any more; Tpetra assumes it is always true. "
         "This is a very light assumption on the MPI implementation, and in "
         "fact does not actually involve hardware or system RDMA support. "
         "Tpetra just assumes that the MPI implementation can tell whether a "
         "pointer points to host memory or CUDA device memory.");
    }

    // We check this property explicitly, since we haven't yet learned
    // how to make a validator that can cross-check properties.
    // Later, turn this into a validator so that it can be embedded in
    // the valid ParameterList and used in Optika.
    TEUCHOS_TEST_FOR_EXCEPTION
      (! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
       std::invalid_argument, "Tpetra::Distributor::setParameterList: " << endl
       << "You specified \"Send type\"=\"Rsend\", but turned off the barrier "
       "between receives and sends." << endl << "This is invalid; you must "
       "include the barrier if you use ready sends." << endl << "Ready sends "
       "require that their corresponding receives have already been posted, "
       "and the only way to guarantee that in general is with a barrier.");

    // Now that we've validated the input list, save the results.
    sendType_ = sendType;
    barrierBetweenRecvSend_ = barrierBetween;
    useDistinctTags_ = useDistinctTags;

    // ParameterListAcceptor semantics require pointer identity of the
    // sublist passed to setParameterList(), so we save the pointer.
    this->setMyParamList (plist);
  }
}
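
// Illustrative usage sketch (not part of the original file): a caller
// holding a valid communicator `comm` might configure a plan like this,
// using the parameter names defined in getValidParameters() below:
//
//   Teuchos::RCP<Teuchos::ParameterList> params =
//     Teuchos::parameterList ("Tpetra::Distributor");
//   params->set ("Send type", std::string ("Isend"));
//   params->set ("Use distinct tags", true);
//   DistributorPlan plan (comm);
//   plan.setParameterList (params);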

Teuchos::Array<std::string> distributorSendTypes()
{
  Teuchos::Array<std::string> sendTypes;
  sendTypes.push_back ("Isend");
  sendTypes.push_back ("Rsend");
  sendTypes.push_back ("Send");
  sendTypes.push_back ("Ssend");
  return sendTypes;
}

Teuchos::RCP<const Teuchos::ParameterList>
DistributorPlan::getValidParameters() const
{
  using Teuchos::Array;
  using Teuchos::ParameterList;
  using Teuchos::parameterList;
  using Teuchos::RCP;
  using Teuchos::setStringToIntegralParameter;

  const bool barrierBetween = Details::barrierBetween_default;
  const bool useDistinctTags = Details::useDistinctTags_default;

  Array<std::string> sendTypes = distributorSendTypes ();
  const std::string defaultSendType ("Send");
  Array<Details::EDistributorSendType> sendTypeEnums;
  sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
  sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
  sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
  sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);

  RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
  plist->set ("Barrier between receives and sends", barrierBetween,
              "Whether to execute a barrier between receives and sends in do"
              "[Reverse]Posts(). Required for correctness when \"Send type\""
              "=\"Rsend\", otherwise correct but not recommended.");
  setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
    defaultSendType, "When using MPI, the variant of send to use in "
    "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
  plist->set ("Use distinct tags", useDistinctTags, "Whether to use distinct "
              "MPI message tags for different code paths. Highly recommended"
              " to avoid message collisions.");
  plist->set ("Timer Label", "", "Label for Time Monitor output");
  plist->set ("Enable MPI CUDA RDMA support", true, "Assume that MPI can "
              "tell whether a pointer points to host memory or CUDA device "
              "memory. You don't need to specify this option any more; "
              "Tpetra assumes it is always true. This is a very light "
              "assumption on the MPI implementation, and in fact does not "
              "actually involve hardware or system RDMA support.");

  return Teuchos::rcp_const_cast<const ParameterList> (plist);
}

} // namespace Details
} // namespace Tpetra