#include "Tpetra_Details_DistributorPlan.hpp"
#include "Teuchos_StandardParameterEntryValidators.hpp"
std::string
DistributorSendTypeEnumToString (EDistributorSendType sendType)
{
  if (sendType == DISTRIBUTOR_ISEND)      { return "Isend"; }
  else if (sendType == DISTRIBUTOR_RSEND) { return "Rsend"; }
  else if (sendType == DISTRIBUTOR_SEND)  { return "Send"; }
  else if (sendType == DISTRIBUTOR_SSEND) { return "Ssend"; }
  else {
    TEUCHOS_TEST_FOR_EXCEPTION(
      true, std::invalid_argument,
      "Invalid EDistributorSendType enum value " << sendType << ".");
  }
}
std::string
DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
{
  switch (how) {
  case Details::DISTRIBUTOR_NOT_INITIALIZED:
    return "Not initialized yet";
  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
    return "By createFromSends";
  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
    return "By createFromRecvs";
  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
    return "By createFromSendsAndRecvs";
  case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
    return "By createReverseDistributor";
  case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
    return "By copy constructor";
  default:
    return "Invalid";
  }
}
DistributorPlan::DistributorPlan(Teuchos::RCP<const Teuchos::Comm<int>> comm)
  : comm_(comm),
    howInitialized_(DISTRIBUTOR_NOT_INITIALIZED),
    reversePlan_(Teuchos::null),
    sendType_(DISTRIBUTOR_SEND),
    barrierBetweenRecvSend_(barrierBetween_default),
    useDistinctTags_(useDistinctTags_default),
    sendMessageToSelf_(false),
    numSendsToOtherProcs_(0),
    maxSendLength_(0),
    numReceives_(0),
    totalReceiveLength_(0)
{ }
DistributorPlan::DistributorPlan(const DistributorPlan& otherPlan)
  : comm_(otherPlan.comm_),
    howInitialized_(DISTRIBUTOR_INITIALIZED_BY_COPY),
    reversePlan_(otherPlan.reversePlan_),
    sendType_(otherPlan.sendType_),
    barrierBetweenRecvSend_(otherPlan.barrierBetweenRecvSend_),
    useDistinctTags_(otherPlan.useDistinctTags_),
    sendMessageToSelf_(otherPlan.sendMessageToSelf_),
    numSendsToOtherProcs_(otherPlan.numSendsToOtherProcs_),
    procIdsToSendTo_(otherPlan.procIdsToSendTo_),
    startsTo_(otherPlan.startsTo_),
    lengthsTo_(otherPlan.lengthsTo_),
    maxSendLength_(otherPlan.maxSendLength_),
    indicesTo_(otherPlan.indicesTo_),
    numReceives_(otherPlan.numReceives_),
    totalReceiveLength_(otherPlan.totalReceiveLength_),
    lengthsFrom_(otherPlan.lengthsFrom_),
    procsFrom_(otherPlan.procsFrom_),
    startsFrom_(otherPlan.startsFrom_),
    indicesFrom_(otherPlan.indicesFrom_)
{ }
int DistributorPlan::getTag(const int pathTag) const {
  return useDistinctTags_ ? pathTag : comm_->getTag();
}
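// Note on tags: when "Use distinct tags" is enabled, getTag() returns the
// caller-supplied pathTag, so messages posted by different code paths cannot
// be matched against one another; otherwise it falls back to the
// communicator's current tag.  computeReceives() below, for example, uses
// pathTag = 2 for its message-length exchange.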
size_t DistributorPlan::createFromSends(const Teuchos::ArrayView<const int>& exportProcIDs)
{
  using Teuchos::outArg;
  using Teuchos::REDUCE_MAX;
  using Teuchos::reduceAll;

  const size_t numExports = exportProcIDs.size();
  const int myProcID = comm_->getRank();
  const int numProcs = comm_->getSize();
  // starts[p] counts the exports headed to process p.
  Teuchos::Array<size_t> starts (numProcs + 1, 0);
  size_t numActive = 0;
  int needSendBuff = 0; // set if the export IDs are not grouped by process

  for (size_t i = 0; i < numExports; ++i) {
    const int exportID = exportProcIDs[i];
    if (exportID >= 0) {
      ++starts[exportID];
      ++numActive;
      if (needSendBuff == 0 && starts[exportID] > 1 &&
          exportID != exportProcIDs[i-1]) {
        needSendBuff = 1;
      }
    }
  }
#if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
  {
    int global_needSendBuff;
    reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
                         outArg (global_needSendBuff));
    TPETRA_EFFICIENCY_WARNING(
      global_needSendBuff != 0, std::runtime_error,
      "::createFromSends: Grouping export IDs together by process rank often "
      "improves performance.");
  }
#endif
  // Determine from the counts whether this process sends a message to itself.
  if (starts[myProcID] != 0) {
    sendMessageToSelf_ = true;
  }
  else {
    sendMessageToSelf_ = false;
  }
  if (! needSendBuff) {
    // The export IDs are already grouped by process, so no send buffer
    // or indicesTo_ is needed.
    numSendsToOtherProcs_ = 0;
    // Count the number of distinct destination processes (including
    // ourselves, if applicable).
    for (int i = 0; i < numProcs; ++i) {
      if (starts[i]) {
        ++numSendsToOtherProcs_;
      }
    }

    // An empty indicesTo_ is used later as the flag for this fast path.
    indicesTo_.resize(0);
    procIdsToSendTo_.assign(numSendsToOtherProcs_, 0);
    startsTo_.assign(numSendsToOtherProcs_, 0);
    lengthsTo_.assign(numSendsToOtherProcs_, 0);

    // Record the starting offset and the destination of each send.
    {
      size_t index = 0, procIndex = 0;
      for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
        while (exportProcIDs[procIndex] < 0) {
          ++procIndex; // skip negative (invalid) process IDs
        }
        startsTo_[i] = procIndex;
        int procID = exportProcIDs[procIndex];
        procIdsToSendTo_[i] = procID;
        index += starts[procID];
        procIndex += starts[procID];
      }
    }
    // Sort the offsets and destinations together, ascending by process ID.
    if (numSendsToOtherProcs_ > 0) {
      sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
    }
    // Compute send lengths and the maximum non-self send length.
    maxSendLength_ = 0;
    for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
      int procID = procIdsToSendTo_[i];
      lengthsTo_[i] = starts[procID];
      if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
        maxSendLength_ = lengthsTo_[i];
      }
    }
  }
  else {
    // The export IDs are not grouped by process: build a send-buffer
    // mapping (indicesTo_).
    // The loop below starts at starts.begin()+1, so handle starts[0] here.
    if (starts[0] == 0) {
      numSendsToOtherProcs_ = 0;
    }
    else {
      numSendsToOtherProcs_ = 1;
    }
    // Turn starts into running offsets, counting the nonempty sends.
    for (Teuchos::Array<size_t>::iterator i = starts.begin()+1,
           im1 = starts.begin();
         i != starts.end(); ++i) {
      if (*i != 0) ++numSendsToOtherProcs_;
      *i += *im1;
      im1 = i;
    }
    // Shift right by one: starts[i] becomes the offset of process i's exports.
    for (Teuchos::Array<size_t>::reverse_iterator ip1 = starts.rbegin(),
           i = starts.rbegin()+1;
         i != starts.rend(); ++i) {
      *ip1 = *i;
      ip1 = i;
    }
    starts[0] = 0;

    indicesTo_.resize(numActive);
    // indicesTo_[i] is the position of export i in the send buffer.
    for (size_t i = 0; i < numExports; ++i) {
      if (exportProcIDs[i] >= 0) {
        indicesTo_[starts[exportProcIDs[i]]] = i;
        ++starts[exportProcIDs[i]];
      }
    }
    // Restore starts so that starts[proc] is again the offset of the
    // first export to proc, with starts[numProcs] == numActive.
    for (int proc = numProcs-1; proc != 0; --proc) {
      starts[proc] = starts[proc-1];
    }
    starts.front() = 0;
    starts[numProcs] = numActive;

    procIdsToSendTo_.resize(numSendsToOtherProcs_);
    startsTo_.resize(numSendsToOtherProcs_);
    lengthsTo_.resize(numSendsToOtherProcs_);
    // Pack the nonempty sends and record the maximum non-self send length.
    size_t snd = 0;
    for (int proc = 0; proc < numProcs; ++proc) {
      if (starts[proc+1] != starts[proc]) {
        lengthsTo_[snd] = starts[proc+1] - starts[proc];
        startsTo_[snd] = starts[proc];
        if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
          maxSendLength_ = lengthsTo_[snd];
        }
        procIdsToSendTo_[snd] = proc;
        ++snd;
      }
    }
  }

  if (sendMessageToSelf_) {
    --numSendsToOtherProcs_;
  }

  // Set up the receive side of the plan from the send counts.
  computeReceives();

  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;

  return totalReceiveLength_;
}
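// A hedged usage sketch (assumed client code, not from this file): each rank
// lists the destination rank of every entry it will export; createFromSends
// then returns how many entries this rank should expect to receive.
//
//   #include "Teuchos_DefaultComm.hpp"
//   Teuchos::RCP<const Teuchos::Comm<int>> comm = Teuchos::DefaultComm<int>::getComm ();
//   DistributorPlan plan (comm);
//   Teuchos::Array<int> exportProcIDs;  // one destination rank per exported entry
//   exportProcIDs.push_back ((comm->getRank () + 1) % comm->getSize ());
//   const size_t numRecvEntries = plan.createFromSends (exportProcIDs ());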
void DistributorPlan::createFromRecvs(const Teuchos::ArrayView<const int>& remoteProcIDs)
{
  // Build a plan as if remoteProcIDs were send destinations, then take
  // its reverse: the receives of that forward plan become our sends.
  createFromSends(remoteProcIDs);
  *this = *getReversePlan();

  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS;
}
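// Hedged note: createFromRecvs builds the plan from the receive side.  The
// remote process IDs are first treated as send destinations, which lets
// computeReceives() (called inside createFromSends) discover the matching
// senders; reversing that plan then yields a plan whose receives come from
// the processes listed in remoteProcIDs.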
void DistributorPlan::createFromSendsAndRecvs(const Teuchos::ArrayView<const int>& exportProcIDs,
                                              const Teuchos::ArrayView<const int>& remoteProcIDs)
{
  howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;

  int myProcID = comm_->getRank ();
  int numProcs = comm_->getSize();

  const size_t numExportIDs = exportProcIDs.size();
  Teuchos::Array<size_t> starts (numProcs + 1, 0);

  size_t numActive = 0;
  int needSendBuff = 0;
  for (size_t i = 0; i < numExportIDs; i++) {
    if (needSendBuff == 0 && i && (exportProcIDs[i] < exportProcIDs[i-1]))
      needSendBuff = 1;
    if (exportProcIDs[i] >= 0) {
      ++starts[exportProcIDs[i]];
      ++numActive;
    }
  }

  sendMessageToSelf_ = (starts[myProcID] != 0) ? 1 : 0;
  numSendsToOtherProcs_ = 0;

  if (needSendBuff) {
    // The export IDs are not grouped by process: build indicesTo_.
    if (starts[0] == 0) {
      numSendsToOtherProcs_ = 0;
    }
    else {
      numSendsToOtherProcs_ = 1;
    }
    // Turn starts into running offsets, counting the nonempty sends.
    for (Teuchos::Array<size_t>::iterator i = starts.begin()+1,
           im1 = starts.begin();
         i != starts.end(); ++i) {
      if (*i != 0) ++numSendsToOtherProcs_;
      *i += *im1;
      im1 = i;
    }
    // Shift right by one: starts[i] becomes the offset of process i's exports.
    for (Teuchos::Array<size_t>::reverse_iterator ip1 = starts.rbegin(),
           i = starts.rbegin()+1;
         i != starts.rend(); ++i) {
      *ip1 = *i;
      ip1 = i;
    }
    starts[0] = 0;

    indicesTo_.resize(numActive);
    // indicesTo_[i] is the position of export i in the send buffer.
    for (size_t i = 0; i < numExportIDs; ++i) {
      if (exportProcIDs[i] >= 0) {
        indicesTo_[starts[exportProcIDs[i]]] = i;
        ++starts[exportProcIDs[i]];
      }
    }
    // Restore starts to per-process offsets, with starts[numProcs] == numActive.
    for (int proc = numProcs-1; proc != 0; --proc) {
      starts[proc] = starts[proc-1];
    }
    starts.front() = 0;
    starts[numProcs] = numActive;
    procIdsToSendTo_.resize(numSendsToOtherProcs_);
    startsTo_.resize(numSendsToOtherProcs_);
    lengthsTo_.resize(numSendsToOtherProcs_);
    // Pack the nonempty sends and record the maximum non-self send length.
    size_t snd = 0;
    for (int proc = 0; proc < numProcs; ++proc) {
      if (starts[proc+1] != starts[proc]) {
        lengthsTo_[snd] = starts[proc+1] - starts[proc];
        startsTo_[snd] = starts[proc];
        if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
          maxSendLength_ = lengthsTo_[snd];
        }
        procIdsToSendTo_[snd] = proc;
        ++snd;
      }
    }
  }
  else {
    // The export IDs are already grouped by process; indicesTo_ stays empty.
    numSendsToOtherProcs_ = 0;
    // Count the number of distinct destination processes.
    for (int i = 0; i < numProcs; ++i) {
      if (starts[i]) {
        ++numSendsToOtherProcs_;
      }
    }

    indicesTo_.resize(0);
    procIdsToSendTo_.assign(numSendsToOtherProcs_, 0);
    startsTo_.assign(numSendsToOtherProcs_, 0);
    lengthsTo_.assign(numSendsToOtherProcs_, 0);

    // Record the starting offset and the destination of each send.
    {
      size_t index = 0, procIndex = 0;
      for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
        while (exportProcIDs[procIndex] < 0) {
          ++procIndex; // skip negative (invalid) process IDs
        }
        startsTo_[i] = procIndex;
        int procID = exportProcIDs[procIndex];
        procIdsToSendTo_[i] = procID;
        index += starts[procID];
        procIndex += starts[procID];
      }
    }
    // Sort the offsets and destinations together, ascending by process ID.
    if (numSendsToOtherProcs_ > 0) {
      sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
    }
    // Compute send lengths and the maximum non-self send length.
    maxSendLength_ = 0;
    for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
      int procID = procIdsToSendTo_[i];
      lengthsTo_[i] = starts[procID];
      if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
        maxSendLength_ = lengthsTo_[i];
      }
    }
  }
  numSendsToOtherProcs_ -= sendMessageToSelf_;

  // Build the receive side directly from remoteProcIDs, which must be
  // sorted by owning process and contain one entry per received value.
  std::vector<int> recv_list;
  recv_list.reserve(numSendsToOtherProcs_);

  int last_pid = -2; // smaller than any valid rank, so the first entry is always kept
  for (int i = 0; i < remoteProcIDs.size(); i++) {
    if (remoteProcIDs[i] > last_pid) {
      recv_list.push_back(remoteProcIDs[i]);
      last_pid = remoteProcIDs[i];
    }
    else if (remoteProcIDs[i] < last_pid)
      throw std::runtime_error("Tpetra::Distributor::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
  }
  numReceives_ = recv_list.size();
  procsFrom_.assign(numReceives_, 0);
  lengthsFrom_.assign(numReceives_, 0);
  indicesFrom_.assign(numReceives_, 0);
  startsFrom_.assign(numReceives_, 0);

  for (size_t i = 0, j = 0; i < numReceives_; ++i) {
    size_t jlast = j;
    procsFrom_[i] = recv_list[i];
    startsFrom_[i] = j;
    // Advance j past the contiguous run of entries owned by recv_list[i].
    for ( ; j < (size_t)remoteProcIDs.size() &&
            remoteProcIDs[jlast] == remoteProcIDs[j]; j++) {;}
    lengthsFrom_[i] = j - jlast;
  }
  totalReceiveLength_ = remoteProcIDs.size();
  indicesFrom_.clear ();
  numReceives_ -= sendMessageToSelf_;
}
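// Hedged note: unlike createFromSends, this variant performs no communication.
// The receive side is read directly from remoteProcIDs, which must contain
// one entry per received value (not one per process) and must be sorted by
// owning rank, as the runtime_error above enforces.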
Teuchos::RCP<DistributorPlan> DistributorPlan::getReversePlan() const {
  if (reversePlan_.is_null()) createReversePlan();
  return reversePlan_;
}
void DistributorPlan::createReversePlan() const
{
  reversePlan_ = Teuchos::rcp(new DistributorPlan(comm_));
  reversePlan_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
  reversePlan_->sendType_ = sendType_;
  reversePlan_->barrierBetweenRecvSend_ = barrierBetweenRecvSend_;

  // The total send length of this plan is the total receive length of
  // the reverse plan.
  size_t totalSendLength =
    std::accumulate(lengthsTo_.begin(), lengthsTo_.end(), 0);

  // The maximum non-self receive length of this plan is the maximum
  // send length of the reverse plan.
  size_t maxReceiveLength = 0;
  const int myProcID = comm_->getRank();
  for (size_t i = 0; i < numReceives_; ++i) {
    if (procsFrom_[i] != myProcID) {
      if (lengthsFrom_[i] > maxReceiveLength) {
        maxReceiveLength = lengthsFrom_[i];
      }
    }
  }

  // Initialize the reverse plan by swapping the send and receive roles.
  reversePlan_->sendMessageToSelf_ = sendMessageToSelf_;
  reversePlan_->numSendsToOtherProcs_ = numReceives_;
  reversePlan_->procIdsToSendTo_ = procsFrom_;
  reversePlan_->startsTo_ = startsFrom_;
  reversePlan_->lengthsTo_ = lengthsFrom_;
  reversePlan_->maxSendLength_ = maxReceiveLength;
  reversePlan_->indicesTo_ = indicesFrom_;
  reversePlan_->numReceives_ = numSendsToOtherProcs_;
  reversePlan_->totalReceiveLength_ = totalSendLength;
  reversePlan_->lengthsFrom_ = lengthsTo_;
  reversePlan_->procsFrom_ = procIdsToSendTo_;
  reversePlan_->startsFrom_ = startsTo_;
  reversePlan_->indicesFrom_ = indicesTo_;
  reversePlan_->useDistinctTags_ = useDistinctTags_;
}
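// A hedged usage sketch (assumed client code): the reverse plan swaps the
// send and receive sides, which is exactly what the "reverse" direction of a
// communication pattern needs:
//
//   DistributorPlan forward (comm);
//   forward.createFromSends (exportProcIDs ());
//   Teuchos::RCP<DistributorPlan> reverse = forward.getReversePlan ();
//   // 'reverse' sends to the ranks 'forward' receives from, and vice versa.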
void DistributorPlan::computeReceives()
{
  using Teuchos::Array;
  using Teuchos::ArrayRCP;
  using Teuchos::as;
  using Teuchos::CommStatus;
  using Teuchos::CommRequest;
  using Teuchos::ireceive;
  using Teuchos::RCP;
  using Teuchos::REDUCE_SUM;
  using Teuchos::receive;
  using Teuchos::reduce;
  using Teuchos::scatter;
  using Teuchos::send;
  using Teuchos::waitAll;

  const int myRank = comm_->getRank();
  const int numProcs = comm_->getSize();

  // MPI tag for the nonblocking receives and blocking sends in this method.
  const int pathTag = 2;
  const int tag = getTag(pathTag);
  // toProcsFromMe[i] == the number of messages sent by this process to
  // process i.  Since the sends are contiguous, each destination appears
  // in procIdsToSendTo_ at most once, so each entry is either 0 or 1.
  Array<int> toProcsFromMe (numProcs, 0);
#ifdef HAVE_TEUCHOS_DEBUG
  bool counting_error = false;
#endif // HAVE_TEUCHOS_DEBUG
  for (size_t i = 0; i < (numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0)); ++i) {
#ifdef HAVE_TEUCHOS_DEBUG
    if (toProcsFromMe[procIdsToSendTo_[i]] != 0) {
      counting_error = true;
    }
#endif // HAVE_TEUCHOS_DEBUG
    toProcsFromMe[procIdsToSendTo_[i]] = 1;
  }
#ifdef HAVE_TEUCHOS_DEBUG
  SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
    "Tpetra::Distributor::computeReceives: There was an error on at least "
    "one process in counting the number of messages sent by that process to "
    "the other processes.  Please report this bug to the Tpetra developers.",
    *comm_);
#endif // HAVE_TEUCHOS_DEBUG
  // Compute the number of receives this process must post.  Each process
  // contributes its row of 0/1 flags; the element-wise sum on the root
  // gives the number of senders per process, and the scatter hands each
  // process its own count.
  const int root = 0; // rank of the process that performs the reduction
  Array<int> numRecvsOnEachProc; // Only the root process needs this.
  if (myRank == root) {
    numRecvsOnEachProc.resize (numProcs);
  }
  int numReceivesAsInt = 0;
  reduce<int, int> (toProcsFromMe.getRawPtr (),
                    numRecvsOnEachProc.getRawPtr (),
                    numProcs, REDUCE_SUM, root, *comm_);
  scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
                     &numReceivesAsInt, 1, root, *comm_);
  numReceives_ = static_cast<size_t> (numReceivesAsInt);

  // Now that we know how many receives to post, allocate lengthsFrom_
  // and procsFrom_; they get filled below.
  lengthsFrom_.assign (numReceives_, 0);
  procsFrom_.assign (numReceives_, 0);

  // The self-message, if any, is handled without MPI.
  const size_t actualNumReceives = numReceives_ - (sendMessageToSelf_ ? 1 : 0);
  // Nonblocking receive buffers must persist until waitAll(), hence the RCPs.
  Array<RCP<CommRequest<int> > > requests (actualNumReceives);
  Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
  Array<RCP<CommStatus<int> > > statuses (actualNumReceives);

  // Teuchos::Comm treats a negative source rank as MPI_ANY_SOURCE.
#ifdef HAVE_MPI
  const int anySourceProc = MPI_ANY_SOURCE;
#else
  const int anySourceProc = -1;
#endif

  // Post the nonblocking receives for the message lengths.
  for (size_t i = 0; i < actualNumReceives; ++i) {
    lengthsFromBuffers[i].resize (1);
    lengthsFromBuffers[i][0] = as<size_t> (0);
    requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
                                         tag, *comm_);
  }
  // Tell each destination process how many packets to expect from us.
  for (size_t i = 0; i < numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0); ++i) {
    if (procIdsToSendTo_[i] != myRank) {
      const size_t* const lengthsTo_i = &lengthsTo_[i];
      send<int, size_t> (lengthsTo_i, 1, as<int> (procIdsToSendTo_[i]), tag, *comm_);
    }
    else {
      // No MPI send is needed for the self-message; record its length
      // and source directly in the last slot.
      lengthsFrom_[numReceives_-1] = lengthsTo_[i];
      procsFrom_[numReceives_-1] = myRank;
    }
  }
  // Wait on all the receives, then unpack the lengths and source ranks.
  waitAll (*comm_, requests (), statuses ());
  for (size_t i = 0; i < actualNumReceives; ++i) {
    lengthsFrom_[i] = *lengthsFromBuffers[i];
    procsFrom_[i] = statuses[i]->getSourceRank ();
  }

  // Sort procsFrom_ and apply the same permutation to lengthsFrom_, so that
  // procsFrom_[i] and lengthsFrom_[i] still refer to the same message.
  sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());

  // Compute totalReceiveLength_ and startsFrom_.
  totalReceiveLength_ =
    std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), 0);
  indicesFrom_.clear ();

  startsFrom_.clear ();
  startsFrom_.reserve (numReceives_);
  for (size_t i = 0, j = 0; i < numReceives_; ++i) {
    startsFrom_.push_back(j);
    j += lengthsFrom_[i];
  }

  // numReceives_ counts only receives from other processes.
  if (sendMessageToSelf_) {
    --numReceives_;
  }
}
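// Hedged summary of computeReceives(), with a made-up three-rank example:
// every rank marks in toProcsFromMe which ranks it sends to; the element-wise
// REDUCE_SUM on the root then gives, for each rank p, how many ranks send to
// p, and the scatter delivers that count to p as numReceives_.  If rank 0
// sends to {1}, rank 1 to {0,2}, and rank 2 to {1}:
//
//   toProcsFromMe:  rank 0 -> {0,1,0}, rank 1 -> {1,0,1}, rank 2 -> {0,1,0}
//   column sums on the root: {1,2,1}
//   after the scatter, rank p posts {1,2,1}[p] receives from MPI_ANY_SOURCE
//   to learn each sender's rank and message length.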
void DistributorPlan::setParameterList(const Teuchos::RCP<Teuchos::ParameterList>& plist)
{
  using Teuchos::FancyOStream;
  using Teuchos::getIntegralValue;
  using Teuchos::ParameterList;
  using Teuchos::parameterList;
  using Teuchos::RCP;
  using std::endl;

  if (! plist.is_null()) {
    RCP<const ParameterList> validParams = getValidParameters ();
    plist->validateParametersAndSetDefaults (*validParams);
    const bool barrierBetween =
      plist->get<bool> ("Barrier between receives and sends");
    const Details::EDistributorSendType sendType =
      getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
    const bool useDistinctTags = plist->get<bool> ("Use distinct tags");
    const bool enable_cuda_rdma =
      plist->get<bool> ("Enable MPI CUDA RDMA support");
    TEUCHOS_TEST_FOR_EXCEPTION
      (! enable_cuda_rdma, std::invalid_argument, "Tpetra::Distributor::"
       "setParameterList: " << "You specified \"Enable MPI CUDA RDMA "
       "support\" = false. This is no longer valid. You don't need to "
       "specify this option any more; Tpetra assumes it is always true. "
       "This is a very light assumption on the MPI implementation, and in "
       "fact does not actually involve hardware or system RDMA support. "
       "Tpetra just assumes that the MPI implementation can tell whether a "
       "pointer points to host memory or CUDA device memory.");
    TEUCHOS_TEST_FOR_EXCEPTION
      (! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
       std::invalid_argument,
       "Tpetra::Distributor::setParameterList: " << endl
       << "You specified \"Send type\"=\"Rsend\", but turned off the barrier "
       "between receives and sends." << endl << "This is invalid; you must "
       "include the barrier if you use ready sends." << endl << "Ready sends "
       "require that their corresponding receives have already been posted, "
       "and the only way to guarantee that in general is with a barrier.");
    // Save the validated settings.
    sendType_ = sendType;
    barrierBetweenRecvSend_ = barrierBetween;
    useDistinctTags_ = useDistinctTags;
  }

  this->setMyParamList (plist);
}
Teuchos::Array<std::string> distributorSendTypes ()
{
  Teuchos::Array<std::string> sendTypes;
  sendTypes.push_back ("Isend");
  sendTypes.push_back ("Rsend");
  sendTypes.push_back ("Send");
  sendTypes.push_back ("Ssend");
  return sendTypes;
}
Teuchos::RCP<const Teuchos::ParameterList>
DistributorPlan::getValidParameters() const
{
  using Teuchos::Array;
  using Teuchos::ParameterList;
  using Teuchos::parameterList;
  using Teuchos::RCP;
  using Teuchos::setStringToIntegralParameter;

  const bool barrierBetween = Details::barrierBetween_default;
  const bool useDistinctTags = Details::useDistinctTags_default;

  Array<std::string> sendTypes = distributorSendTypes ();
  const std::string defaultSendType ("Send");
  Array<Details::EDistributorSendType> sendTypeEnums;
  sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
  sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
  sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
  sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);
  RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
  plist->set ("Barrier between receives and sends", barrierBetween,
              "Whether to execute a barrier between receives and sends in do"
              "[Reverse]Posts(). Required for correctness when \"Send type\""
              "=\"Rsend\", otherwise correct but not recommended.");
  setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
    defaultSendType, "When using MPI, the variant of send to use in "
    "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
  plist->set ("Use distinct tags", useDistinctTags, "Whether to use distinct "
              "MPI message tags for different code paths. Highly recommended"
              " to avoid message collisions.");
  plist->set ("Timer Label", "", "Label for Time Monitor output");
  plist->set ("Enable MPI CUDA RDMA support", true, "Assume that MPI can "
              "tell whether a pointer points to host memory or CUDA device "
              "memory. You don't need to specify this option any more; "
              "Tpetra assumes it is always true. This is a very light "
              "assumption on the MPI implementation, and in fact does not "
              "actually involve hardware or system RDMA support.");

  return Teuchos::rcp_const_cast<const ParameterList> (plist);
}
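// A hedged usage sketch (assumed client code): pick a send mode and tag policy
// through a parameter list, then hand it to setParameterList(), which
// validates it against the list returned above.
//
//   Teuchos::RCP<Teuchos::ParameterList> params = Teuchos::parameterList ();
//   params->set ("Send type", std::string ("Isend"));
//   params->set ("Barrier between receives and sends", false);
//   params->set ("Use distinct tags", true);
//   plan.setParameterList (params);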