41 #include "Tpetra_Distributor.hpp" 42 #include "Tpetra_Details_gathervPrint.hpp" 43 #include "Teuchos_StandardParameterEntryValidators.hpp" 44 #include "Teuchos_VerboseObjectParameterListHelpers.hpp" 51 if (sendType == DISTRIBUTOR_ISEND) {
54 else if (sendType == DISTRIBUTOR_RSEND) {
57 else if (sendType == DISTRIBUTOR_SEND) {
60 else if (sendType == DISTRIBUTOR_SSEND) {
64 TEUCHOS_TEST_FOR_EXCEPTION(
true, std::invalid_argument,
"Invalid " 65 "EDistributorSendType enum value " << sendType <<
".");
73 case Details::DISTRIBUTOR_NOT_INITIALIZED:
74 return "Not initialized yet";
75 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
76 return "By createFromSends";
77 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
78 return "By createFromRecvs";
79 case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
80 return "By createReverseDistributor";
81 case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
82 return "By copy constructor";
89 Teuchos::Array<std::string>
92 Teuchos::Array<std::string> sendTypes;
93 sendTypes.push_back (
"Isend");
94 sendTypes.push_back (
"Rsend");
95 sendTypes.push_back (
"Send");
96 sendTypes.push_back (
"Ssend");
106 const bool tpetraDistributorDebugDefault =
false;
108 const bool barrierBetween_default =
false;
110 const bool useDistinctTags_default =
true;
113 int Distributor::getTag (
const int pathTag)
const {
114 return useDistinctTags_ ? pathTag : comm_->getTag ();
118 #ifdef TPETRA_DISTRIBUTOR_TIMERS 119 void Distributor::makeTimers () {
120 const std::string name_doPosts3 =
"Tpetra::Distributor: doPosts(3)";
121 const std::string name_doPosts4 =
"Tpetra::Distributor: doPosts(4)";
122 const std::string name_doWaits =
"Tpetra::Distributor: doWaits";
123 const std::string name_doPosts3_recvs =
"Tpetra::Distributor: doPosts(3): recvs";
124 const std::string name_doPosts4_recvs =
"Tpetra::Distributor: doPosts(4): recvs";
125 const std::string name_doPosts3_barrier =
"Tpetra::Distributor: doPosts(3): barrier";
126 const std::string name_doPosts4_barrier =
"Tpetra::Distributor: doPosts(4): barrier";
127 const std::string name_doPosts3_sends =
"Tpetra::Distributor: doPosts(3): sends";
128 const std::string name_doPosts4_sends =
"Tpetra::Distributor: doPosts(4): sends";
130 timer_doPosts3_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3);
131 timer_doPosts4_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4);
132 timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (name_doWaits);
133 timer_doPosts3_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_recvs);
134 timer_doPosts4_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_recvs);
135 timer_doPosts3_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_barrier);
136 timer_doPosts4_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_barrier);
137 timer_doPosts3_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_sends);
138 timer_doPosts4_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_sends);
140 #endif // TPETRA_DISTRIBUTOR_TIMERS 143 Distributor::init (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm,
144 const Teuchos::RCP<Teuchos::FancyOStream>& out,
145 const Teuchos::RCP<Teuchos::ParameterList>& plist)
147 this->out_ = out.is_null () ?
148 Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)) : out;
149 if (! plist.is_null ()) {
150 this->setParameterList (plist);
153 #ifdef TPETRA_DISTRIBUTOR_TIMERS 155 #endif // TPETRA_DISTRIBUTOR_TIMERS 158 TEUCHOS_TEST_FOR_EXCEPTION
159 (out_.is_null (), std::logic_error,
"Tpetra::Distributor::init: debug_ " 160 "is true but out_ (pointer to the output stream) is NULL. Please " 161 "report this bug to the Tpetra developers.");
162 Teuchos::OSTab tab (out_);
163 std::ostringstream os;
164 os << comm_->getRank ()
165 <<
": Distributor ctor done" << std::endl;
172 , howInitialized_ (
Details::DISTRIBUTOR_NOT_INITIALIZED)
173 , sendType_ (
Details::DISTRIBUTOR_SEND)
174 , barrierBetween_ (barrierBetween_default)
175 , debug_ (tpetraDistributorDebugDefault)
176 , selfMessage_ (false)
180 , totalReceiveLength_ (0)
181 , lastRoundBytesSend_ (0)
182 , lastRoundBytesRecv_ (0)
183 , useDistinctTags_ (useDistinctTags_default)
185 init (comm, Teuchos::null, Teuchos::null);
189 const Teuchos::RCP<Teuchos::FancyOStream>& out)
191 , howInitialized_ (
Details::DISTRIBUTOR_NOT_INITIALIZED)
192 , sendType_ (
Details::DISTRIBUTOR_SEND)
193 , barrierBetween_ (barrierBetween_default)
194 , debug_ (tpetraDistributorDebugDefault)
195 , selfMessage_ (false)
199 , totalReceiveLength_ (0)
200 , lastRoundBytesSend_ (0)
201 , lastRoundBytesRecv_ (0)
202 , useDistinctTags_ (useDistinctTags_default)
204 init (comm, out, Teuchos::null);
208 const Teuchos::RCP<Teuchos::ParameterList>& plist)
210 , howInitialized_ (
Details::DISTRIBUTOR_NOT_INITIALIZED)
211 , sendType_ (
Details::DISTRIBUTOR_SEND)
212 , barrierBetween_ (barrierBetween_default)
213 , debug_ (tpetraDistributorDebugDefault)
214 , selfMessage_ (false)
218 , totalReceiveLength_ (0)
219 , lastRoundBytesSend_ (0)
220 , lastRoundBytesRecv_ (0)
221 , useDistinctTags_ (useDistinctTags_default)
223 init (comm, Teuchos::null, plist);
227 const Teuchos::RCP<Teuchos::FancyOStream>& out,
228 const Teuchos::RCP<Teuchos::ParameterList>& plist)
230 , howInitialized_ (
Details::DISTRIBUTOR_NOT_INITIALIZED)
231 , sendType_ (
Details::DISTRIBUTOR_SEND)
232 , barrierBetween_ (barrierBetween_default)
233 , debug_ (tpetraDistributorDebugDefault)
234 , selfMessage_ (false)
238 , totalReceiveLength_ (0)
239 , lastRoundBytesSend_ (0)
240 , lastRoundBytesRecv_ (0)
241 , useDistinctTags_ (useDistinctTags_default)
243 init (comm, out, plist);
247 : comm_ (distributor.comm_)
248 , out_ (distributor.out_)
249 , howInitialized_ (
Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
250 , sendType_ (distributor.sendType_)
251 , barrierBetween_ (distributor.barrierBetween_)
252 , debug_ (distributor.debug_)
253 , selfMessage_ (distributor.selfMessage_)
254 , numSends_ (distributor.numSends_)
255 , procsTo_ (distributor.procsTo_)
256 , startsTo_ (distributor.startsTo_)
257 , lengthsTo_ (distributor.lengthsTo_)
258 , maxSendLength_ (distributor.maxSendLength_)
259 , indicesTo_ (distributor.indicesTo_)
260 , numReceives_ (distributor.numReceives_)
261 , totalReceiveLength_ (distributor.totalReceiveLength_)
262 , lengthsFrom_ (distributor.lengthsFrom_)
263 , procsFrom_ (distributor.procsFrom_)
264 , startsFrom_ (distributor.startsFrom_)
265 , indicesFrom_ (distributor.indicesFrom_)
266 , reverseDistributor_ (distributor.reverseDistributor_)
267 , lastRoundBytesSend_ (distributor.lastRoundBytesSend_)
268 , lastRoundBytesRecv_ (distributor.lastRoundBytesRecv_)
269 , useDistinctTags_ (distributor.useDistinctTags_)
271 using Teuchos::ParameterList;
272 using Teuchos::parameterList;
282 RCP<const ParameterList> rhsList = distributor.getParameterList ();
283 if (! rhsList.is_null ()) {
284 this->setMyParamList (parameterList (* rhsList));
287 #ifdef TPETRA_DISTRIBUTOR_TIMERS 289 #endif // TPETRA_DISTRIBUTOR_TIMERS 292 TEUCHOS_TEST_FOR_EXCEPTION
293 (out_.is_null (), std::logic_error,
"Tpetra::Distributor::init: debug_ " 294 "is true but out_ (pointer to the output stream) is NULL. Please " 295 "report this bug to the Tpetra developers.");
296 Teuchos::OSTab tab (out_);
297 std::ostringstream os;
298 os << comm_->getRank ()
299 <<
": Distributor copy ctor done" << std::endl;
305 using Teuchos::ParameterList;
306 using Teuchos::parameterList;
309 std::swap (comm_, rhs.comm_);
310 std::swap (out_, rhs.out_);
311 std::swap (howInitialized_, rhs.howInitialized_);
312 std::swap (sendType_, rhs.sendType_);
313 std::swap (barrierBetween_, rhs.barrierBetween_);
314 std::swap (debug_, rhs.debug_);
315 std::swap (selfMessage_, rhs.selfMessage_);
316 std::swap (numSends_, rhs.numSends_);
317 std::swap (procsTo_, rhs.procsTo_);
318 std::swap (startsTo_, rhs.startsTo_);
319 std::swap (lengthsTo_, rhs.lengthsTo_);
320 std::swap (maxSendLength_, rhs.maxSendLength_);
321 std::swap (indicesTo_, rhs.indicesTo_);
322 std::swap (numReceives_, rhs.numReceives_);
323 std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
324 std::swap (lengthsFrom_, rhs.lengthsFrom_);
325 std::swap (procsFrom_, rhs.procsFrom_);
326 std::swap (startsFrom_, rhs.startsFrom_);
327 std::swap (indicesFrom_, rhs.indicesFrom_);
328 std::swap (reverseDistributor_, rhs.reverseDistributor_);
329 std::swap (lastRoundBytesSend_, rhs.lastRoundBytesSend_);
330 std::swap (lastRoundBytesRecv_, rhs.lastRoundBytesRecv_);
331 std::swap (useDistinctTags_, rhs.useDistinctTags_);
335 RCP<ParameterList> lhsList = this->getNonconstParameterList ();
336 RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
337 if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
338 rhsList = parameterList (*rhsList);
340 if (! rhsList.is_null ()) {
341 this->setMyParamList (rhsList);
343 if (! lhsList.is_null ()) {
344 rhs.setMyParamList (lhsList);
373 using Teuchos::FancyOStream;
374 using Teuchos::getIntegralValue;
375 using Teuchos::includesVerbLevel;
376 using Teuchos::OSTab;
377 using Teuchos::ParameterList;
378 using Teuchos::parameterList;
383 plist->validateParametersAndSetDefaults (*validParams);
385 const bool barrierBetween =
386 plist->get<
bool> (
"Barrier between receives and sends");
388 getIntegralValue<Details::EDistributorSendType> (*plist,
"Send type");
389 const bool useDistinctTags = plist->get<
bool> (
"Use distinct tags");
390 const bool debug = plist->get<
bool> (
"Debug");
395 const bool enable_cuda_rdma =
396 plist->get<
bool> (
"Enable MPI CUDA RDMA support");
397 TEUCHOS_TEST_FOR_EXCEPTION
398 (! enable_cuda_rdma, std::invalid_argument,
"Tpetra::Distributor::" 399 "setParameterList: " <<
"You specified \"Enable MPI CUDA RDMA " 400 "support\" = false. This is no longer valid. You don't need to " 401 "specify this option any more; Tpetra assumes it is always true. " 402 "This is a very light assumption on the MPI implementation, and in " 403 "fact does not actually involve hardware or system RDMA support. " 404 "Tpetra just assumes that the MPI implementation can tell whether a " 405 "pointer points to host memory or CUDA device memory.");
412 TEUCHOS_TEST_FOR_EXCEPTION(
413 ! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
414 std::invalid_argument,
"Tpetra::Distributor::setParameterList: " << endl
415 <<
"You specified \"Send type\"=\"Rsend\", but turned off the barrier " 416 "between receives and sends." << endl <<
"This is invalid; you must " 417 "include the barrier if you use ready sends." << endl <<
"Ready sends " 418 "require that their corresponding receives have already been posted, " 419 "and the only way to guarantee that in general is with a barrier.");
422 sendType_ = sendType;
423 barrierBetween_ = barrierBetween;
424 useDistinctTags_ = useDistinctTags;
429 this->setMyParamList (plist);
432 Teuchos::RCP<const Teuchos::ParameterList>
435 using Teuchos::Array;
436 using Teuchos::ParameterList;
437 using Teuchos::parameterList;
439 using Teuchos::setStringToIntegralParameter;
441 const bool barrierBetween = barrierBetween_default;
442 const bool useDistinctTags = useDistinctTags_default;
443 const bool debug = tpetraDistributorDebugDefault;
446 const std::string defaultSendType (
"Send");
447 Array<Details::EDistributorSendType> sendTypeEnums;
448 sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
449 sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
450 sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
451 sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);
453 RCP<ParameterList> plist = parameterList (
"Tpetra::Distributor");
454 plist->set (
"Barrier between receives and sends", barrierBetween,
455 "Whether to execute a barrier between receives and sends in do" 456 "[Reverse]Posts(). Required for correctness when \"Send type\"" 457 "=\"Rsend\", otherwise correct but not recommended.");
458 setStringToIntegralParameter<Details::EDistributorSendType> (
"Send type",
459 defaultSendType,
"When using MPI, the variant of send to use in " 460 "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
461 plist->set (
"Use distinct tags", useDistinctTags,
"Whether to use distinct " 462 "MPI message tags for different code paths. Highly recommended" 463 " to avoid message collisions.");
464 plist->set (
"Debug", debug,
"Whether to print copious debugging output on " 466 plist->set (
"Enable MPI CUDA RDMA support",
true,
"Assume that MPI can " 467 "tell whether a pointer points to host memory or CUDA device " 468 "memory. You don't need to specify this option any more; " 469 "Tpetra assumes it is always true. This is a very light " 470 "assumption on the MPI implementation, and in fact does not " 471 "actually involve hardware or system RDMA support.");
479 Teuchos::setupVerboseObjectSublist (&*plist);
480 return Teuchos::rcp_const_cast<
const ParameterList> (plist);
485 {
return totalReceiveLength_; }
488 {
return numReceives_; }
491 {
return selfMessage_; }
494 {
return numSends_; }
497 {
return maxSendLength_; }
500 {
return procsFrom_; }
503 {
return lengthsFrom_; }
509 {
return lengthsTo_; }
511 Teuchos::RCP<Distributor>
513 if (reverseDistributor_.is_null ()) {
514 createReverseDistributor ();
516 TEUCHOS_TEST_FOR_EXCEPTION
517 (reverseDistributor_.is_null (), std::logic_error,
"The reverse " 518 "Distributor is null after createReverseDistributor returned. " 519 "Please report this bug to the Tpetra developers.");
520 return reverseDistributor_;
525 Distributor::createReverseDistributor()
const 527 reverseDistributor_ = Teuchos::rcp (
new Distributor (comm_, out_));
528 reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
529 reverseDistributor_->sendType_ = sendType_;
530 reverseDistributor_->barrierBetween_ = barrierBetween_;
531 reverseDistributor_->debug_ = debug_;
536 size_t totalSendLength =
537 std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), 0);
542 size_t maxReceiveLength = 0;
543 const int myProcID = comm_->getRank();
544 for (
size_t i=0; i < numReceives_; ++i) {
545 if (procsFrom_[i] != myProcID) {
547 if (lengthsFrom_[i] > maxReceiveLength) {
548 maxReceiveLength = lengthsFrom_[i];
557 reverseDistributor_->selfMessage_ = selfMessage_;
558 reverseDistributor_->numSends_ = numReceives_;
559 reverseDistributor_->procsTo_ = procsFrom_;
560 reverseDistributor_->startsTo_ = startsFrom_;
561 reverseDistributor_->lengthsTo_ = lengthsFrom_;
562 reverseDistributor_->maxSendLength_ = maxReceiveLength;
563 reverseDistributor_->indicesTo_ = indicesFrom_;
564 reverseDistributor_->numReceives_ = numSends_;
565 reverseDistributor_->totalReceiveLength_ = totalSendLength;
566 reverseDistributor_->lengthsFrom_ = lengthsTo_;
567 reverseDistributor_->procsFrom_ = procsTo_;
568 reverseDistributor_->startsFrom_ = startsTo_;
569 reverseDistributor_->indicesFrom_ = indicesTo_;
578 reverseDistributor_->lastRoundBytesSend_ = 0;
579 reverseDistributor_->lastRoundBytesRecv_ = 0;
581 reverseDistributor_->useDistinctTags_ = useDistinctTags_;
594 reverseDistributor_->reverseDistributor_ = Teuchos::null;
599 using Teuchos::Array;
600 using Teuchos::CommRequest;
601 using Teuchos::FancyOStream;
602 using Teuchos::includesVerbLevel;
603 using Teuchos::is_null;
604 using Teuchos::OSTab;
606 using Teuchos::waitAll;
609 Teuchos::OSTab tab (out_);
611 #ifdef TPETRA_DISTRIBUTOR_TIMERS 612 Teuchos::TimeMonitor timeMon (*timer_doWaits_);
613 #endif // TPETRA_DISTRIBUTOR_TIMERS 615 const int myRank = comm_->getRank ();
618 std::ostringstream os;
619 os << myRank <<
": doWaits: # reqs = " 620 << requests_.size () << endl;
624 if (requests_.size() > 0) {
625 waitAll (*comm_, requests_());
627 #ifdef HAVE_TEUCHOS_DEBUG 629 for (Array<RCP<CommRequest<int> > >::const_iterator it = requests_.begin();
630 it != requests_.end(); ++it)
632 TEUCHOS_TEST_FOR_EXCEPTION( ! is_null (*it), std::runtime_error,
633 Teuchos::typeName(*
this) <<
"::doWaits(): Communication requests " 634 "should all be null aftr calling Teuchos::waitAll() on them, but " 635 "at least one request is not null.");
637 #endif // HAVE_TEUCHOS_DEBUG 640 requests_.resize (0);
643 #ifdef HAVE_TEUCHOS_DEBUG 645 const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
646 int globalSizeNonzero = 0;
647 Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
649 Teuchos::outArg (globalSizeNonzero));
650 TEUCHOS_TEST_FOR_EXCEPTION(
651 globalSizeNonzero != 0, std::runtime_error,
652 "Tpetra::Distributor::doWaits: After waitAll, at least one process has " 653 "a nonzero number of outstanding posts. There should be none at this " 654 "point. Please report this bug to the Tpetra developers.");
656 #endif // HAVE_TEUCHOS_DEBUG 659 std::ostringstream os;
660 os << myRank <<
": doWaits done" << endl;
667 if (! reverseDistributor_.is_null()) {
668 reverseDistributor_->doWaits();
673 std::ostringstream out;
675 out <<
"\"Tpetra::Distributor\": {";
676 const std::string label = this->getObjectLabel ();
678 out <<
"Label: " << label <<
", ";
680 out <<
"How initialized: " 684 << DistributorSendTypeEnumToString (sendType_)
685 <<
", Barrier between receives and sends: " 686 << (barrierBetween_ ?
"true" :
"false")
687 <<
", Use distinct tags: " 688 << (useDistinctTags_ ?
"true" :
"false")
689 <<
", Debug: " << (debug_ ?
"true" :
"false")
696 localDescribeToString (
const Teuchos::EVerbosityLevel vl)
const 698 using Teuchos::toString;
699 using Teuchos::VERB_HIGH;
700 using Teuchos::VERB_EXTREME;
704 if (vl <= Teuchos::VERB_LOW || comm_.is_null ()) {
705 return std::string ();
708 auto outStringP = Teuchos::rcp (
new std::ostringstream ());
709 auto outp = Teuchos::getFancyOStream (outStringP);
710 Teuchos::FancyOStream& out = *outp;
712 const int myRank = comm_->getRank ();
713 const int numProcs = comm_->getSize ();
714 out <<
"Process " << myRank <<
" of " << numProcs <<
":" << endl;
715 Teuchos::OSTab tab1 (out);
719 if (vl == VERB_HIGH || vl == VERB_EXTREME) {
720 out <<
"procsTo: " << toString (procsTo_) << endl;
721 out <<
"lengthsTo: " << toString (lengthsTo_) << endl;
724 if (vl == VERB_EXTREME) {
725 out <<
"startsTo: " << toString (startsTo_) << endl;
726 out <<
"indicesTo: " << toString (indicesTo_) << endl;
728 if (vl == VERB_HIGH || vl == VERB_EXTREME) {
731 out <<
"lengthsFrom: " << toString (lengthsFrom_) << endl;
732 out <<
"startsFrom: " << toString (startsFrom_) << endl;
733 out <<
"procsFrom: " << toString (procsFrom_) << endl;
737 return outStringP->str ();
743 const Teuchos::EVerbosityLevel verbLevel)
const 746 using Teuchos::VERB_DEFAULT;
747 using Teuchos::VERB_NONE;
748 using Teuchos::VERB_LOW;
749 using Teuchos::VERB_MEDIUM;
750 using Teuchos::VERB_HIGH;
751 using Teuchos::VERB_EXTREME;
752 const Teuchos::EVerbosityLevel vl =
753 (verbLevel == VERB_DEFAULT) ? VERB_LOW : verbLevel;
755 if (vl == VERB_NONE) {
763 if (comm_.is_null ()) {
766 const int myRank = comm_->getRank ();
767 const int numProcs = comm_->getSize ();
776 Teuchos::RCP<Teuchos::OSTab> tab0, tab1;
782 tab0 = Teuchos::rcp (
new Teuchos::OSTab (out));
785 out <<
"\"Tpetra::Distributor\":" << endl;
786 tab1 = Teuchos::rcp (
new Teuchos::OSTab (out));
788 const std::string label = this->getObjectLabel ();
790 out <<
"Label: " << label << endl;
792 out <<
"Number of processes: " << numProcs << endl
793 <<
"How initialized: " 797 out <<
"Parameters: " << endl;
798 Teuchos::OSTab tab2 (out);
799 out <<
"\"Send type\": " 800 << DistributorSendTypeEnumToString (sendType_) << endl
801 <<
"\"Barrier between receives and sends\": " 802 << (barrierBetween_ ?
"true" :
"false") << endl
803 <<
"\"Use distinct tags\": " 804 << (useDistinctTags_ ?
"true" :
"false") << endl
805 <<
"\"Debug\": " << (debug_ ?
"true" :
"false") << endl;
811 const std::string lclStr = this->localDescribeToString (vl);
815 out <<
"Reverse Distributor:";
816 if (reverseDistributor_.is_null ()) {
817 out <<
" null" << endl;
821 reverseDistributor_->describe (out, vl);
826 Distributor::computeReceives ()
828 using Teuchos::Array;
829 using Teuchos::ArrayRCP;
831 using Teuchos::CommStatus;
832 using Teuchos::CommRequest;
833 using Teuchos::ireceive;
836 using Teuchos::REDUCE_SUM;
837 using Teuchos::receive;
838 using Teuchos::reduce;
839 using Teuchos::scatter;
841 using Teuchos::waitAll;
844 Teuchos::OSTab tab (out_);
845 const int myRank = comm_->getRank();
846 const int numProcs = comm_->getSize();
849 const int pathTag = 2;
850 const int tag = this->getTag (pathTag);
853 std::ostringstream os;
854 os << myRank <<
": computeReceives: " 855 "{selfMessage_: " << (selfMessage_ ?
"true" :
"false")
856 <<
", tag: " << tag <<
"}" << endl;
866 Array<int> toProcsFromMe (numProcs, 0);
867 #ifdef HAVE_TEUCHOS_DEBUG 868 bool counting_error =
false;
869 #endif // HAVE_TEUCHOS_DEBUG 870 for (
size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
871 #ifdef HAVE_TEUCHOS_DEBUG 872 if (toProcsFromMe[procsTo_[i]] != 0) {
873 counting_error =
true;
875 #endif // HAVE_TEUCHOS_DEBUG 876 toProcsFromMe[procsTo_[i]] = 1;
878 #ifdef HAVE_TEUCHOS_DEBUG 880 "Tpetra::Distributor::computeReceives: There was an error on at least " 881 "one process in counting the number of messages send by that process to " 882 "the other processs. Please report this bug to the Tpetra developers.",
884 #endif // HAVE_TEUCHOS_DEBUG 887 std::ostringstream os;
888 os << myRank <<
": computeReceives: Calling reduce and scatter" << endl;
945 Array<int> numRecvsOnEachProc;
946 if (myRank == root) {
947 numRecvsOnEachProc.resize (numProcs);
949 int numReceivesAsInt = 0;
950 reduce<int, int> (toProcsFromMe.getRawPtr (),
951 numRecvsOnEachProc.getRawPtr (),
952 numProcs, REDUCE_SUM, root, *comm_);
953 scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
954 &numReceivesAsInt, 1, root, *comm_);
955 numReceives_ =
static_cast<size_t> (numReceivesAsInt);
961 lengthsFrom_.assign (numReceives_, 0);
962 procsFrom_.assign (numReceives_, 0);
978 const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);
984 Array<RCP<CommRequest<int> > > requests (actualNumReceives);
985 Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
986 Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
991 const int anySourceProc = MPI_ANY_SOURCE;
993 const int anySourceProc = -1;
997 std::ostringstream os;
998 os << myRank <<
": computeReceives: Posting " 999 << actualNumReceives <<
" irecvs" << endl;
1004 for (
size_t i = 0; i < actualNumReceives; ++i) {
1009 lengthsFromBuffers[i].resize (1);
1010 lengthsFromBuffers[i][0] = as<size_t> (0);
1011 requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc, tag, *comm_);
1013 std::ostringstream os;
1014 os << myRank <<
": computeReceives: " 1015 "Posted any-proc irecv w/ specified tag " << tag << endl;
1021 std::ostringstream os;
1022 os << myRank <<
": computeReceives: " 1023 "posting " << numSends_ <<
" sends" << endl;
1034 for (
size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
1035 if (procsTo_[i] != myRank) {
1039 const size_t*
const lengthsTo_i = &lengthsTo_[i];
1040 send<int, size_t> (lengthsTo_i, 1, as<int> (procsTo_[i]), tag, *comm_);
1042 std::ostringstream os;
1043 os << myRank <<
": computeReceives: " 1044 "Posted send to Proc " << procsTo_[i] <<
" w/ specified tag " 1056 lengthsFrom_[numReceives_-1] = lengthsTo_[i];
1057 procsFrom_[numReceives_-1] = myRank;
1062 std::ostringstream os;
1063 os << myRank <<
": computeReceives: waitAll on " 1064 << requests.size () <<
" requests" << endl;
1073 waitAll (*comm_, requests (), statuses ());
1074 for (
size_t i = 0; i < actualNumReceives; ++i) {
1075 lengthsFrom_[i] = *lengthsFromBuffers[i];
1076 procsFrom_[i] = statuses[i]->getSourceRank ();
1082 sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
1085 totalReceiveLength_ = std::accumulate (lengthsFrom_.begin(), lengthsFrom_.end(), 0);
1086 indicesFrom_.clear ();
1087 indicesFrom_.reserve (totalReceiveLength_);
1088 for (
size_t i = 0; i < totalReceiveLength_; ++i) {
1089 indicesFrom_.push_back(i);
1092 startsFrom_.clear ();
1093 startsFrom_.reserve (numReceives_);
1094 for (
size_t i = 0, j = 0; i < numReceives_; ++i) {
1095 startsFrom_.push_back(j);
1096 j += lengthsFrom_[i];
1104 std::ostringstream os;
1105 os << myRank <<
": computeReceives: done" << endl;
1113 using Teuchos::outArg;
1114 using Teuchos::REDUCE_MAX;
1115 using Teuchos::reduceAll;
1118 Teuchos::OSTab tab (out_);
1119 const size_t numExports = exportProcIDs.size();
1120 const int myProcID = comm_->getRank();
1121 const int numProcs = comm_->getSize();
1123 std::ostringstream os;
1124 os << myProcID <<
": createFromSends" << endl;
1176 Teuchos::Array<size_t> starts (numProcs + 1, 0);
1179 size_t numActive = 0;
1180 int needSendBuff = 0;
1182 #ifdef HAVE_TPETRA_DEBUG 1184 #endif // HAVE_TPETRA_DEBUG 1185 for (
size_t i = 0; i < numExports; ++i) {
1186 const int exportID = exportProcIDs[i];
1187 if (exportID >= numProcs) {
1188 #ifdef HAVE_TPETRA_DEBUG 1190 #endif // HAVE_TPETRA_DEBUG 1193 else if (exportID >= 0) {
1207 if (needSendBuff==0 && starts[exportID] > 1 && exportID != exportProcIDs[i-1]) {
1214 #ifdef HAVE_TPETRA_DEBUG 1221 reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
1222 TEUCHOS_TEST_FOR_EXCEPTION(gbl_badID >= 0, std::runtime_error,
1223 Teuchos::typeName(*
this) <<
"::createFromSends(): Process " << gbl_badID
1224 <<
", perhaps among other processes, got a bad send process ID.");
1239 #endif // HAVE_TPETRA_DEBUG 1241 #if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS) 1243 int global_needSendBuff;
1244 reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
1245 outArg (global_needSendBuff));
1247 global_needSendBuff != 0, std::runtime_error,
1248 "::createFromSends: Grouping export IDs together by process rank often " 1249 "improves performance.");
1255 if (starts[myProcID] != 0) {
1256 selfMessage_ =
true;
1259 selfMessage_ =
false;
1262 #ifdef HAVE_TEUCHOS_DEBUG 1263 bool index_neq_numActive =
false;
1264 bool send_neq_numSends =
false;
1266 if (! needSendBuff) {
1271 for (
int i = 0; i < numProcs; ++i) {
1279 indicesTo_.resize(0);
1282 procsTo_.assign(numSends_,0);
1283 startsTo_.assign(numSends_,0);
1284 lengthsTo_.assign(numSends_,0);
1291 size_t index = 0, procIndex = 0;
1292 for (
size_t i = 0; i < numSends_; ++i) {
1293 while (exportProcIDs[procIndex] < 0) {
1296 startsTo_[i] = procIndex;
1297 int procID = exportProcIDs[procIndex];
1298 procsTo_[i] = procID;
1299 index += starts[procID];
1300 procIndex += starts[procID];
1302 #ifdef HAVE_TEUCHOS_DEBUG 1303 if (index != numActive) {
1304 index_neq_numActive =
true;
1310 if (numSends_ > 0) {
1311 sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1315 for (
size_t i = 0; i < numSends_; ++i) {
1316 int procID = procsTo_[i];
1317 lengthsTo_[i] = starts[procID];
1318 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1319 maxSendLength_ = lengthsTo_[i];
1330 if (starts[0] == 0 ) {
1336 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1338 i != starts.end(); ++i)
1340 if (*i != 0) ++numSends_;
1346 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1347 i=starts.rbegin()+1;
1348 i != starts.rend(); ++i)
1357 indicesTo_.resize(numActive);
1359 for (
size_t i = 0; i < numExports; ++i) {
1360 if (exportProcIDs[i] >= 0) {
1362 indicesTo_[starts[exportProcIDs[i]]] = i;
1364 ++starts[exportProcIDs[i]];
1376 for (
int proc = numProcs-1; proc != 0; --proc) {
1377 starts[proc] = starts[proc-1];
1380 starts[numProcs] = numActive;
1387 procsTo_.resize(numSends_);
1388 startsTo_.resize(numSends_);
1389 lengthsTo_.resize(numSends_);
1396 for (
int proc = 0; proc < numProcs; ++proc ) {
1397 if (starts[proc+1] != starts[proc]) {
1398 lengthsTo_[snd] = starts[proc+1] - starts[proc];
1399 startsTo_[snd] = starts[proc];
1401 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1402 maxSendLength_ = lengthsTo_[snd];
1404 procsTo_[snd] = proc;
1408 #ifdef HAVE_TEUCHOS_DEBUG 1409 if (snd != numSends_) {
1410 send_neq_numSends =
true;
1414 #ifdef HAVE_TEUCHOS_DEBUG 1416 "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1418 "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1421 if (selfMessage_) --numSends_;
1427 std::ostringstream os;
1428 os << myProcID <<
": createFromSends: done" << endl;
1434 howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
1436 return totalReceiveLength_;
1442 const Teuchos::ArrayView<const int>& remoteProcIDs)
1451 howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
1454 int myProcID = comm_->getRank ();
1455 int numProcs = comm_->getSize();
1457 const size_t numExportIDs = exportProcIDs.size();
1458 Teuchos::Array<size_t> starts (numProcs + 1, 0);
1460 size_t numActive = 0;
1461 int needSendBuff = 0;
1463 for(
size_t i = 0; i < numExportIDs; i++ )
1465 if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
1467 if( exportProcIDs[i] >= 0 )
1469 ++starts[ exportProcIDs[i] ];
1474 selfMessage_ = ( starts[myProcID] != 0 ) ? 1 : 0;
1480 if (starts[0] == 0 ) {
1486 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1488 i != starts.end(); ++i)
1490 if (*i != 0) ++numSends_;
1496 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1497 i=starts.rbegin()+1;
1498 i != starts.rend(); ++i)
1507 indicesTo_.resize(numActive);
1509 for (
size_t i = 0; i < numExportIDs; ++i) {
1510 if (exportProcIDs[i] >= 0) {
1512 indicesTo_[starts[exportProcIDs[i]]] = i;
1514 ++starts[exportProcIDs[i]];
1517 for (
int proc = numProcs-1; proc != 0; --proc) {
1518 starts[proc] = starts[proc-1];
1521 starts[numProcs] = numActive;
1522 procsTo_.resize(numSends_);
1523 startsTo_.resize(numSends_);
1524 lengthsTo_.resize(numSends_);
1527 for (
int proc = 0; proc < numProcs; ++proc ) {
1528 if (starts[proc+1] != starts[proc]) {
1529 lengthsTo_[snd] = starts[proc+1] - starts[proc];
1530 startsTo_[snd] = starts[proc];
1532 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1533 maxSendLength_ = lengthsTo_[snd];
1535 procsTo_[snd] = proc;
1545 for (
int i = 0; i < numProcs; ++i) {
1553 indicesTo_.resize(0);
1556 procsTo_.assign(numSends_,0);
1557 startsTo_.assign(numSends_,0);
1558 lengthsTo_.assign(numSends_,0);
1565 size_t index = 0, procIndex = 0;
1566 for (
size_t i = 0; i < numSends_; ++i) {
1567 while (exportProcIDs[procIndex] < 0) {
1570 startsTo_[i] = procIndex;
1571 int procID = exportProcIDs[procIndex];
1572 procsTo_[i] = procID;
1573 index += starts[procID];
1574 procIndex += starts[procID];
1579 if (numSends_ > 0) {
1580 sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1584 for (
size_t i = 0; i < numSends_; ++i) {
1585 int procID = procsTo_[i];
1586 lengthsTo_[i] = starts[procID];
1587 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1588 maxSendLength_ = lengthsTo_[i];
1594 numSends_ -= selfMessage_;
1595 std::vector<int> recv_list;
1596 recv_list.reserve(numSends_);
1599 for(
int i=0; i<remoteProcIDs.size(); i++) {
1600 if(remoteProcIDs[i]>last_pid) {
1601 recv_list.push_back(remoteProcIDs[i]);
1602 last_pid = remoteProcIDs[i];
1604 else if (remoteProcIDs[i]<last_pid)
1605 throw std::runtime_error(
"Tpetra::Distributor:::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
1607 numReceives_ = recv_list.size();
1609 procsFrom_.assign(numReceives_,0);
1610 lengthsFrom_.assign(numReceives_,0);
1611 indicesFrom_.assign(numReceives_,0);
1612 startsFrom_.assign(numReceives_,0);
1614 for(
size_t i=0,j=0; i<numReceives_; ++i) {
1616 procsFrom_[i] = recv_list[i];
1618 for( ; j<(size_t)remoteProcIDs.size() &&
1619 remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
1620 lengthsFrom_[i] = j-jlast;
1622 totalReceiveLength_ = remoteProcIDs.size();
1623 indicesFrom_.clear ();
1624 indicesFrom_.reserve (totalReceiveLength_);
1625 for (
size_t i = 0; i < totalReceiveLength_; ++i) {
1626 indicesFrom_.push_back(i);
1629 numReceives_-=selfMessage_;
Namespace Tpetra contains the class and methods constituting the Tpetra library.
Teuchos::RCP< Distributor > getReverse() const
A reverse communication plan Distributor.
EDistributorHowInitialized
Enum indicating how and whether a Distributor was initialized.
Teuchos::RCP< const Teuchos::ParameterList > getValidParameters() const
List of valid Distributor parameters.
size_t getTotalReceiveLength() const
Total number of values this process will receive from other processes.
size_t getMaxSendLength() const
Maximum number of values this process will send to another single process.
void swap(Distributor &rhs)
Swap the contents of rhs with those of *this.
std::string DistributorSendTypeEnumToString(EDistributorSendType sendType)
Convert an EDistributorSendType enum value to a string.
Teuchos::ArrayView< const size_t > getLengthsFrom() const
Number of values this process will receive from each process.
size_t getNumReceives() const
The number of processes from which we will receive data.
size_t getNumSends() const
The number of processes to which we will send data.
Implementation details of Tpetra.
size_t createFromSends(const Teuchos::ArrayView< const int > &exportProcIDs)
Set up Distributor using list of process ranks to which this process will send.
void gathervPrint(std::ostream &out, const std::string &s, const Teuchos::Comm< int > &comm)
On Process 0 in the given communicator, print strings from each process in that communicator, in rank order.
Teuchos::ArrayView< const size_t > getLengthsTo() const
Number of values this process will send to each process.
void describe(Teuchos::FancyOStream &out, const Teuchos::EVerbosityLevel verbLevel=Teuchos::Describable::verbLevel_default) const
Describe this object in a human-readable way to the given output stream.
void createFromSendsAndRecvs(const Teuchos::ArrayView< const int > &exportProcIDs, const Teuchos::ArrayView< const int > &remoteProcIDs)
Set up Distributor using list of process ranks to which to send, and list of process ranks from which...
Teuchos::ArrayView< const int > getProcsFrom() const
Ranks of the processes sending values to this process.
Sets up and executes a communication plan for a Tpetra DistObject.
void setParameterList(const Teuchos::RCP< Teuchos::ParameterList > &plist)
Set Distributor parameters.
#define TPETRA_EFFICIENCY_WARNING(throw_exception_test, Exception, msg)
Print or throw an efficiency warning.
Teuchos::ArrayView< const int > getProcsTo() const
Ranks of the processes to which this process will send values.
virtual ~Distributor()
Destructor (virtual for memory safety).
Teuchos::Array< std::string > distributorSendTypes()
Valid values for Distributor's "Send type" parameter.
void sort2(const IT1 &first1, const IT1 &last1, const IT2 &first2)
Sort the first array, and apply the resulting permutation to the second array.
std::string DistributorHowInitializedEnumToString(EDistributorHowInitialized how)
Convert an EDistributorHowInitialized enum value to a string.
std::string description() const
Return a one-line description of this object.
#define SHARED_TEST_FOR_EXCEPTION(throw_exception_test, Exception, msg, comm)
Test for exception, with reduction over the given communicator.
bool hasSelfMessage() const
Whether the calling process will send or receive messages to itself.
EDistributorSendType
The type of MPI send that Distributor should use.
Distributor(const Teuchos::RCP< const Teuchos::Comm< int > > &comm)
Construct using the specified communicator and default parameters.