Tpetra_Details_DistributorActor.hpp
// @HEADER
// ***********************************************************************
//
//          Tpetra: Templated Linear Algebra Services Package
//                 Copyright (2008) Sandia Corporation
//
// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
// the U.S. Government retains certain rights in this software.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the Corporation nor the names of the
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// ************************************************************************
// @HEADER

#ifndef TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
#define TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP

#include "Tpetra_Details_DistributorPlan.hpp"
#include "Tpetra_Util.hpp"

#include "Teuchos_Array.hpp"
#include "Teuchos_Comm.hpp"
#include "Teuchos_RCP.hpp"
#include "Teuchos_Time.hpp"

namespace Tpetra {
namespace Details {

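// Compile-time predicate: true exactly when both template arguments
// are Kokkos::View types.  The methods below use it to reject
// non-View communication buffers with a readable error message.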
template <class View1, class View2>
constexpr bool areKokkosViews = Kokkos::Impl::is_view<View1>::value && Kokkos::Impl::is_view<View2>::value;

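/// \brief Carries out the communication described by a DistributorPlan.
///
/// The plan describes *what* to send and receive; the actor posts the
/// actual MPI messages and waits on them.  A minimal usage sketch
/// (hypothetical names and sizes; assumes an already constructed plan
/// and Views sized to match it):
///
/// \code
/// Tpetra::Details::DistributorActor actor;
/// Kokkos::View<double*> exports ("exports", numExportPackets);
/// Kokkos::View<double*> imports ("imports", numImportPackets);
/// // ... fill exports ...
/// actor.doPostsAndWaits (plan, exports, 1, imports); // one packet per LID
/// \endcode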
class DistributorActor {

public:
  DistributorActor();
  DistributorActor(const DistributorActor& otherActor);

  template <class ExpView, class ImpView>
  void doPostsAndWaits(const DistributorPlan& plan,
                       const ExpView& exports,
                       size_t numPackets,
                       const ImpView& imports);

  template <class ExpView, class ImpView>
  void doPostsAndWaits(const DistributorPlan& plan,
                       const ExpView& exports,
                       const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                       const ImpView& imports,
                       const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);

  template <class ExpView, class ImpView>
  void doPosts(const DistributorPlan& plan,
               const ExpView& exports,
               size_t numPackets,
               const ImpView& imports);

  template <class ExpView, class ImpView>
  void doPosts(const DistributorPlan& plan,
               const ExpView& exports,
               const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
               const ImpView& imports,
               const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);

  void doWaits(const DistributorPlan& plan);

private:
  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requests_;

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_;
  Teuchos::RCP<Teuchos::Time> timer_doWaits_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_recvs_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_recvs_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_barrier_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_barrier_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_slow_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_slow_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_fast_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_fast_;

  void makeTimers();
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
};

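// The doPosts() / doWaits() split lets callers overlap communication
// with local work.  A hedged sketch (same hypothetical names as the
// example above):
//
//   actor.doPosts (plan, exports, 1, imports);
//   // ... local computation while messages are in flight ...
//   actor.doWaits (plan);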
template <class ExpView, class ImpView>
void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
                                       const ExpView& exports,
                                       size_t numPackets,
                                       const ImpView& imports)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
  doPosts(plan, exports, numPackets, imports);
  doWaits(plan);
}

template <class ExpView, class ImpView>
void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
                                       const ExpView& exports,
                                       const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                                       const ImpView& imports,
                                       const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
  doPosts(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
  doWaits(plan);
}

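// Three-argument doPosts: every LID contributes exactly numPackets
// packets, so the message to process getProcsTo()[p] has length
// getLengthsTo()[p] * numPackets.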
template <class ExpView, class ImpView>
void DistributorActor::doPosts(const DistributorPlan& plan,
                               const ExpView& exports,
                               size_t numPackets,
                               const ImpView& imports)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
  using Teuchos::Array;
  using Teuchos::as;
  using Teuchos::FancyOStream;
  using Teuchos::includesVerbLevel;
  using Teuchos::ireceive;
  using Teuchos::isend;
  using Teuchos::readySend;
  using Teuchos::send;
  using Teuchos::ssend;
  using Teuchos::TypeNameTraits;
  using Teuchos::typeName;
  using std::endl;
  using Kokkos::Compat::create_const_view;
  using Kokkos::Compat::create_view;
  using Kokkos::Compat::subview_offset;
  using Kokkos::Compat::deep_copy_offset;
  typedef Array<size_t>::size_type size_type;
  typedef ExpView exports_view_type;
  typedef ImpView imports_view_type;

#ifdef KOKKOS_ENABLE_CUDA
  static_assert
    (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
     ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
     "Please do not use Tpetra::Distributor with UVM allocations. "
     "See Trilinos GitHub issue #1088.");
#endif // KOKKOS_ENABLE_CUDA

#ifdef KOKKOS_ENABLE_SYCL
  static_assert
    (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
     ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
     "Please do not use Tpetra::Distributor with SharedUSM allocations. "
     "See Trilinos GitHub issue #1088 (the analogous CUDA issue).");
#endif // KOKKOS_ENABLE_SYCL

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMon (*timer_doPosts3KV_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

  const int myRank = plan.getComm()->getRank ();
  // Run-time configurable parameters that come from the input
  // ParameterList set by setParameterList().
  const Details::EDistributorSendType sendType = plan.getSendType();
  const bool doBarrier = plan.barrierBetweenRecvSend();

  TEUCHOS_TEST_FOR_EXCEPTION(
    sendType == Details::DISTRIBUTOR_RSEND && ! doBarrier,
    std::logic_error,
    "Tpetra::Distributor::doPosts(3 args, Kokkos): Ready-send version "
    "requires a barrier between posting receives and posting ready sends. "
    "This should have been checked before. "
    "Please report this bug to the Tpetra developers.");

  size_t selfReceiveOffset = 0;

  // MPI tag for nonblocking receives and blocking sends in this
  // method.  Some processes might take the "fast" path
  // (getIndicesTo().is_null()) and others might take the "slow" path
  // for the same doPosts() call, so the path tag must be the same for
  // both.
  const int pathTag = 0;
  const int tag = plan.getTag(pathTag);

#ifdef HAVE_TPETRA_DEBUG
  TEUCHOS_TEST_FOR_EXCEPTION
    (requests_.size () != 0,
     std::logic_error,
     "Tpetra::Distributor::doPosts(3 args, Kokkos): Process "
     << myRank << ": requests_.size() = " << requests_.size () << " != 0.");
#endif // HAVE_TPETRA_DEBUG

  // Distributor uses requests_.size() as the number of outstanding
  // nonblocking message requests, so we resize to zero to maintain
  // this invariant.
  //
  // getNumReceives() does _not_ include the self message, if there is
  // one.  Here, we do actually send a message to ourselves, so we
  // include any self message in the "actual" number of receives to
  // post.
  //
  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
  // doesn't (re)allocate its array of requests.  That happens in
  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
  // demand), or Resize_().
  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
    as<size_type> (plan.hasSelfMessage() ? 1 : 0);
  requests_.resize (0);

  // Post the nonblocking receives.  It's common MPI wisdom to post
  // receives before sends.  In MPI terms, this means favoring
  // adding to the "posted queue" (of receive requests) over adding
  // to the "unexpected queue" (of arrived messages not yet matched
  // with a receive).
  {
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts3KV_recvs_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    size_t curBufferOffset = 0;
    for (size_type i = 0; i < actualNumReceives; ++i) {
      const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
      if (plan.getProcsFrom()[i] != myRank) {
        // If my process is receiving these packet(s) from another
        // process (not a self-receive):
        //
        // 1. Set up the persisting view (recvBuf) of the imports
        //    array, given the offset and size (total number of
        //    packets from process getProcsFrom()[i]).
        // 2. Start the Irecv and save the resulting request.
        TEUCHOS_TEST_FOR_EXCEPTION(
          curBufferOffset + curBufLen > static_cast<size_t> (imports.size ()),
          std::logic_error, "Tpetra::Distributor::doPosts(3 args, Kokkos): "
          "Exceeded size of 'imports' array in packing loop on Process " <<
          myRank << ". imports.size() = " << imports.size () << " < "
          "curBufferOffset(" << curBufferOffset << ") + curBufLen(" <<
          curBufLen << ").");
        imports_view_type recvBuf =
          subview_offset (imports, curBufferOffset, curBufLen);
        requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
                                            tag, *plan.getComm()));
      }
      else { // Receiving from myself
        selfReceiveOffset = curBufferOffset; // Remember the self-recv offset
      }
      curBufferOffset += curBufLen;
    }
  }

  if (doBarrier) {
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonBarrier (*timer_doPosts3KV_barrier_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    // If we are using ready sends (MPI_Rsend) below, we need to do
    // a barrier before we post the ready sends.  This is because a
    // ready send requires that its matching receive has already
    // been posted before the send has been posted.  The only way to
    // guarantee that in this case is to use a barrier.
    plan.getComm()->barrier ();
  }

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMonSends (*timer_doPosts3KV_sends_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

  // Set up a scan through the getProcsTo() list, starting with the
  // processes ranked higher than mine (this should help balance
  // message traffic).
  //
  // FIXME (mfh 20 Feb 2013) Why haven't we precomputed this?
  // It doesn't depend on the input at all.
  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
  size_t procIndex = 0;
  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myRank)) {
    ++procIndex;
  }
  if (procIndex == numBlocks) {
    procIndex = 0;
  }

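  // selfNum records the send-block index of the self-message, if any;
  // selfIndex (used only on the "slow" path below) records where that
  // block starts in getIndicesTo().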
  size_t selfNum = 0;
  size_t selfIndex = 0;

  if (plan.getIndicesTo().is_null()) {

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_fast_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    // Data are already blocked (laid out) by process, so we don't
    // need a separate send buffer (besides the exports array).
    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myRank) {
        exports_view_type tmpSend = subview_offset(
          exports, plan.getStartsTo()[p]*numPackets, plan.getLengthsTo()[p]*numPackets);

        if (sendType == Details::DISTRIBUTOR_SEND) {
          send<int> (tmpSend,
                     as<int> (tmpSend.size ()),
                     plan.getProcsTo()[p], tag, *plan.getComm());
        }
        else if (sendType == Details::DISTRIBUTOR_ISEND) {
          exports_view_type tmpSendBuf =
            subview_offset (exports, plan.getStartsTo()[p] * numPackets,
                            plan.getLengthsTo()[p] * numPackets);
          requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
                                           tag, *plan.getComm()));
        }
        else if (sendType == Details::DISTRIBUTOR_RSEND) {
          readySend<int> (tmpSend,
                          as<int> (tmpSend.size ()),
                          plan.getProcsTo()[p], tag, *plan.getComm());
        }
        else if (sendType == Details::DISTRIBUTOR_SSEND) {
          ssend<int> (tmpSend,
                      as<int> (tmpSend.size ()),
                      plan.getProcsTo()[p], tag, *plan.getComm());
        } else {
          TEUCHOS_TEST_FOR_EXCEPTION(
            true,
            std::logic_error,
            "Tpetra::Distributor::doPosts(3 args, Kokkos): "
            "Invalid send type. We should never get here. "
            "Please report this bug to the Tpetra developers.");
        }
      }
      else { // "Sending" the message to myself
        selfNum = p;
      }
    }

    if (plan.hasSelfMessage()) {
      // This is how we "send a message to ourself": we copy from
      // the export buffer to the import buffer.  That saves
      // Teuchos::Comm implementations other than MpiComm (in
      // particular, SerialComm) the trouble of implementing self
      // messages correctly.  (To do this right, SerialComm would
      // need internal buffer space for messages, keyed on the
      // message's tag.)
      deep_copy_offset(imports, exports, selfReceiveOffset,
                       plan.getStartsTo()[selfNum]*numPackets,
                       plan.getLengthsTo()[selfNum]*numPackets);
    }
  }
  else { // data are not blocked by proc, use send buffer

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_slow_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    typedef typename ExpView::non_const_value_type Packet;
    typedef typename ExpView::array_layout Layout;
    typedef typename ExpView::device_type Device;
    typedef typename ExpView::memory_traits Mem;
    Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray",
                                                       plan.getMaxSendLength() * numPackets);

    // FIXME (mfh 05 Mar 2013) This is broken for Isend (nonblocking
    // sends), because the buffer is only long enough for one send.
    TEUCHOS_TEST_FOR_EXCEPTION(
      sendType == Details::DISTRIBUTOR_ISEND,
      std::logic_error,
      "Tpetra::Distributor::doPosts(3 args, Kokkos): The \"send buffer\" code path "
      "doesn't currently work with nonblocking sends.");

    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myRank) {
        size_t sendArrayOffset = 0;
        size_t j = plan.getStartsTo()[p];
        for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
          deep_copy_offset(sendArray, exports, sendArrayOffset,
                           plan.getIndicesTo()[j]*numPackets, numPackets);
          sendArrayOffset += numPackets;
        }
        ImpView tmpSend =
          subview_offset(sendArray, size_t(0), plan.getLengthsTo()[p]*numPackets);

        if (sendType == Details::DISTRIBUTOR_SEND) {
          send<int> (tmpSend,
                     as<int> (tmpSend.size ()),
                     plan.getProcsTo()[p], tag, *plan.getComm());
        }
        else if (sendType == Details::DISTRIBUTOR_ISEND) {
          exports_view_type tmpSendBuf =
            subview_offset (sendArray, size_t(0), plan.getLengthsTo()[p] * numPackets);
          requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
                                           tag, *plan.getComm()));
        }
        else if (sendType == Details::DISTRIBUTOR_RSEND) {
          readySend<int> (tmpSend,
                          as<int> (tmpSend.size ()),
                          plan.getProcsTo()[p], tag, *plan.getComm());
        }
        else if (sendType == Details::DISTRIBUTOR_SSEND) {
          ssend<int> (tmpSend,
                      as<int> (tmpSend.size ()),
                      plan.getProcsTo()[p], tag, *plan.getComm());
        }
        else {
          TEUCHOS_TEST_FOR_EXCEPTION(
            true,
            std::logic_error,
            "Tpetra::Distributor::doPosts(3 args, Kokkos): "
            "Invalid send type. We should never get here. "
            "Please report this bug to the Tpetra developers.");
        }
      }
      else { // "Sending" the message to myself
        selfNum = p;
        selfIndex = plan.getStartsTo()[p];
      }
    }

    if (plan.hasSelfMessage()) {
      for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
        deep_copy_offset(imports, exports, selfReceiveOffset,
                         plan.getIndicesTo()[selfIndex]*numPackets, numPackets);
        ++selfIndex;
        selfReceiveOffset += numPackets;
      }
    }
  }
}

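// Four-argument doPosts: the number of packets may vary per LID, so
// message lengths must be accumulated from numExportPacketsPerLID and
// numImportPacketsPerLID instead of scaled by a constant.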
template <class ExpView, class ImpView>
void DistributorActor::doPosts(const DistributorPlan& plan,
                               const ExpView& exports,
                               const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                               const ImpView& imports,
                               const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
  using Teuchos::Array;
  using Teuchos::as;
  using Teuchos::ireceive;
  using Teuchos::isend;
  using Teuchos::readySend;
  using Teuchos::send;
  using Teuchos::ssend;
  using Teuchos::TypeNameTraits;
  using std::endl;
  using Kokkos::Compat::create_const_view;
  using Kokkos::Compat::create_view;
  using Kokkos::Compat::subview_offset;
  using Kokkos::Compat::deep_copy_offset;
  typedef Array<size_t>::size_type size_type;
  typedef ExpView exports_view_type;
  typedef ImpView imports_view_type;

#ifdef KOKKOS_ENABLE_CUDA
  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
                 ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
                 "Please do not use Tpetra::Distributor with UVM "
                 "allocations. See GitHub issue #1088.");
#endif // KOKKOS_ENABLE_CUDA

#ifdef KOKKOS_ENABLE_SYCL
  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
                 ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
                 "Please do not use Tpetra::Distributor with SharedUSM "
                 "allocations. See GitHub issue #1088 (the analogous CUDA issue).");
#endif // KOKKOS_ENABLE_SYCL

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMon (*timer_doPosts4KV_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

  // Run-time configurable parameters that come from the input
  // ParameterList set by setParameterList().
  const Details::EDistributorSendType sendType = plan.getSendType();
  const bool doBarrier = plan.barrierBetweenRecvSend();

  TEUCHOS_TEST_FOR_EXCEPTION(
    sendType == Details::DISTRIBUTOR_RSEND && ! doBarrier,
    std::logic_error, "Tpetra::Distributor::doPosts(4 args, Kokkos): Ready-send "
    "version requires a barrier between posting receives and posting ready "
    "sends. This should have been checked before. "
    "Please report this bug to the Tpetra developers.");

  const int myProcID = plan.getComm()->getRank ();
  size_t selfReceiveOffset = 0;

#ifdef HAVE_TEUCHOS_DEBUG
  // Different messages may have different numbers of packets.
  size_t totalNumImportPackets = 0;
  for (size_type ii = 0; ii < numImportPacketsPerLID.size (); ++ii) {
    totalNumImportPackets += numImportPacketsPerLID[ii];
  }
  TEUCHOS_TEST_FOR_EXCEPTION(
    imports.extent (0) < totalNumImportPackets, std::runtime_error,
    "Tpetra::Distributor::doPosts(4 args, Kokkos): The 'imports' array must have "
    "enough entries to hold the expected number of import packets.  "
    "imports.extent(0) = " << imports.extent (0) << " < "
    "totalNumImportPackets = " << totalNumImportPackets << ".");
#endif // HAVE_TEUCHOS_DEBUG

  // MPI tag for nonblocking receives and blocking sends in this
  // method.  Some processes might take the "fast" path
  // (plan.getIndicesTo().is_null()) and others might take the "slow"
  // path for the same doPosts() call, so the path tag must be the
  // same for both.
  const int pathTag = 1;
  const int tag = plan.getTag(pathTag);

#ifdef HAVE_TEUCHOS_DEBUG
  TEUCHOS_TEST_FOR_EXCEPTION
    (requests_.size () != 0, std::logic_error, "Tpetra::Distributor::"
     "doPosts(4 args, Kokkos): Process " << myProcID << ": requests_.size () = "
     << requests_.size () << " != 0.");
#endif // HAVE_TEUCHOS_DEBUG
  // Distributor uses requests_.size() as the number of outstanding
  // nonblocking message requests, so we resize to zero to maintain
  // this invariant.
  //
  // getNumReceives() does _not_ include the self message, if there is
  // one.  Here, we do actually send a message to ourselves, so we
  // include any self message in the "actual" number of receives to
  // post.
  //
  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
  // doesn't (re)allocate its array of requests.  That happens in
  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
  // demand), or Resize_().
  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
    as<size_type> (plan.hasSelfMessage() ? 1 : 0);
  requests_.resize (0);

  // Post the nonblocking receives.  It's common MPI wisdom to post
  // receives before sends.  In MPI terms, this means favoring
  // adding to the "posted queue" (of receive requests) over adding
  // to the "unexpected queue" (of arrived messages not yet matched
  // with a receive).
  {
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts4KV_recvs_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    size_t curBufferOffset = 0;
    size_t curLIDoffset = 0;
    for (size_type i = 0; i < actualNumReceives; ++i) {
      size_t totalPacketsFrom_i = 0;
      for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
        totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset+j];
      }
      curLIDoffset += plan.getLengthsFrom()[i];
      if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) {
        // If my process is receiving these packet(s) from another
        // process (not a self-receive), and if there is at least
        // one packet to receive:
        //
        // 1. Set up the persisting view (recvBuf) into the imports
        //    array, given the offset and size (total number of
        //    packets from process getProcsFrom()[i]).
        // 2. Start the Irecv and save the resulting request.
        imports_view_type recvBuf =
          subview_offset (imports, curBufferOffset, totalPacketsFrom_i);
        requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
                                            tag, *plan.getComm()));
      }
      else { // Receiving these packet(s) from myself
        selfReceiveOffset = curBufferOffset; // Remember the offset
      }
      curBufferOffset += totalPacketsFrom_i;
    }
  }

  if (doBarrier) {
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonBarrier (*timer_doPosts4KV_barrier_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    // If we are using ready sends (MPI_Rsend) below, we need to do
    // a barrier before we post the ready sends.  This is because a
    // ready send requires that its matching receive has already
    // been posted before the send has been posted.  The only way to
    // guarantee that in this case is to use a barrier.
    plan.getComm()->barrier ();
  }

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMonSends (*timer_doPosts4KV_sends_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

  // Set up arrays containing the starting offset into exports of each
  // send, and the number of packets in each send.
  Array<size_t> sendPacketOffsets(plan.getNumSends(),0), packetsPerSend(plan.getNumSends(),0);
  size_t maxNumPackets = 0;
  size_t curPKToffset = 0;
  for (size_t pp=0; pp<plan.getNumSends(); ++pp) {
    sendPacketOffsets[pp] = curPKToffset;
    size_t numPackets = 0;
    for (size_t j=plan.getStartsTo()[pp]; j<plan.getStartsTo()[pp]+plan.getLengthsTo()[pp]; ++j) {
      numPackets += numExportPacketsPerLID[j];
    }
    if (numPackets > maxNumPackets) maxNumPackets = numPackets;
    packetsPerSend[pp] = numPackets;
    curPKToffset += numPackets;
  }
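  // For example (hypothetical numbers): with two sends of two LIDs
  // each and numExportPacketsPerLID = {2, 1, 3, 1}, this loop yields
  // sendPacketOffsets = {0, 3}, packetsPerSend = {3, 4}, and
  // maxNumPackets = 4.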

  // Set up a scan through the getProcsTo() list, starting with the
  // processes ranked higher than mine (this should help balance
  // message traffic).
  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
  size_t procIndex = 0;
  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myProcID)) {
    ++procIndex;
  }
  if (procIndex == numBlocks) {
    procIndex = 0;
  }

  size_t selfNum = 0;
  size_t selfIndex = 0;
  if (plan.getIndicesTo().is_null()) {

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_fast_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    // Data are already blocked (laid out) by process, so we don't
    // need a separate send buffer (besides the exports array).
    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myProcID && packetsPerSend[p] > 0) {
        exports_view_type tmpSend =
          subview_offset(exports, sendPacketOffsets[p], packetsPerSend[p]);

        if (sendType == Details::DISTRIBUTOR_SEND) { // the default, so put it first
          send<int> (tmpSend,
                     as<int> (tmpSend.size ()),
                     plan.getProcsTo()[p], tag, *plan.getComm());
        }
        else if (sendType == Details::DISTRIBUTOR_RSEND) {
          readySend<int> (tmpSend,
                          as<int> (tmpSend.size ()),
                          plan.getProcsTo()[p], tag, *plan.getComm());
        }
        else if (sendType == Details::DISTRIBUTOR_ISEND) {
          exports_view_type tmpSendBuf =
            subview_offset (exports, sendPacketOffsets[p], packetsPerSend[p]);
          requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
                                           tag, *plan.getComm()));
        }
        else if (sendType == Details::DISTRIBUTOR_SSEND) {
          ssend<int> (tmpSend,
                      as<int> (tmpSend.size ()),
                      plan.getProcsTo()[p], tag, *plan.getComm());
        }
        else {
          TEUCHOS_TEST_FOR_EXCEPTION(
            true, std::logic_error,
            "Tpetra::Distributor::doPosts(4 args, Kokkos): "
            "Invalid send type. We should never get here. "
            "Please report this bug to the Tpetra developers.");
        }
      }
      else { // "Sending" the message to myself
        selfNum = p;
      }
    }

    if (plan.hasSelfMessage()) {
      deep_copy_offset(imports, exports, selfReceiveOffset,
                       sendPacketOffsets[selfNum], packetsPerSend[selfNum]);
    }
  }
  else { // data are not blocked by proc, use send buffer

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_slow_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    // FIXME (mfh 05 Mar 2013) This may be broken for Isend.
    typedef typename ExpView::non_const_value_type Packet;
    typedef typename ExpView::array_layout Layout;
    typedef typename ExpView::device_type Device;
    typedef typename ExpView::memory_traits Mem;
    Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray", maxNumPackets); // send buffer

    TEUCHOS_TEST_FOR_EXCEPTION(
      sendType == Details::DISTRIBUTOR_ISEND,
      std::logic_error,
      "Tpetra::Distributor::doPosts(4 args, Kokkos): The \"send buffer\" code path "
      "doesn't currently work with nonblocking sends.");

    Array<size_t> indicesOffsets (numExportPacketsPerLID.size(), 0);
    size_t ioffset = 0;
    for (int j=0; j<numExportPacketsPerLID.size(); ++j) {
      indicesOffsets[j] = ioffset;
      ioffset += numExportPacketsPerLID[j];
    }
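    // indicesOffsets[j] is now the offset (in packets) of export
    // LID j within the exports buffer.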

    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myProcID) {
        size_t sendArrayOffset = 0;
        size_t j = plan.getStartsTo()[p];
        size_t numPacketsTo_p = 0;
        for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
          numPacketsTo_p += numExportPacketsPerLID[j];
          deep_copy_offset(sendArray, exports, sendArrayOffset,
                           indicesOffsets[j], numExportPacketsPerLID[j]);
          sendArrayOffset += numExportPacketsPerLID[j];
        }
        if (numPacketsTo_p > 0) {
          ImpView tmpSend =
            subview_offset(sendArray, size_t(0), numPacketsTo_p);

          if (sendType == Details::DISTRIBUTOR_RSEND) {
            readySend<int> (tmpSend,
                            as<int> (tmpSend.size ()),
                            plan.getProcsTo()[p], tag, *plan.getComm());
          }
          else if (sendType == Details::DISTRIBUTOR_ISEND) {
            exports_view_type tmpSendBuf =
              subview_offset (sendArray, size_t(0), numPacketsTo_p);
            requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
                                             tag, *plan.getComm()));
          }
          else if (sendType == Details::DISTRIBUTOR_SSEND) {
            ssend<int> (tmpSend,
                        as<int> (tmpSend.size ()),
                        plan.getProcsTo()[p], tag, *plan.getComm());
          }
          else { // if (sendType == Details::DISTRIBUTOR_SEND)
            send<int> (tmpSend,
                       as<int> (tmpSend.size ()),
                       plan.getProcsTo()[p], tag, *plan.getComm());
          }
        }
      }
      else { // "Sending" the message to myself
        selfNum = p;
        selfIndex = plan.getStartsTo()[p];
      }
    }

    if (plan.hasSelfMessage()) {
      for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
        deep_copy_offset(imports, exports, selfReceiveOffset,
                         indicesOffsets[selfIndex],
                         numExportPacketsPerLID[selfIndex]);
        selfReceiveOffset += numExportPacketsPerLID[selfIndex];
        ++selfIndex;
      }
    }
  }
}

} // namespace Details
} // namespace Tpetra

#endif // TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP