Tpetra_Distributor.cpp
1 // ***********************************************************************
2 //
3 // Tpetra: Templated Linear Algebra Services Package
4 // Copyright (2008) Sandia Corporation
5 //
6 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
7 // the U.S. Government retains certain rights in this software.
8 //
9 // Redistribution and use in source and binary forms, with or without
10 // modification, are permitted provided that the following conditions are
11 // met:
12 //
13 // 1. Redistributions of source code must retain the above copyright
14 // notice, this list of conditions and the following disclaimer.
15 //
16 // 2. Redistributions in binary form must reproduce the above copyright
17 // notice, this list of conditions and the following disclaimer in the
18 // documentation and/or other materials provided with the distribution.
19 //
20 // 3. Neither the name of the Corporation nor the names of the
21 // contributors may be used to endorse or promote products derived from
22 // this software without specific prior written permission.
23 //
24 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
25 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
27 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
28 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
30 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
31 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
32 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
33 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
34 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 //
36 // Questions? Contact Michael A. Heroux (maherou@sandia.gov)
37 //
38 // ************************************************************************
39 // @HEADER
40 
41 #include "Tpetra_Distributor.hpp"
42 #include "Tpetra_Details_gathervPrint.hpp"
43 #include "Teuchos_StandardParameterEntryValidators.hpp"
44 #include "Teuchos_VerboseObjectParameterListHelpers.hpp"
45 
46 namespace Tpetra {
47  namespace Details {
48  std::string
49  DistributorSendTypeEnumToString (EDistributorSendType sendType)
50  {
51  if (sendType == DISTRIBUTOR_ISEND) {
52  return "Isend";
53  }
54  else if (sendType == DISTRIBUTOR_RSEND) {
55  return "Rsend";
56  }
57  else if (sendType == DISTRIBUTOR_SEND) {
58  return "Send";
59  }
60  else if (sendType == DISTRIBUTOR_SSEND) {
61  return "Ssend";
62  }
63  else {
64  TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
65  "EDistributorSendType enum value " << sendType << ".");
66  }
67  }
68 
69  std::string
70  DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
71  {
72  switch (how) {
73  case Details::DISTRIBUTOR_NOT_INITIALIZED:
74  return "Not initialized yet";
75  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
76  return "By createFromSends";
77  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
78  return "By createFromRecvs";
79  case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
80  return "By createReverseDistributor";
81  case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
82  return "By copy constructor";
83  default:
84  return "INVALID";
85  }
86  }
87  } // namespace Details
88 
89  Teuchos::Array<std::string>
90  distributorSendTypes ()
91  {
92  Teuchos::Array<std::string> sendTypes;
93  sendTypes.push_back ("Isend");
94  sendTypes.push_back ("Rsend");
95  sendTypes.push_back ("Send");
96  sendTypes.push_back ("Ssend");
97  return sendTypes;
98  }
99 
100  // We set default values of Distributor's Boolean parameters here,
101  // in this one place. That way, if we want to change the default
102  // value of a parameter, we don't have to search the whole file to
103  // ensure a consistent setting.
104  namespace {
105  // Default value of the "Debug" parameter.
106  const bool tpetraDistributorDebugDefault = false;
107  // Default value of the "Barrier between receives and sends" parameter.
108  const bool barrierBetween_default = false;
109  // Default value of the "Use distinct tags" parameter.
110  const bool useDistinctTags_default = true;
111  } // namespace (anonymous)
112 
113  int Distributor::getTag (const int pathTag) const {
114  return useDistinctTags_ ? pathTag : comm_->getTag ();
115  }
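  // Example (an illustrative sketch; whether getTag is callable from
  // user code depends on its access level in the header): with "Use
  // distinct tags" enabled, each code path passes its own pathTag, so
  // concurrent communication patterns cannot intercept each other's
  // messages:
  //
  //   const int tagA = getTag (2); // e.g., the computeReceives() path
  //   const int tagB = getTag (3); // a different path, a different tag
  //
  // With the option disabled, both calls return the communicator's
  // default tag.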
116 
117 
118 #ifdef TPETRA_DISTRIBUTOR_TIMERS
119  void Distributor::makeTimers () {
120  const std::string name_doPosts3 = "Tpetra::Distributor: doPosts(3)";
121  const std::string name_doPosts4 = "Tpetra::Distributor: doPosts(4)";
122  const std::string name_doWaits = "Tpetra::Distributor: doWaits";
123  const std::string name_doPosts3_recvs = "Tpetra::Distributor: doPosts(3): recvs";
124  const std::string name_doPosts4_recvs = "Tpetra::Distributor: doPosts(4): recvs";
125  const std::string name_doPosts3_barrier = "Tpetra::Distributor: doPosts(3): barrier";
126  const std::string name_doPosts4_barrier = "Tpetra::Distributor: doPosts(4): barrier";
127  const std::string name_doPosts3_sends = "Tpetra::Distributor: doPosts(3): sends";
128  const std::string name_doPosts4_sends = "Tpetra::Distributor: doPosts(4): sends";
129 
130  timer_doPosts3_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3);
131  timer_doPosts4_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4);
132  timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (name_doWaits);
133  timer_doPosts3_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_recvs);
134  timer_doPosts4_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_recvs);
135  timer_doPosts3_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_barrier);
136  timer_doPosts4_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_barrier);
137  timer_doPosts3_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_sends);
138  timer_doPosts4_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_sends);
139  }
140 #endif // TPETRA_DISTRIBUTOR_TIMERS
141 
142  void
143  Distributor::init (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
144  const Teuchos::RCP<Teuchos::FancyOStream>& out,
145  const Teuchos::RCP<Teuchos::ParameterList>& plist)
146  {
147  this->out_ = out.is_null () ?
148  Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)) : out;
149  if (! plist.is_null ()) {
150  this->setParameterList (plist);
151  }
152 
153 #ifdef TPETRA_DISTRIBUTOR_TIMERS
154  makeTimers ();
155 #endif // TPETRA_DISTRIBUTOR_TIMERS
156 
157  if (debug_) {
158  TEUCHOS_TEST_FOR_EXCEPTION
159  (out_.is_null (), std::logic_error, "Tpetra::Distributor::init: debug_ "
160  "is true but out_ (pointer to the output stream) is NULL. Please "
161  "report this bug to the Tpetra developers.");
162  Teuchos::OSTab tab (out_);
163  std::ostringstream os;
164  os << comm_->getRank ()
165  << ": Distributor ctor done" << std::endl;
166  *out_ << os.str ();
167  }
168  }
169 
170  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
171  : comm_ (comm)
172  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
173  , sendType_ (Details::DISTRIBUTOR_SEND)
174  , barrierBetween_ (barrierBetween_default)
175  , debug_ (tpetraDistributorDebugDefault)
176  , selfMessage_ (false)
177  , numSends_ (0)
178  , maxSendLength_ (0)
179  , numReceives_ (0)
180  , totalReceiveLength_ (0)
181  , lastRoundBytesSend_ (0)
182  , lastRoundBytesRecv_ (0)
183  , useDistinctTags_ (useDistinctTags_default)
184  {
185  init (comm, Teuchos::null, Teuchos::null);
186  }
187 
188  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
189  const Teuchos::RCP<Teuchos::FancyOStream>& out)
190  : comm_ (comm)
191  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
192  , sendType_ (Details::DISTRIBUTOR_SEND)
193  , barrierBetween_ (barrierBetween_default)
194  , debug_ (tpetraDistributorDebugDefault)
195  , selfMessage_ (false)
196  , numSends_ (0)
197  , maxSendLength_ (0)
198  , numReceives_ (0)
199  , totalReceiveLength_ (0)
200  , lastRoundBytesSend_ (0)
201  , lastRoundBytesRecv_ (0)
202  , useDistinctTags_ (useDistinctTags_default)
203  {
204  init (comm, out, Teuchos::null);
205  }
206 
207  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
208  const Teuchos::RCP<Teuchos::ParameterList>& plist)
209  : comm_ (comm)
210  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
211  , sendType_ (Details::DISTRIBUTOR_SEND)
212  , barrierBetween_ (barrierBetween_default)
213  , debug_ (tpetraDistributorDebugDefault)
214  , selfMessage_ (false)
215  , numSends_ (0)
216  , maxSendLength_ (0)
217  , numReceives_ (0)
218  , totalReceiveLength_ (0)
219  , lastRoundBytesSend_ (0)
220  , lastRoundBytesRecv_ (0)
221  , useDistinctTags_ (useDistinctTags_default)
222  {
223  init (comm, Teuchos::null, plist);
224  }
225 
226  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
227  const Teuchos::RCP<Teuchos::FancyOStream>& out,
228  const Teuchos::RCP<Teuchos::ParameterList>& plist)
229  : comm_ (comm)
230  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
231  , sendType_ (Details::DISTRIBUTOR_SEND)
232  , barrierBetween_ (barrierBetween_default)
233  , debug_ (tpetraDistributorDebugDefault)
234  , selfMessage_ (false)
235  , numSends_ (0)
236  , maxSendLength_ (0)
237  , numReceives_ (0)
238  , totalReceiveLength_ (0)
239  , lastRoundBytesSend_ (0)
240  , lastRoundBytesRecv_ (0)
241  , useDistinctTags_ (useDistinctTags_default)
242  {
243  init (comm, out, plist);
244  }
245 
246  Distributor::Distributor (const Distributor & distributor)
247  : comm_ (distributor.comm_)
248  , out_ (distributor.out_)
249  , howInitialized_ (Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
250  , sendType_ (distributor.sendType_)
251  , barrierBetween_ (distributor.barrierBetween_)
252  , debug_ (distributor.debug_)
253  , selfMessage_ (distributor.selfMessage_)
254  , numSends_ (distributor.numSends_)
255  , procsTo_ (distributor.procsTo_)
256  , startsTo_ (distributor.startsTo_)
257  , lengthsTo_ (distributor.lengthsTo_)
258  , maxSendLength_ (distributor.maxSendLength_)
259  , indicesTo_ (distributor.indicesTo_)
260  , numReceives_ (distributor.numReceives_)
261  , totalReceiveLength_ (distributor.totalReceiveLength_)
262  , lengthsFrom_ (distributor.lengthsFrom_)
263  , procsFrom_ (distributor.procsFrom_)
264  , startsFrom_ (distributor.startsFrom_)
265  , indicesFrom_ (distributor.indicesFrom_)
266  , reverseDistributor_ (distributor.reverseDistributor_)
267  , lastRoundBytesSend_ (distributor.lastRoundBytesSend_)
268  , lastRoundBytesRecv_ (distributor.lastRoundBytesRecv_)
269  , useDistinctTags_ (distributor.useDistinctTags_)
270  {
271  using Teuchos::ParameterList;
272  using Teuchos::parameterList;
273  using Teuchos::RCP;
274  using Teuchos::rcp;
275 
276  // Clone the right-hand side's ParameterList, so that this' list
277  // is decoupled from the right-hand side's list. We don't need to
278  // do validation, since the right-hand side already has validated
279  // its parameters, so just call setMyParamList(). Note that this
280  // won't work if the right-hand side doesn't have a list set yet,
281  // so we first check for null.
282  RCP<const ParameterList> rhsList = distributor.getParameterList ();
283  if (! rhsList.is_null ()) {
284  this->setMyParamList (parameterList (* rhsList));
285  }
286 
287 #ifdef TPETRA_DISTRIBUTOR_TIMERS
288  makeTimers ();
289 #endif // TPETRA_DISTRIBUTOR_TIMERS
290 
291  if (debug_) {
292  TEUCHOS_TEST_FOR_EXCEPTION
293  (out_.is_null (), std::logic_error, "Tpetra::Distributor::init: debug_ "
294  "is true but out_ (pointer to the output stream) is NULL. Please "
295  "report this bug to the Tpetra developers.");
296  Teuchos::OSTab tab (out_);
297  std::ostringstream os;
298  os << comm_->getRank ()
299  << ": Distributor copy ctor done" << std::endl;
300  *out_ << os.str ();
301  }
302  }
303 
304  void Distributor::swap (Distributor& rhs) {
305  using Teuchos::ParameterList;
306  using Teuchos::parameterList;
307  using Teuchos::RCP;
308 
309  std::swap (comm_, rhs.comm_);
310  std::swap (out_, rhs.out_);
311  std::swap (howInitialized_, rhs.howInitialized_);
312  std::swap (sendType_, rhs.sendType_);
313  std::swap (barrierBetween_, rhs.barrierBetween_);
314  std::swap (debug_, rhs.debug_);
315  std::swap (selfMessage_, rhs.selfMessage_);
316  std::swap (numSends_, rhs.numSends_);
317  std::swap (procsTo_, rhs.procsTo_);
318  std::swap (startsTo_, rhs.startsTo_);
319  std::swap (lengthsTo_, rhs.lengthsTo_);
320  std::swap (maxSendLength_, rhs.maxSendLength_);
321  std::swap (indicesTo_, rhs.indicesTo_);
322  std::swap (numReceives_, rhs.numReceives_);
323  std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
324  std::swap (lengthsFrom_, rhs.lengthsFrom_);
325  std::swap (procsFrom_, rhs.procsFrom_);
326  std::swap (startsFrom_, rhs.startsFrom_);
327  std::swap (indicesFrom_, rhs.indicesFrom_);
328  std::swap (reverseDistributor_, rhs.reverseDistributor_);
329  std::swap (lastRoundBytesSend_, rhs.lastRoundBytesSend_);
330  std::swap (lastRoundBytesRecv_, rhs.lastRoundBytesRecv_);
331  std::swap (useDistinctTags_, rhs.useDistinctTags_);
332 
333  // Swap parameter lists. If they are the same object, make a deep
334  // copy first, so that modifying one won't modify the other one.
335  RCP<ParameterList> lhsList = this->getNonconstParameterList ();
336  RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
337  if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
338  rhsList = parameterList (*rhsList);
339  }
340  if (! rhsList.is_null ()) {
341  this->setMyParamList (rhsList);
342  }
343  if (! lhsList.is_null ()) {
344  rhs.setMyParamList (lhsList);
345  }
346 
347  // We don't need to swap timers, because all instances of
348  // Distributor use the same timers.
349  }
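  // Example (an illustrative sketch): swap() enables a copy-and-swap
  // style of assignment without writing out each member:
  //
  //   Tpetra::Distributor tmp (rhs); // deep copy via the copy constructor
  //   lhs.swap (tmp);                // lhs takes over rhs' plan;
  //                                  // tmp carries away lhs' old state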
350 
351  Distributor::~Distributor ()
352  {
353  // mfh 10 May 2017: We shouldn't have any outstanding
354  // communication requests at this point. It would be legitimate
355  // to check here, and report an error if requests_.size() != 0.
356  // However, throwing in a destructor is bad form. See #1303:
357  //
358  // https://github.com/trilinos/Trilinos/issues/1303
359  //
360  // If someone wants to restore the error check, please don't
361  // throw; instead, use MPI_Abort (or exit() in a non-MPI build).
362 
363  // TEUCHOS_TEST_FOR_EXCEPTION(requests_.size() != 0, std::runtime_error,
364  // "Tpetra::Distributor: Destructor called with " << requests_.size()
365  // << " outstanding posts (unfulfilled communication requests). There "
366  // "should be none at this point. Please report this bug to the Tpetra "
367  // "developers.");
368  }
369 
370  void
371  Distributor::setParameterList (const Teuchos::RCP<Teuchos::ParameterList>& plist)
372  {
373  using Teuchos::FancyOStream;
374  using Teuchos::getIntegralValue;
375  using Teuchos::includesVerbLevel;
376  using Teuchos::OSTab;
377  using Teuchos::ParameterList;
378  using Teuchos::parameterList;
379  using Teuchos::RCP;
380  using std::endl;
381 
382  RCP<const ParameterList> validParams = getValidParameters ();
383  plist->validateParametersAndSetDefaults (*validParams);
384 
385  const bool barrierBetween =
386  plist->get<bool> ("Barrier between receives and sends");
387  const Details::EDistributorSendType sendType =
388  getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
389  const bool useDistinctTags = plist->get<bool> ("Use distinct tags");
390  const bool debug = plist->get<bool> ("Debug");
391  {
392  // mfh 03 May 2016: We keep this option only for backwards
393  // compatibility, but it must always be true. See discussion of
394  // Github Issue #227.
395  const bool enable_cuda_rdma =
396  plist->get<bool> ("Enable MPI CUDA RDMA support");
397  TEUCHOS_TEST_FOR_EXCEPTION
398  (! enable_cuda_rdma, std::invalid_argument, "Tpetra::Distributor::"
399  "setParameterList: " << "You specified \"Enable MPI CUDA RDMA "
400  "support\" = false. This is no longer valid. You don't need to "
401  "specify this option any more; Tpetra assumes it is always true. "
402  "This is a very light assumption on the MPI implementation, and in "
403  "fact does not actually involve hardware or system RDMA support. "
404  "Tpetra just assumes that the MPI implementation can tell whether a "
405  "pointer points to host memory or CUDA device memory.");
406  }
407 
408  // We check this property explicitly, since we haven't yet learned
409  // how to make a validator that can cross-check properties.
410  // Later, turn this into a validator so that it can be embedded in
411  // the valid ParameterList and used in Optika.
412  TEUCHOS_TEST_FOR_EXCEPTION(
413  ! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
414  std::invalid_argument, "Tpetra::Distributor::setParameterList: " << endl
415  << "You specified \"Send type\"=\"Rsend\", but turned off the barrier "
416  "between receives and sends." << endl << "This is invalid; you must "
417  "include the barrier if you use ready sends." << endl << "Ready sends "
418  "require that their corresponding receives have already been posted, "
419  "and the only way to guarantee that in general is with a barrier.");
420 
421  // Now that we've validated the input list, save the results.
422  sendType_ = sendType;
423  barrierBetween_ = barrierBetween;
424  useDistinctTags_ = useDistinctTags;
425  debug_ = debug;
426 
427  // ParameterListAcceptor semantics require pointer identity of the
428  // sublist passed to setParameterList(), so we save the pointer.
429  this->setMyParamList (plist);
430  }
431 
432  Teuchos::RCP<const Teuchos::ParameterList>
433  Distributor::getValidParameters () const
434  {
435  using Teuchos::Array;
436  using Teuchos::ParameterList;
437  using Teuchos::parameterList;
438  using Teuchos::RCP;
439  using Teuchos::setStringToIntegralParameter;
440 
441  const bool barrierBetween = barrierBetween_default;
442  const bool useDistinctTags = useDistinctTags_default;
443  const bool debug = tpetraDistributorDebugDefault;
444 
445  Array<std::string> sendTypes = distributorSendTypes ();
446  const std::string defaultSendType ("Send");
447  Array<Details::EDistributorSendType> sendTypeEnums;
448  sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
449  sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
450  sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
451  sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);
452 
453  RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
454  plist->set ("Barrier between receives and sends", barrierBetween,
455  "Whether to execute a barrier between receives and sends in do"
456  "[Reverse]Posts(). Required for correctness when \"Send type\""
457  "=\"Rsend\", otherwise correct but not recommended.");
458  setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
459  defaultSendType, "When using MPI, the variant of send to use in "
460  "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
461  plist->set ("Use distinct tags", useDistinctTags, "Whether to use distinct "
462  "MPI message tags for different code paths. Highly recommended"
463  " to avoid message collisions.");
464  plist->set ("Debug", debug, "Whether to print copious debugging output on "
465  "all processes.");
466  plist->set ("Enable MPI CUDA RDMA support", true, "Assume that MPI can "
467  "tell whether a pointer points to host memory or CUDA device "
468  "memory. You don't need to specify this option any more; "
469  "Tpetra assumes it is always true. This is a very light "
470  "assumption on the MPI implementation, and in fact does not "
471  "actually involve hardware or system RDMA support.");
472 
473  // mfh 24 Dec 2015: Tpetra no longer inherits from
474  // Teuchos::VerboseObject, so it doesn't need the "VerboseObject"
475  // sublist. However, we retain the "VerboseObject" sublist
476  // anyway, for backwards compatibility (otherwise the above
477  // validation would fail with an invalid parameter name, should
478  // the user still want to provide this list).
479  Teuchos::setupVerboseObjectSublist (&*plist);
480  return Teuchos::rcp_const_cast<const ParameterList> (plist);
481  }
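  // Example (a hedged usage sketch; "comm" stands for any valid
  // Teuchos::RCP<const Teuchos::Comm<int> >): overriding the defaults
  // established above at construction time:
  //
  //   Teuchos::RCP<Teuchos::ParameterList> plist =
  //     Teuchos::parameterList ("Tpetra::Distributor");
  //   plist->set ("Send type", std::string ("Isend"));
  //   plist->set ("Debug", true);
  //   Tpetra::Distributor dist (comm, plist);
  //
  // setParameterList() validates the list against getValidParameters()
  // and fills in defaults for any parameter not set explicitly.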
482 
483 
484  size_t Distributor::getTotalReceiveLength () const
485  { return totalReceiveLength_; }
486 
487  size_t Distributor::getNumReceives () const
488  { return numReceives_; }
489 
490  bool Distributor::hasSelfMessage () const
491  { return selfMessage_; }
492 
493  size_t Distributor::getNumSends () const
494  { return numSends_; }
495 
496  size_t Distributor::getMaxSendLength () const
497  { return maxSendLength_; }
498 
499  Teuchos::ArrayView<const int> Distributor::getProcsFrom() const
500  { return procsFrom_; }
501 
502  Teuchos::ArrayView<const size_t> Distributor::getLengthsFrom() const
503  { return lengthsFrom_; }
504 
505  Teuchos::ArrayView<const int> Distributor::getProcsTo() const
506  { return procsTo_; }
507 
508  Teuchos::ArrayView<const size_t> Distributor::getLengthsTo() const
509  { return lengthsTo_; }
510 
511  Teuchos::RCP<Distributor>
512  Distributor::getReverse () const {
513  if (reverseDistributor_.is_null ()) {
514  createReverseDistributor ();
515  }
516  TEUCHOS_TEST_FOR_EXCEPTION
517  (reverseDistributor_.is_null (), std::logic_error, "The reverse "
518  "Distributor is null after createReverseDistributor returned. "
519  "Please report this bug to the Tpetra developers.");
520  return reverseDistributor_;
521  }
522 
523 
524  void
525  Distributor::createReverseDistributor() const
526  {
527  reverseDistributor_ = Teuchos::rcp (new Distributor (comm_, out_));
528  reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
529  reverseDistributor_->sendType_ = sendType_;
530  reverseDistributor_->barrierBetween_ = barrierBetween_;
531  reverseDistributor_->debug_ = debug_;
532 
533  // The total length of all the sends of this Distributor. We
534  // calculate it because it's the total length of all the receives
535  // of the reverse Distributor.
536  size_t totalSendLength =
537  std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), 0);
538 
539  // The maximum length of any of the receives of this Distributor.
540  // We calculate it because it's the maximum length of any of the
541  // sends of the reverse Distributor.
542  size_t maxReceiveLength = 0;
543  const int myProcID = comm_->getRank();
544  for (size_t i=0; i < numReceives_; ++i) {
545  if (procsFrom_[i] != myProcID) {
546  // Don't count receives for messages sent by myself to myself.
547  if (lengthsFrom_[i] > maxReceiveLength) {
548  maxReceiveLength = lengthsFrom_[i];
549  }
550  }
551  }
552 
553  // Initialize all of reverseDistributor's data members. This
554  // mainly just involves flipping "send" and "receive," or the
555  // equivalent "to" and "from."
556 
557  reverseDistributor_->selfMessage_ = selfMessage_;
558  reverseDistributor_->numSends_ = numReceives_;
559  reverseDistributor_->procsTo_ = procsFrom_;
560  reverseDistributor_->startsTo_ = startsFrom_;
561  reverseDistributor_->lengthsTo_ = lengthsFrom_;
562  reverseDistributor_->maxSendLength_ = maxReceiveLength;
563  reverseDistributor_->indicesTo_ = indicesFrom_;
564  reverseDistributor_->numReceives_ = numSends_;
565  reverseDistributor_->totalReceiveLength_ = totalSendLength;
566  reverseDistributor_->lengthsFrom_ = lengthsTo_;
567  reverseDistributor_->procsFrom_ = procsTo_;
568  reverseDistributor_->startsFrom_ = startsTo_;
569  reverseDistributor_->indicesFrom_ = indicesTo_;
570 
571  // requests_: Allocated on demand.
572  // reverseDistributor_: See note below
573 
574  // mfh 31 Mar 2016: These are statistics, kept on calls to
575  // doPostsAndWaits or doReversePostsAndWaits. They weren't here
576  // when I started, and I didn't add them, so I don't know if they
577  // are accurate.
578  reverseDistributor_->lastRoundBytesSend_ = 0;
579  reverseDistributor_->lastRoundBytesRecv_ = 0;
580 
581  reverseDistributor_->useDistinctTags_ = useDistinctTags_;
582 
583  // I am my reverse Distributor's reverse Distributor.
584  // Thus, it would be legit to do the following:
585  //
586  // reverseDistributor_->reverseDistributor_ = Teuchos::rcp (this, false);
587  //
588  // (Note use of a "weak reference" to avoid a circular RCP
589  // dependency.) The only issue is that if users hold on to the
590  // reverse Distributor but let go of the forward one, this
591  // reference won't be valid anymore. However, the reverse
592  // Distributor is really an implementation detail of Distributor
593  // and not meant to be used directly, so we don't need to do this.
594  reverseDistributor_->reverseDistributor_ = Teuchos::null;
595  }
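  // Example (an illustrative sketch): the reverse Distributor executes
  // this Distributor's plan "backwards," as the member assignments
  // above show; every send becomes a receive and vice versa:
  //
  //   Teuchos::RCP<Distributor> rev = dist.getReverse ();
  //   // rev->getNumSends ()    == dist.getNumReceives ()
  //   // rev->getNumReceives () == dist.getNumSends ()
  //   // rev->getProcsTo ()     == dist.getProcsFrom (), and so on.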
596 
597 
598  void Distributor::doWaits () {
599  using Teuchos::Array;
600  using Teuchos::CommRequest;
601  using Teuchos::FancyOStream;
602  using Teuchos::includesVerbLevel;
603  using Teuchos::is_null;
604  using Teuchos::OSTab;
605  using Teuchos::RCP;
606  using Teuchos::waitAll;
607  using std::endl;
608 
609  Teuchos::OSTab tab (out_);
610 
611 #ifdef TPETRA_DISTRIBUTOR_TIMERS
612  Teuchos::TimeMonitor timeMon (*timer_doWaits_);
613 #endif // TPETRA_DISTRIBUTOR_TIMERS
614 
615  const int myRank = comm_->getRank ();
616 
617  if (debug_) {
618  std::ostringstream os;
619  os << myRank << ": doWaits: # reqs = "
620  << requests_.size () << endl;
621  *out_ << os.str ();
622  }
623 
624  if (requests_.size() > 0) {
625  waitAll (*comm_, requests_());
626 
627 #ifdef HAVE_TEUCHOS_DEBUG
628  // Make sure that waitAll() nulled out all the requests.
629  for (Array<RCP<CommRequest<int> > >::const_iterator it = requests_.begin();
630  it != requests_.end(); ++it)
631  {
632  TEUCHOS_TEST_FOR_EXCEPTION( ! is_null (*it), std::runtime_error,
633  Teuchos::typeName(*this) << "::doWaits(): Communication requests "
634  "should all be null aftr calling Teuchos::waitAll() on them, but "
635  "at least one request is not null.");
636  }
637 #endif // HAVE_TEUCHOS_DEBUG
638  // Restore the invariant that requests_.size() is the number of
639  // outstanding nonblocking communication requests.
640  requests_.resize (0);
641  }
642 
643 #ifdef HAVE_TEUCHOS_DEBUG
644  {
645  const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
646  int globalSizeNonzero = 0;
647  Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
648  localSizeNonzero,
649  Teuchos::outArg (globalSizeNonzero));
650  TEUCHOS_TEST_FOR_EXCEPTION(
651  globalSizeNonzero != 0, std::runtime_error,
652  "Tpetra::Distributor::doWaits: After waitAll, at least one process has "
653  "a nonzero number of outstanding posts. There should be none at this "
654  "point. Please report this bug to the Tpetra developers.");
655  }
656 #endif // HAVE_TEUCHOS_DEBUG
657 
658  if (debug_) {
659  std::ostringstream os;
660  os << myRank << ": doWaits done" << endl;
661  *out_ << os.str ();
662  }
663  }
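  // Example (a simplified sketch; buffer names are illustrative):
  // doWaits() completes the nonblocking requests that a preceding
  // doPosts() call left in requests_:
  //
  //   dist.doPosts (exports, packetsPerSend, imports); // post recvs/sends
  //   // ... overlap communication with local computation here ...
  //   dist.doWaits (); // blocks; afterwards imports may safely be read
  //
  // doPostsAndWaits() combines both steps into a single blocking call.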
664 
665  void Distributor::doReverseWaits () {
666  // call doWaits() on the reverse Distributor, if it exists
667  if (! reverseDistributor_.is_null()) {
668  reverseDistributor_->doWaits();
669  }
670  }
671 
672  std::string Distributor::description () const {
673  std::ostringstream out;
674 
675  out << "\"Tpetra::Distributor\": {";
676  const std::string label = this->getObjectLabel ();
677  if (label != "") {
678  out << "Label: " << label << ", ";
679  }
680  out << "How initialized: "
682  << ", Parameters: {"
683  << "Send type: "
684  << DistributorSendTypeEnumToString (sendType_)
685  << ", Barrier between receives and sends: "
686  << (barrierBetween_ ? "true" : "false")
687  << ", Use distinct tags: "
688  << (useDistinctTags_ ? "true" : "false")
689  << ", Debug: " << (debug_ ? "true" : "false")
690  << "}}";
691  return out.str ();
692  }
693 
694  std::string
695  Distributor::
696  localDescribeToString (const Teuchos::EVerbosityLevel vl) const
697  {
698  using Teuchos::toString;
699  using Teuchos::VERB_HIGH;
700  using Teuchos::VERB_EXTREME;
701  using std::endl;
702 
703  // This preserves current behavior of Distributor.
704  if (vl <= Teuchos::VERB_LOW || comm_.is_null ()) {
705  return std::string ();
706  }
707 
708  auto outStringP = Teuchos::rcp (new std::ostringstream ());
709  auto outp = Teuchos::getFancyOStream (outStringP); // returns RCP
710  Teuchos::FancyOStream& out = *outp;
711 
712  const int myRank = comm_->getRank ();
713  const int numProcs = comm_->getSize ();
714  out << "Process " << myRank << " of " << numProcs << ":" << endl;
715  Teuchos::OSTab tab1 (out);
716 
717  out << "selfMessage: " << hasSelfMessage () << endl;
718  out << "numSends: " << getNumSends () << endl;
719  if (vl == VERB_HIGH || vl == VERB_EXTREME) {
720  out << "procsTo: " << toString (procsTo_) << endl;
721  out << "lengthsTo: " << toString (lengthsTo_) << endl;
722  out << "maxSendLength: " << getMaxSendLength () << endl;
723  }
724  if (vl == VERB_EXTREME) {
725  out << "startsTo: " << toString (startsTo_) << endl;
726  out << "indicesTo: " << toString (indicesTo_) << endl;
727  }
728  if (vl == VERB_HIGH || vl == VERB_EXTREME) {
729  out << "numReceives: " << getNumReceives () << endl;
730  out << "totalReceiveLength: " << getTotalReceiveLength () << endl;
731  out << "lengthsFrom: " << toString (lengthsFrom_) << endl;
732  out << "startsFrom: " << toString (startsFrom_) << endl;
733  out << "procsFrom: " << toString (procsFrom_) << endl;
734  }
735 
736  out.flush (); // make sure the ostringstream got everything
737  return outStringP->str ();
738  }
739 
740  void
742  describe (Teuchos::FancyOStream &out,
743  const Teuchos::EVerbosityLevel verbLevel) const
744  {
745  using std::endl;
746  using Teuchos::VERB_DEFAULT;
747  using Teuchos::VERB_NONE;
748  using Teuchos::VERB_LOW;
749  using Teuchos::VERB_MEDIUM;
750  using Teuchos::VERB_HIGH;
751  using Teuchos::VERB_EXTREME;
752  const Teuchos::EVerbosityLevel vl =
753  (verbLevel == VERB_DEFAULT) ? VERB_LOW : verbLevel;
754 
755  if (vl == VERB_NONE) {
756  return; // don't print anything
757  }
758  // If this Distributor's Comm is null, then the calling
759  // process does not participate in Distributor-related collective
760  // operations with the other processes. In that case, it is not
761  // even legal to call this method. The reasonable thing to do in
762  // that case is nothing.
763  if (comm_.is_null ()) {
764  return;
765  }
766  const int myRank = comm_->getRank ();
767  const int numProcs = comm_->getSize ();
768 
769  // Only Process 0 should touch the output stream, but this method
770  // in general may need to do communication. Thus, we may need to
771  // preserve the current tab level across multiple "if (myRank ==
772  // 0) { ... }" inner scopes. This is why we sometimes create
773  // OSTab instances by pointer, instead of by value. We only need
774  // to create them by pointer if the tab level must persist through
775  // multiple inner scopes.
776  Teuchos::RCP<Teuchos::OSTab> tab0, tab1;
777 
778  if (myRank == 0) {
779  // At every verbosity level but VERB_NONE, Process 0 prints.
780  // By convention, describe() always begins with a tab before
781  // printing.
782  tab0 = Teuchos::rcp (new Teuchos::OSTab (out));
783  // We quote the class name because it contains colons.
784  // This makes the output valid YAML.
785  out << "\"Tpetra::Distributor\":" << endl;
786  tab1 = Teuchos::rcp (new Teuchos::OSTab (out));
787 
788  const std::string label = this->getObjectLabel ();
789  if (label != "") {
790  out << "Label: " << label << endl;
791  }
792  out << "Number of processes: " << numProcs << endl
793  << "How initialized: "
795  << endl;
796  {
797  out << "Parameters: " << endl;
798  Teuchos::OSTab tab2 (out);
799  out << "\"Send type\": "
800  << DistributorSendTypeEnumToString (sendType_) << endl
801  << "\"Barrier between receives and sends\": "
802  << (barrierBetween_ ? "true" : "false") << endl
803  << "\"Use distinct tags\": "
804  << (useDistinctTags_ ? "true" : "false") << endl
805  << "\"Debug\": " << (debug_ ? "true" : "false") << endl;
806  }
807  } // if myRank == 0
808 
809  // This is collective over the Map's communicator.
810  if (vl > VERB_LOW) {
811  const std::string lclStr = this->localDescribeToString (vl);
812  Tpetra::Details::gathervPrint (out, lclStr, *comm_);
813  }
814 
815  out << "Reverse Distributor:";
816  if (reverseDistributor_.is_null ()) {
817  out << " null" << endl;
818  }
819  else {
820  out << endl;
821  reverseDistributor_->describe (out, vl);
822  }
823  }
824 
825  void
826  Distributor::computeReceives ()
827  {
828  using Teuchos::Array;
829  using Teuchos::ArrayRCP;
830  using Teuchos::as;
831  using Teuchos::CommStatus;
832  using Teuchos::CommRequest;
833  using Teuchos::ireceive;
834  using Teuchos::RCP;
835  using Teuchos::rcp;
836  using Teuchos::REDUCE_SUM;
837  using Teuchos::receive;
838  using Teuchos::reduce;
839  using Teuchos::scatter;
840  using Teuchos::send;
841  using Teuchos::waitAll;
842  using std::endl;
843 
844  Teuchos::OSTab tab (out_);
845  const int myRank = comm_->getRank();
846  const int numProcs = comm_->getSize();
847 
848  // MPI tag for nonblocking receives and blocking sends in this method.
849  const int pathTag = 2;
850  const int tag = this->getTag (pathTag);
851 
852  if (debug_) {
853  std::ostringstream os;
854  os << myRank << ": computeReceives: "
855  "{selfMessage_: " << (selfMessage_ ? "true" : "false")
856  << ", tag: " << tag << "}" << endl;
857  *out_ << os.str ();
858  }
859 
860  // toProcsFromMe[i] == the number of messages sent by this process
861  // to process i. The data in numSends_, procsTo_, and lengthsTo_
862  // concern the contiguous sends. Therefore, each process will be
863  // listed in procsTo_ at most once, and so toProcsFromMe[i] will
864  // either be 0 or 1.
865  {
866  Array<int> toProcsFromMe (numProcs, 0);
867 #ifdef HAVE_TEUCHOS_DEBUG
868  bool counting_error = false;
869 #endif // HAVE_TEUCHOS_DEBUG
870  for (size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
871 #ifdef HAVE_TEUCHOS_DEBUG
872  if (toProcsFromMe[procsTo_[i]] != 0) {
873  counting_error = true;
874  }
875 #endif // HAVE_TEUCHOS_DEBUG
876  toProcsFromMe[procsTo_[i]] = 1;
877  }
878 #ifdef HAVE_TEUCHOS_DEBUG
879  SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
880  "Tpetra::Distributor::computeReceives: There was an error on at least "
881  "one process in counting the number of messages send by that process to "
882  "the other processs. Please report this bug to the Tpetra developers.",
883  *comm_);
884 #endif // HAVE_TEUCHOS_DEBUG
885 
886  if (debug_) {
887  std::ostringstream os;
888  os << myRank << ": computeReceives: Calling reduce and scatter" << endl;
889  *out_ << os.str ();
890  }
891 
892  // Compute the number of receives that this process needs to
893  // post. The number of receives includes any self sends (i.e.,
894  // messages sent by this process to itself).
895  //
896  // (We will use numReceives_ below to post exactly that
897  // number of receives, with MPI_ANY_SOURCE as the sending rank.
898  // This will tell us from which processes this process expects
899  // to receive, and how many packets of data we expect to receive
900  // from each process.)
901  //
902  // toProcsFromMe[i] is the number of messages sent by this
903  // process to process i. Compute the sum (elementwise) of all
904  // the toProcsFromMe arrays on all processes in the
905  // communicator. If the array x is that sum, then if this
906  // process has rank j, x[j] is the number of messages sent
907  // to process j, that is, the number of receives on process j
908  // (including any messages sent by process j to itself).
909  //
910  // Yes, this requires storing and operating on an array of
911  // length P, where P is the number of processes in the
912  // communicator. Epetra does this too. Avoiding this O(P)
913  // memory bottleneck would require some research.
914  //
915  // mfh 09 Jan 2012, 15 Jul 2015: There are three ways to
916  // implement this O(P) memory algorithm.
917  //
918  // 1. Use MPI_Reduce and MPI_Scatter: reduce on the root
919  // process (0) from toProcsFromMe, to numRecvsOnEachProc.
920  // Then, scatter the latter, so that each process p gets
921  // numRecvsOnEachProc[p].
922  //
923  // 2. Like #1, but use MPI_Reduce_scatter instead of
924  // MPI_Reduce and MPI_Scatter. MPI_Reduce_scatter might be
925  // optimized to reduce the number of messages, but
926  // MPI_Reduce_scatter is more general than we need (it
927  // allows the equivalent of MPI_Scatterv). See Bug 6336.
928  //
929  // 3. Do an all-reduce on toProcsFromMe, and let my process
930  // (with rank myRank) get numReceives_ from
931  // toProcsFromMe[myRank]. The HPCCG miniapp uses the
932  // all-reduce method.
933  //
934  // Approaches 1 and 3 have the same critical path length.
935  // However, #3 moves more data. This is because the final
936  // result is just one integer, but #3 moves a whole array of
937  // results to all the processes. This is why we use Approach 1
938  // here.
939  //
940  // mfh 12 Apr 2013: See discussion in createFromSends() about
941  // how we could use this communication to propagate an error
942  // flag for "free" in a release build.
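  // Worked example of Approach 1 (illustrative): suppose P = 3 and
  // the three processes' toProcsFromMe arrays are
  //
  //   proc 0: [0, 1, 1],  proc 1: [1, 0, 1],  proc 2: [0, 1, 0].
  //
  // The elementwise sum computed on the root is [1, 2, 2]. After the
  // scatter, proc 0 learns that it has 1 receive, and procs 1 and 2
  // learn that they each have 2 (any self-message included).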
943 
944  const int root = 0; // rank of root process of the reduction
945  Array<int> numRecvsOnEachProc; // temp; only needed on root
946  if (myRank == root) {
947  numRecvsOnEachProc.resize (numProcs);
948  }
949  int numReceivesAsInt = 0; // output
950  reduce<int, int> (toProcsFromMe.getRawPtr (),
951  numRecvsOnEachProc.getRawPtr (),
952  numProcs, REDUCE_SUM, root, *comm_);
953  scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
954  &numReceivesAsInt, 1, root, *comm_);
955  numReceives_ = static_cast<size_t> (numReceivesAsInt);
956  }
957 
958  // Now we know numReceives_, which is this process' number of
959  // receives. Allocate the lengthsFrom_ and procsFrom_ arrays
960  // with this number of entries.
961  lengthsFrom_.assign (numReceives_, 0);
962  procsFrom_.assign (numReceives_, 0);
963 
964  //
965  // Ask (via nonblocking receive) each process from which we are
966  // receiving how many packets we should expect from it in the
967  // communication pattern.
968  //
969 
970  // At this point, numReceives_ includes any self message that
971  // there may be. At the end of this routine, we'll subtract off
972  // the self message (if there is one) from numReceives_. In this
973  // routine, we don't need to receive a message from ourselves in
974  // order to figure out our lengthsFrom_ and source process ID; we
975  // can just ask ourselves directly. Thus, the actual number of
976  // nonblocking receives we post here does not include the self
977  // message.
978  const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);
979 
980  // Teuchos' wrapper for nonblocking receives requires receive
981  // buffers that it knows won't go away. This is why we use RCPs,
982  // one RCP per nonblocking receive request. They get allocated in
983  // the loop below.
984  Array<RCP<CommRequest<int> > > requests (actualNumReceives);
985  Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
986  Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
987 
988  // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
989  // (receive data from any process).
990 #ifdef HAVE_MPI
991  const int anySourceProc = MPI_ANY_SOURCE;
992 #else
993  const int anySourceProc = -1;
994 #endif
995 
996  if (debug_) {
997  std::ostringstream os;
998  os << myRank << ": computeReceives: Posting "
999  << actualNumReceives << " irecvs" << endl;
1000  *out_ << os.str ();
1001  }
1002 
1003  // Post the (nonblocking) receives.
1004  for (size_t i = 0; i < actualNumReceives; ++i) {
1005  // Once the receive completes, we can ask the corresponding
1006  // CommStatus object (output by wait()) for the sending process'
1007  // ID (which we'll assign to procsFrom_[i] -- don't forget to
1008  // do that!).
1009  lengthsFromBuffers[i].resize (1);
1010  lengthsFromBuffers[i][0] = as<size_t> (0);
1011  requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc, tag, *comm_);
1012  if (debug_) {
1013  std::ostringstream os;
1014  os << myRank << ": computeReceives: "
1015  "Posted any-proc irecv w/ specified tag " << tag << endl;
1016  *out_ << os.str ();
1017  }
1018  }
1019 
1020  if (debug_) {
1021  std::ostringstream os;
1022  os << myRank << ": computeReceives: "
1023  "posting " << numSends_ << " sends" << endl;
1024  *out_ << os.str ();
1025  }
1026  // Post the sends: Tell each process to which we are sending how
1027  // many packets it should expect from us in the communication
1028  // pattern. We could use nonblocking sends here, as long as we do
1029  // a waitAll() on all the sends and receives at once.
1030  //
1031  // We assume that numSends_ and selfMessage_ have already been
1032  // set. The value of numSends_ (my process' number of sends) does
1033  // not include any message that it might send to itself.
1034  for (size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
1035  if (procsTo_[i] != myRank) {
1036  // Send a message to procsTo_[i], telling that process that
1037  // this communication pattern will send that process
1038  // lengthsTo_[i] blocks of packets.
1039  const size_t* const lengthsTo_i = &lengthsTo_[i];
1040  send<int, size_t> (lengthsTo_i, 1, as<int> (procsTo_[i]), tag, *comm_);
1041  if (debug_) {
1042  std::ostringstream os;
1043  os << myRank << ": computeReceives: "
1044  "Posted send to Proc " << procsTo_[i] << " w/ specified tag "
1045  << tag << endl;
1046  *out_ << os.str ();
1047  }
1048  }
1049  else {
1050  // We don't need a send in the self-message case. If this
1051  // process will send a message to itself in the communication
1052  // pattern, then the last element of lengthsFrom_ and
1053  // procsFrom_ corresponds to the self-message. Of course
1054  // this process knows how long the message is, and the process
1055  // ID is its own process ID.
1056  lengthsFrom_[numReceives_-1] = lengthsTo_[i];
1057  procsFrom_[numReceives_-1] = myRank;
1058  }
1059  }
1060 
1061  if (debug_) {
1062  std::ostringstream os;
1063  os << myRank << ": computeReceives: waitAll on "
1064  << requests.size () << " requests" << endl;
1065  *out_ << os.str ();
1066  }
1067  //
1068  // Wait on all the receives. When they arrive, check the status
1069  // output of wait() for the receiving process ID, unpack the
1070  // request buffers into lengthsFrom_, and set procsFrom_ from the
1071  // status.
1072  //
1073  waitAll (*comm_, requests (), statuses ());
1074  for (size_t i = 0; i < actualNumReceives; ++i) {
1075  lengthsFrom_[i] = *lengthsFromBuffers[i];
1076  procsFrom_[i] = statuses[i]->getSourceRank ();
1077  }
1078 
1079  // Sort the procsFrom_ array, and apply the same permutation to
1080  // lengthsFrom_. This ensures that procsFrom_[i] and
1081  // lengthsFrom_[i] refer to the same message.
1082  sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
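  // Example (illustrative): if, before the sort,
  //   procsFrom_   = [5, 2, 7] and
  //   lengthsFrom_ = [10, 20, 30],
  // then afterwards procsFrom_ = [2, 5, 7] and lengthsFrom_ =
  // [20, 10, 30]: the permutation that sorts the ranks is applied
  // to the lengths as well.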
1083 
1084  // Compute indicesFrom_
1085  totalReceiveLength_ = std::accumulate (lengthsFrom_.begin(), lengthsFrom_.end(), 0);
1086  indicesFrom_.clear ();
1087  indicesFrom_.reserve (totalReceiveLength_);
1088  for (size_t i = 0; i < totalReceiveLength_; ++i) {
1089  indicesFrom_.push_back(i);
1090  }
1091 
1092  startsFrom_.clear ();
1093  startsFrom_.reserve (numReceives_);
1094  for (size_t i = 0, j = 0; i < numReceives_; ++i) {
1095  startsFrom_.push_back(j);
1096  j += lengthsFrom_[i];
1097  }
1098 
1099  if (selfMessage_) {
1100  --numReceives_;
1101  }
1102 
1103  if (debug_) {
1104  std::ostringstream os;
1105  os << myRank << ": computeReceives: done" << endl;
1106  *out_ << os.str ();
1107  }
1108  }
1109 
1110  size_t
1111  Distributor::createFromSends (const Teuchos::ArrayView<const int> &exportProcIDs)
1112  {
1113  using Teuchos::outArg;
1114  using Teuchos::REDUCE_MAX;
1115  using Teuchos::reduceAll;
1116  using std::endl;
1117 
1118  Teuchos::OSTab tab (out_);
1119  const size_t numExports = exportProcIDs.size();
1120  const int myProcID = comm_->getRank();
1121  const int numProcs = comm_->getSize();
1122  if (debug_) {
1123  std::ostringstream os;
1124  os << myProcID << ": createFromSends" << endl;
1125  *out_ << os.str ();
1126  }
1127 
1128  // exportProcIDs tells us the communication pattern for this
1129  // distributor. It dictates the way that the export data will be
1130  // interpreted in doPosts(). We want to perform at most one
1131  // send per process in doPosts; this is for two reasons:
1132  // * minimize latency / overhead in the comm routines (nice)
1133  // * match the number of receives and sends between processes
1134  // (necessary)
1135  //
1136  // Teuchos::Comm requires that the data for a send are contiguous
1137  // in a send buffer. Therefore, if the data in the send buffer
1138  // for doPosts() are not contiguous, they will need to be copied
1139  // into a contiguous buffer. The user has specified this
1140  // noncontiguous pattern and we can't do anything about it.
1141  // However, if they do not provide an efficient pattern, we will
1142  // warn them if one of the following compile-time options has been
1143  // set:
1144  // * HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS
1145  // * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
1146  //
1147  // If the data are contiguous, then we can post the sends in situ
1148  // (i.e., without needing to copy them into a send buffer).
1149  //
1150  // Determine contiguity. There are a number of ways to do this:
1151  // * If the export IDs are sorted, then all exports to a
1152  // particular proc must be contiguous. This is what Epetra does.
1153  // * If the export ID of the current export already has been
1154  // listed, then the previous listing should correspond to the
1155  // same export. This tests contiguity, but not sortedness.
1156  //
1157  // Both of these tests require O(n), where n is the number of
1158  // exports. However, the latter will positively identify a greater
1159  // portion of contiguous patterns. We use the latter method.
1160  //
1161  // Check to see if values are grouped by procs without gaps
1162  // If so, indices_to -> 0.
1163 
1164  // Set up data structures for quick traversal of arrays.
1165  // This contains the number of sends for each process ID.
1166  //
1167  // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
1168  // that create an array of length the number of processes in the
1169  // communicator (plus one). Given how this code uses this array,
1170  // it should be straightforward to replace it with a hash table or
1171  // some other more space-efficient data structure. In practice,
1172  // most of the entries of starts should be zero for a sufficiently
1173  // large process count, unless the communication pattern is dense.
1174  // Note that it's important to be able to iterate through keys (i
1175  // for which starts[i] is nonzero) in increasing order.
1176  Teuchos::Array<size_t> starts (numProcs + 1, 0);
1177 
1178  // numActive is the number of sends that are not Null
1179  size_t numActive = 0;
1180  int needSendBuff = 0; // Boolean
1181 
1182 #ifdef HAVE_TPETRA_DEBUG
1183  int badID = -1; // only used in a debug build
1184 #endif // HAVE_TPETRA_DEBUG
1185  for (size_t i = 0; i < numExports; ++i) {
1186  const int exportID = exportProcIDs[i];
1187  if (exportID >= numProcs) {
1188 #ifdef HAVE_TPETRA_DEBUG
1189  badID = myProcID;
1190 #endif // HAVE_TPETRA_DEBUG
1191  break;
1192  }
1193  else if (exportID >= 0) {
1194  // exportID is a valid process ID. Increment the number of
1195  // messages this process will send to that process.
1196  ++starts[exportID];
1197 
1198  // If we're sending more than one message to process exportID,
1199  // then it is possible that the data are not contiguous.
1200  // Check by seeing if the previous process ID in the list
1201  // (exportProcIDs[i-1]) is the same. It's safe to use i-1,
1202  // because if starts[exportID] > 1, then i must be > 1 (since
1203  // the starts array was filled with zeros initially).
1204 
1205  // null entries break continuity.
1206  // e.g., [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
1207  if (needSendBuff==0 && starts[exportID] > 1 && exportID != exportProcIDs[i-1]) {
1208  needSendBuff = 1;
1209  }
1210  ++numActive;
1211  }
1212  }
1213 
1214 #ifdef HAVE_TPETRA_DEBUG
1215  // Test whether any process in the communicator got an invalid
1216  // process ID. If badID != -1 on this process, then it equals
1217  // this process' rank. The max of all badID over all processes is
1218  // the max rank which has an invalid process ID.
1219  {
1220  int gbl_badID;
1221  reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
1222  TEUCHOS_TEST_FOR_EXCEPTION(gbl_badID >= 0, std::runtime_error,
1223  Teuchos::typeName(*this) << "::createFromSends(): Process " << gbl_badID
1224  << ", perhaps among other processes, got a bad send process ID.");
1225  }
1226 #else
1227  // FIXME (mfh 12 Apr 2013, 15 Jul 2015) Rather than simply
1228  // ignoring this information, we should think about how to pass it
1229  // along so that all the processes find out about it. In a
1230  // release build with efficiency warnings turned off, the next
1231  // collective communication happens in computeReceives(). We
1232  // could figure out how to encode the error flag in that
1233  // operation, for example by adding an extra entry to the
1234  // collective's output array that encodes the error condition (0
1235  // on all processes if no error, else 1 on any process with the
1236  // error, so that the sum will produce a nonzero value if any
1237  // process had an error). I'll defer this change for now and
1238  // recommend instead that people with troubles try a debug build.
1239 #endif // HAVE_TPETRA_DEBUG
1240 
1241 #if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
1242  {
1243  int global_needSendBuff;
1244  reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
1245  outArg (global_needSendBuff));
1246  TPETRA_EFFICIENCY_WARNING(
1247  global_needSendBuff != 0, std::runtime_error,
1248  "::createFromSends: Grouping export IDs together by process rank often "
1249  "improves performance.");
1250  }
1251 #endif
1252 
1253  // Determine from the caller's data whether or not the current
1254  // process should send (a) message(s) to itself.
1255  if (starts[myProcID] != 0) {
1256  selfMessage_ = true;
1257  }
1258  else {
1259  selfMessage_ = false;
1260  }
1261 
1262 #ifdef HAVE_TEUCHOS_DEBUG
1263  bool index_neq_numActive = false;
1264  bool send_neq_numSends = false;
1265 #endif
1266  if (! needSendBuff) {
1267  // grouped by proc, no send buffer or indicesTo_ needed
1268  numSends_ = 0;
1269  // Count total number of sends, i.e., total number of procs to
1270  // which we are sending. This includes myself, if applicable.
1271  for (int i = 0; i < numProcs; ++i) {
1272  if (starts[i]) {
1273  ++numSends_;
1274  }
1275  }
1276 
1277  // Not only do we not need these, but we must clear them, as
1278  // empty status of indicesTo is a flag used later.
1279  indicesTo_.resize(0);
1280  // Size these to numSends_; note, at the moment, numSends_
1281  // includes self sends. Set their values to zeros.
1282  procsTo_.assign(numSends_,0);
1283  startsTo_.assign(numSends_,0);
1284  lengthsTo_.assign(numSends_,0);
1285 
1286  // set startsTo to the offset for each send (i.e., each proc ID)
1287  // set procsTo to the proc ID for each send
1288  // in interpreting this code, remember that we are assuming contiguity
1289  // that is why index skips through the ranks
1290  {
1291  size_t index = 0, procIndex = 0;
1292  for (size_t i = 0; i < numSends_; ++i) {
1293  while (exportProcIDs[procIndex] < 0) {
1294  ++procIndex; // skip all negative proc IDs
1295  }
1296  startsTo_[i] = procIndex;
1297  int procID = exportProcIDs[procIndex];
1298  procsTo_[i] = procID;
1299  index += starts[procID];
1300  procIndex += starts[procID];
1301  }
1302 #ifdef HAVE_TEUCHOS_DEBUG
1303  if (index != numActive) {
1304  index_neq_numActive = true;
1305  }
1306 #endif
1307  }
1308  // sort the startsTo and proc IDs together, in ascending order, according
1309  // to proc IDs
1310  if (numSends_ > 0) {
1311  sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1312  }
1313  // compute the maximum send length
1314  maxSendLength_ = 0;
1315  for (size_t i = 0; i < numSends_; ++i) {
1316  int procID = procsTo_[i];
1317  lengthsTo_[i] = starts[procID];
1318  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1319  maxSendLength_ = lengthsTo_[i];
1320  }
1321  }
1322  }
1323  else {
1324  // not grouped by proc, need send buffer and indicesTo_
1325 
1326  // starts[i] is the number of sends to proc i
1327  // numActive equals number of sends total, \sum_i starts[i]
1328 
1329  // this loop starts at starts[1], so explicitly check starts[0]
1330  if (starts[0] == 0 ) {
1331  numSends_ = 0;
1332  }
1333  else {
1334  numSends_ = 1;
1335  }
1336  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1337  im1=starts.begin();
1338  i != starts.end(); ++i)
1339  {
1340  if (*i != 0) ++numSends_;
1341  *i += *im1;
1342  im1 = i;
1343  }
1344  // starts[i] now contains the number of exports to procs 0 through i
1345 
1346  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1347  i=starts.rbegin()+1;
1348  i != starts.rend(); ++i)
1349  {
1350  *ip1 = *i;
1351  ip1 = i;
1352  }
1353  starts[0] = 0;
1354  // starts[i] now contains the number of exports to procs 0 through
1355  // i-1, i.e., all procs before proc i
1356 
1357  indicesTo_.resize(numActive);
1358 
1359  for (size_t i = 0; i < numExports; ++i) {
1360  if (exportProcIDs[i] >= 0) {
1361  // record the offset to the sendBuffer for this export
1362  indicesTo_[starts[exportProcIDs[i]]] = i;
1363  // now increment the offset for this proc
1364  ++starts[exportProcIDs[i]];
1365  }
1366  }
1367  // our send buffer will contain the export data for each of the procs
1368  // we communicate with, in order by proc id
1369  // sendBuffer = {proc_0_data, proc_1_data, ..., proc_np-1_data}
1370  // indicesTo_ now maps each send buffer slot to the export that
1371  // fills it: send buffer slot j holds the data of export
1372  // indicesTo_[j], as assigned by the fill loop above
1373  //
1374  // starts[i] once again contains the number of exports to
1375  // procs 0 through i
1376  for (int proc = numProcs-1; proc != 0; --proc) {
1377  starts[proc] = starts[proc-1];
1378  }
1379  starts.front() = 0;
1380  starts[numProcs] = numActive;
1381  //
1382  // starts[proc] once again contains the number of exports to
1383  // procs 0 through proc-1
1384  // i.e., the start of my data in the sendBuffer
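  // Worked example (illustrative): let numProcs = 3 and
  // exportProcIDs = [2, 0, 2, 1]. Counting gives starts = [1, 1, 2, 0];
  // after the prefix sum and shift, starts = [0, 1, 2, 4]. The fill
  // loop above then yields indicesTo_ = [1, 3, 0, 2]: send buffer
  // slot 0 holds export 1 (bound for proc 0), slot 1 holds export 3
  // (proc 1), and slots 2-3 hold exports 0 and 2 (proc 2).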
1385 
1386  // this contains invalid data at procs we don't care about, that is okay
1387  procsTo_.resize(numSends_);
1388  startsTo_.resize(numSends_);
1389  lengthsTo_.resize(numSends_);
1390 
1391  // for each group of sends/exports, record the destination proc,
1392  // the length, and the offset for this send into the
1393  // send buffer (startsTo_)
1394  maxSendLength_ = 0;
1395  size_t snd = 0;
1396  for (int proc = 0; proc < numProcs; ++proc ) {
1397  if (starts[proc+1] != starts[proc]) {
1398  lengthsTo_[snd] = starts[proc+1] - starts[proc];
1399  startsTo_[snd] = starts[proc];
1400  // record max length for all off-proc sends
1401  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1402  maxSendLength_ = lengthsTo_[snd];
1403  }
1404  procsTo_[snd] = proc;
1405  ++snd;
1406  }
1407  }
1408 #ifdef HAVE_TEUCHOS_DEBUG
1409  if (snd != numSends_) {
1410  send_neq_numSends = true;
1411  }
1412 #endif
1413  }
1414 #ifdef HAVE_TEUCHOS_DEBUG
1415  SHARED_TEST_FOR_EXCEPTION(index_neq_numActive, std::logic_error,
1416  "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1417  SHARED_TEST_FOR_EXCEPTION(send_neq_numSends, std::logic_error,
1418  "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1419 #endif
1420 
1421  if (selfMessage_) --numSends_;
1422 
1423  // Invert map to see what msgs are received and what length
1424  computeReceives();
1425 
1426  if (debug_) {
1427  std::ostringstream os;
1428  os << myProcID << ": createFromSends: done" << endl;
1429  *out_ << os.str ();
1430  }
1431 
1432  // createFromRecvs() calls createFromSends(), but will set
1433  // howInitialized_ again after calling createFromSends().
1434  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
1435 
1436  return totalReceiveLength_;
1437  }
1438 
1439  void
1440  Distributor::
1441  createFromSendsAndRecvs (const Teuchos::ArrayView<const int>& exportProcIDs,
1442  const Teuchos::ArrayView<const int>& remoteProcIDs)
1443  {
1444  // Note that exportProcIDs and remoteProcIDs _must_ be lists with
1445  // one entry per GID. If export/remoteProcIDs are taken from the
1446  // getProcs{From|To} lists extracted from a previous Distributor,
1447  // this will give the wrong answer, because those lists contain a
1448  // unique entry per process rank. A version of this method that
1449  // takes lengthsTo and lengthsFrom should be written.
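 // For example (hypothetical data): if this process sends GIDs
 // {g0, g1, g2} to proc 1 and {g3} to proc 2, then exportProcIDs
 // must be {1, 1, 1, 2} (one entry per GID), not the deduplicated
 // {1, 2} that getProcsTo() would return.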
1450 
1451  howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
1452 
1453 
1454  int myProcID = comm_->getRank ();
1455  int numProcs = comm_->getSize();
1456 
1457  const size_t numExportIDs = exportProcIDs.size();
1458  Teuchos::Array<size_t> starts (numProcs + 1, 0);
1459 
1460  size_t numActive = 0;
1461  int needSendBuff = 0; // Boolean
1462 
1463  for(size_t i = 0; i < numExportIDs; i++ )
1464  {
1465  if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
1466  needSendBuff = 1;
1467  if( exportProcIDs[i] >= 0 )
1468  {
1469  ++starts[ exportProcIDs[i] ];
1470  ++numActive;
1471  }
1472  }
1473 
1474  selfMessage_ = ( starts[myProcID] != 0 ) ? 1 : 0;
1475 
1476  numSends_ = 0;
1477 
1478  if( needSendBuff ) // not grouped by proc; we need a send buffer and indicesTo_
1479  {
1480  if (starts[0] == 0 ) {
1481  numSends_ = 0;
1482  }
1483  else {
1484  numSends_ = 1;
1485  }
1486  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1487  im1=starts.begin();
1488  i != starts.end(); ++i)
1489  {
1490  if (*i != 0) ++numSends_;
1491  *i += *im1;
1492  im1 = i;
1493  }
1494  // starts[i] now contains the number of exports to procs 0 through i
1495 
1496  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1497  i=starts.rbegin()+1;
1498  i != starts.rend(); ++i)
1499  {
1500  *ip1 = *i;
1501  ip1 = i;
1502  }
1503  starts[0] = 0;
1504  // starts[i] now contains the number of exports to procs 0 through
1505  // i-1, i.e., all procs before proc i
1506 
1507  indicesTo_.resize(numActive);
1508 
1509  for (size_t i = 0; i < numExportIDs; ++i) {
1510  if (exportProcIDs[i] >= 0) {
1511  // record the offset to the sendBuffer for this export
1512  indicesTo_[starts[exportProcIDs[i]]] = i;
1513  // now increment the offset for this proc
1514  ++starts[exportProcIDs[i]];
1515  }
1516  }
1517  for (int proc = numProcs-1; proc != 0; --proc) {
1518  starts[proc] = starts[proc-1];
1519  }
1520  starts.front() = 0;
1521  starts[numProcs] = numActive;
1522  procsTo_.resize(numSends_);
1523  startsTo_.resize(numSends_);
1524  lengthsTo_.resize(numSends_);
1525  maxSendLength_ = 0;
1526  size_t snd = 0;
1527  for (int proc = 0; proc < numProcs; ++proc ) {
1528  if (starts[proc+1] != starts[proc]) {
1529  lengthsTo_[snd] = starts[proc+1] - starts[proc];
1530  startsTo_[snd] = starts[proc];
1531  // record max length for all off-proc sends
1532  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1533  maxSendLength_ = lengthsTo_[snd];
1534  }
1535  procsTo_[snd] = proc;
1536  ++snd;
1537  }
1538  }
1539  }
1540  else {
1541  // grouped by proc, no send buffer or indicesTo_ needed
1542  numSends_ = 0;
1543  // Count total number of sends, i.e., total number of procs to
1544  // which we are sending. This includes myself, if applicable.
1545  for (int i = 0; i < numProcs; ++i) {
1546  if (starts[i]) {
1547  ++numSends_;
1548  }
1549  }
1550 
1551  // Not only do we not need these, but we must clear them, as
1552  // the emptiness of indicesTo_ is used as a flag later.
1553  indicesTo_.resize(0);
1554  // Size these to numSends_; note, at the moment, numSends_
1555  // includes self sends. Set their values to zeros.
1556  procsTo_.assign(numSends_,0);
1557  startsTo_.assign(numSends_,0);
1558  lengthsTo_.assign(numSends_,0);
1559 
1560  // Set startsTo_ to the offset of each send (i.e., of each proc ID)
1561  // and procsTo_ to the proc ID of each send. In interpreting this
1562  // code, remember that we are assuming contiguity of the export
1563  // proc IDs; that is why the indices skip through the ranks.
1564  {
1565  size_t index = 0, procIndex = 0;
1566  for (size_t i = 0; i < numSends_; ++i) {
1567  while (exportProcIDs[procIndex] < 0) {
1568  ++procIndex; // skip all negative proc IDs
1569  }
1570  startsTo_[i] = procIndex;
1571  int procID = exportProcIDs[procIndex];
1572  procsTo_[i] = procID;
1573  index += starts[procID];
1574  procIndex += starts[procID];
1575  }
1576  }
1577  // sort the startsTo and proc IDs together, in ascending order, according
1578  // to proc IDs
1579  if (numSends_ > 0) {
1580  sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1581  }
1582  // compute the maximum send length
1583  maxSendLength_ = 0;
1584  for (size_t i = 0; i < numSends_; ++i) {
1585  int procID = procsTo_[i];
1586  lengthsTo_[i] = starts[procID];
1587  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1588  maxSendLength_ = lengthsTo_[i];
1589  }
1590  }
1591  }
1592 
1593 
1594  numSends_ -= selfMessage_;
1595  std::vector<int> recv_list;
1596  recv_list.reserve(numSends_); //reserve an initial guess for size needed
1597 
1598  int last_pid=-2;
1599  for(int i=0; i<remoteProcIDs.size(); i++) {
1600  if(remoteProcIDs[i]>last_pid) {
1601  recv_list.push_back(remoteProcIDs[i]);
1602  last_pid = remoteProcIDs[i];
1603  }
1604  else if (remoteProcIDs[i]<last_pid)
1605  throw std::runtime_error("Tpetra::Distributor::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
1606  }
1607  numReceives_ = recv_list.size();
1608  if(numReceives_) {
1609  procsFrom_.assign(numReceives_,0);
1610  lengthsFrom_.assign(numReceives_,0);
1611  indicesFrom_.assign(numReceives_,0);
1612  startsFrom_.assign(numReceives_,0);
1613  }
1614  for(size_t i=0,j=0; i<numReceives_; ++i) {
1615  size_t jlast=j;
1616  procsFrom_[i] = recv_list[i];
1617  startsFrom_[i] = j;
1618  for( ; j<(size_t)remoteProcIDs.size() &&
1619  remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){} // advance j past this run of equal ranks
1620  lengthsFrom_[i] = j-jlast;
1621  }
1622  totalReceiveLength_ = remoteProcIDs.size();
1623  indicesFrom_.clear ();
1624  indicesFrom_.reserve (totalReceiveLength_);
1625  for (size_t i = 0; i < totalReceiveLength_; ++i) {
1626  indicesFrom_.push_back(i);
1627  }
1628 
1629  numReceives_-=selfMessage_;
1630  }
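The receive bookkeeping above is a run-length encoding of the sorted remoteProcIDs list: each run of equal ranks becomes one entry in procsFrom_/startsFrom_/lengthsFrom_. A minimal standalone sketch (hypothetical data; not Tpetra code):

    #include <cstddef>
    #include <iostream>
    #include <vector>

    int main () {
      // One entry per received value; must already be sorted by rank.
      const std::vector<int> remoteProcIDs {0, 0, 3, 3, 3};

      std::vector<int> procsFrom;
      std::vector<std::size_t> startsFrom, lengthsFrom;
      for (std::size_t j = 0; j < remoteProcIDs.size (); ) {
        const std::size_t jlast = j;
        while (j < remoteProcIDs.size () &&
               remoteProcIDs[j] == remoteProcIDs[jlast]) {
          ++j; // advance past this run of equal ranks
        }
        procsFrom.push_back (remoteProcIDs[jlast]);
        startsFrom.push_back (jlast);
        lengthsFrom.push_back (j - jlast);
      }

      for (std::size_t i = 0; i < procsFrom.size (); ++i) {
        std::cout << "proc " << procsFrom[i] << ": offset " << startsFrom[i]
                  << ", length " << lengthsFrom[i] << std::endl;
      }
      // prints: proc 0: offset 0, length 2
      //         proc 3: offset 2, length 3
      return 0;
    }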
1631 
1632 } // namespace Tpetra
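For orientation, a hedged usage sketch of the plan-construction path shown above: each process sends one value to the next rank, builds the plan with createFromSends, and exchanges data with doPostsAndWaits. Tpetra::ScopeGuard and the exact doPostsAndWaits overloads vary across Tpetra versions, so treat this as a sketch under those assumptions, not a definitive example.

    #include "Tpetra_Core.hpp"
    #include "Tpetra_Distributor.hpp"
    #include "Teuchos_Array.hpp"

    int main (int argc, char* argv[]) {
      Tpetra::ScopeGuard tpetraScope (&argc, &argv); // init MPI if needed
      {
        auto comm = Tpetra::getDefaultComm ();
        Tpetra::Distributor distributor (comm);

        // Send one value to the next rank (hypothetical pattern).
        const int next = (comm->getRank () + 1) % comm->getSize ();
        Teuchos::Array<int> exportProcIDs (1, next);

        // createFromSends returns the number of values we will receive.
        const size_t numImports =
          distributor.createFromSends (exportProcIDs ());

        Teuchos::Array<int> exports (1, comm->getRank ());
        Teuchos::Array<int> imports (numImports);
        distributor.doPostsAndWaits (exports ().getConst (), 1, imports ());
        // imports[0] now holds the rank of this process's predecessor.
      }
      return 0;
    }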
Namespace Tpetra contains the class and methods constituting the Tpetra library.
Teuchos::RCP< Distributor > getReverse() const
A reverse communication plan Distributor.
EDistributorHowInitialized
Enum indicating how and whether a Distributor was initialized.
Teuchos::RCP< const Teuchos::ParameterList > getValidParameters() const
List of valid Distributor parameters.
size_t getTotalReceiveLength() const
Total number of values this process will receive from other processes.
size_t getMaxSendLength() const
Maximum number of values this process will send to another single process.
void swap(Distributor &rhs)
Swap the contents of rhs with those of *this.
std::string DistributorSendTypeEnumToString(EDistributorSendType sendType)
Convert an EDistributorSendType enum value to a string.
Teuchos::ArrayView< const size_t > getLengthsFrom() const
Number of values this process will receive from each process.
size_t getNumReceives() const
The number of processes from which we will receive data.
size_t getNumSends() const
The number of processes to which we will send data.
Implementation details of Tpetra.
size_t createFromSends(const Teuchos::ArrayView< const int > &exportProcIDs)
Set up Distributor using list of process ranks to which this process will send.
void gathervPrint(std::ostream &out, const std::string &s, const Teuchos::Comm< int > &comm)
On Process 0 in the given communicator, print strings from each process in that communicator, in rank order.
Teuchos::ArrayView< const size_t > getLengthsTo() const
Number of values this process will send to each process.
void describe(Teuchos::FancyOStream &out, const Teuchos::EVerbosityLevel verbLevel=Teuchos::Describable::verbLevel_default) const
Describe this object in a human-readable way to the given output stream.
void createFromSendsAndRecvs(const Teuchos::ArrayView< const int > &exportProcIDs, const Teuchos::ArrayView< const int > &remoteProcIDs)
Set up Distributor using list of process ranks to which to send, and list of process ranks from which...
Teuchos::ArrayView< const int > getProcsFrom() const
Ranks of the processes sending values to this process.
Sets up and executes a communication plan for a Tpetra DistObject.
void setParameterList(const Teuchos::RCP< Teuchos::ParameterList > &plist)
Set Distributor parameters.
#define TPETRA_EFFICIENCY_WARNING(throw_exception_test, Exception, msg)
Print or throw an efficiency warning.
Teuchos::ArrayView< const int > getProcsTo() const
Ranks of the processes to which this process will send values.
virtual ~Distributor()
Destructor (virtual for memory safety).
Teuchos::Array< std::string > distributorSendTypes()
Valid values for Distributor's "Send type" parameter.
void sort2(const IT1 &first1, const IT1 &last1, const IT2 &first2)
Sort the first array, and apply the resulting permutation to the second array.
std::string DistributorHowInitializedEnumToString(EDistributorHowInitialized how)
Convert an EDistributorHowInitialized enum value to a string.
std::string description() const
Return a one-line description of this object.
#define SHARED_TEST_FOR_EXCEPTION(throw_exception_test, Exception, msg, comm)
Test for exception, with reduction over the given communicator.
bool hasSelfMessage() const
Whether the calling process will send or receive messages to itself.
EDistributorSendType
The type of MPI send that Distributor should use.
Distributor(const Teuchos::RCP< const Teuchos::Comm< int > > &comm)
Construct using the specified communicator and default parameters.