PaCO++  0.05
Schedule.cc File Reference
#include <stdio.h>
#include <mpi.h>
#include <algorithm>
#include "Schedule.h"
#include "Internal.h"
#include "DistributionBloc.h"
#include <vector>
#include <iostream>
Include dependency graph for Schedule.cc:

Go to the source code of this file.

Functions

bool cmp_rank (const LocalData_t &a, const LocalData_t &b)
 
void computeReceiveBlock1D (const GlobalData_t &gd, const LocalData_t &dd, const Topology_t &stopo, const Topology_t &dtopo, const ParisBlock_param_t *param, vector< LocalData_t > &vOut)
 
void computeSendBlock1D (const GlobalData_t &gd, const LocalData_t &sd, const Topology_t &stopo, const Topology_t &dtopo, const ParisBlock_param_t *param, vector< LocalData_t > &vOut)
 
void doSchedule (const GlobalData_t &gd, const LocalData_t &ld, const Topology_t &ctopo, vector< LocalData_t > &sched_send, vector< LocalData_t > &sched_recv, void *comm)
 

Function Documentation

◆ cmp_rank()

bool cmp_rank ( const LocalData_t &  a,
const LocalData_t &  b 
)

Definition at line 27 of file Schedule.cc.

Referenced by doSchedule().

28 {
29  return a.start < b.start;
30 }

◆ computeReceiveBlock1D()

void computeReceiveBlock1D ( const GlobalData_t &  gd,
const LocalData_t &  dd,
const Topology_t &  stopo,
const Topology_t &  dtopo,
const ParisBlock_param_t *  param,
vector< LocalData_t > &  vOut 
)

Definition at line 135 of file Schedule.cc.

References BlockNumberOfElementProc(), blockSize(), getProcRangeInf(), getProcRangeSup(), NumberOfBlockProc(), and OwnerBlock().

Referenced by computeReceiveDataBlock1DServer(), and computeSendDataBlock1DClient().

137  {
138 
139 #ifdef DEBUG_INTERNAL
140  cerr << "\nIn compute Receive Schedule--------------------\n";
141 
142  fprintf(stderr, "stopo: %ld\tdtopo: %ld\n",stopo.total, dtopo.total);
143  fprintf(stderr, "gd.len %ld\tdd.start %d\tdd.len %d\n", gd.len, dd.start, dd.len);
144 
145 #endif
146 
147  if (stopo.total == dtopo.total) {
148  vOut.push_back(dd);
149 #ifdef DEBUG_INTERNAL
150  fprintf(stderr, " rank:%d start:%d len:%d base:%p\n", dd.rank, dd.start, dd.len, dd.base);
151 #endif
152  } else {
153  // Apend mode
154  vOut.reserve(vOut.size()+stopo.total); // in bloc mode, at most one msg from each src node
155 
156  unsigned slbsz = blockSize(gd.len, stopo.total, param);
157 
158  if (gd.cyclic == 0) {
159 
160  unsigned long dlow = dd.start;
161  unsigned long dhigh = dlow + dd.len;
162 
163  unsigned fpid, lpid;
164  fpid = getProcRangeInf(dlow, slbsz);
165  lpid = getProcRangeSup(dhigh, slbsz);
166 
167 #ifdef DEBUG_INTERNAL
168  fprintf(stderr, " loop from %d to %d width stotal: %ld\n", fpid, lpid, stopo.total);
169 #endif
170 
171  // for each src bloc
172  for(unsigned i=fpid; i <= lpid; i++) {
173 
174  vOut.resize(vOut.size()+1);
175  LocalData_t& s = vOut[vOut.size()-1];
176 
177  s.rank = i;
178  unsigned tmp = i*slbsz;
179  s.start = ( dlow >= tmp)?dlow:tmp; // max
180 
181  tmp = (i+1)*slbsz;
182  unsigned end = ( dhigh <= tmp)?dhigh:tmp; // min
183 
184  s.len = end - s.start;
185 
186  s.base = dd.base + ((s.start - dd.start) * gd.unit_size);
187 
188 #ifdef DEBUG_INTERNAL
189  fprintf(stderr, " r1: from:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
190 #endif
191  }
192  } else {
193  // it is a blockcyclic distribution
194 
195  unsigned dlbsz = blockSize(gd.len, dtopo.total, param);
196  unsigned dtbsz = dlbsz * dtopo.total;
197  unsigned nbbloc = NumberOfBlockProc(gd.len, dtopo.total, dlbsz, dd.rank);
198 
199  // for each dst bloc, find a src node
200  for(unsigned b=0; b<nbbloc; b++) {
201  unsigned gb = b * dtopo.total + dd.rank; // global bloc id
202  unsigned srank = OwnerBlock(gb, stopo.total);
203 
204  vOut.resize(vOut.size()+1);
205  LocalData_t& s = vOut[vOut.size()-1];
206 
207  s.rank = srank;
208  s.start = dtbsz*b + dd.rank*dlbsz;
209  s.len = BlockNumberOfElementProc(gd.len, dd.rank, dtopo.total, dlbsz, b);
210  s.base = dd.base + b * dlbsz * gd.unit_size;
211 
212 #ifdef DEBUG_INTERNAL
213  fprintf(stderr, " r2: from:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
214 #endif
215  }
216  }
217  }
218 }
static unsigned BlockNumberOfElementProc(const unsigned glen, const unsigned rank, const unsigned nbprocs, const unsigned bsz, const unsigned pos)
static unsigned NumberOfBlockProc(const unsigned glen, const unsigned nbprocs, const unsigned bsz, const unsigned rank)
static unsigned OwnerBlock(const unsigned bid, const unsigned nbprocs)
static unsigned getProcRangeInf(unsigned low, unsigned bsz)
Definition: BasicBC.cc:23
static unsigned blockSize(const unsigned glen, const unsigned nbprocs, const ParisBlock_param_t *param)
vector< BasicBC > dd
Definition: debug.cc:15
static unsigned getProcRangeSup(unsigned high, unsigned bsz)
Definition: BasicBC.cc:28
Here is the call graph for this function:

◆ computeSendBlock1D()

void computeSendBlock1D ( const GlobalData_t &  gd,
const LocalData_t &  sd,
const Topology_t &  stopo,
const Topology_t &  dtopo,
const ParisBlock_param_t *  param,
vector< LocalData_t > &  vOut 
)

Definition at line 38 of file Schedule.cc.

References BlockNumberOfElementProc(), blockSize(), computeBlockBounds(), getProcRangeInf(), getProcRangeSup(), NumberOfBlockProc(), and OwnerBlock().

Referenced by computeReceiveDataBlock1DServer(), and computeSendDataBlock1DClient().

40  {
41 
42 #ifdef DEBUG_INTERNAL
43  cerr << "\nIn compute Send Schedule--------------------\n";
44 
45  fprintf(stderr, "stopo: %ld\tdtopo: %ld\n",stopo.total, dtopo.total);
46  fprintf(stderr, "gd.len %ld\tgd.cyclic: %ld\tsd.start %d\tsd.len %d\n", gd.len, gd.cyclic, sd.start, sd.len);
47 #endif
48 
49  if (stopo.total == dtopo.total) {
50  // vOut.reserve(vOut.size()+dtopo.total); // in bloc mode, at most one msg to each dest node
51  vOut.push_back(sd);
52 #ifdef DEBUG_INTERNAL
53  fprintf(stderr, " rank:%d start:%d len:%d base:%p\n", sd.rank, sd.start, sd.len, sd.base);
54 #endif
55  } else {
56  // Append mode
57  vOut.reserve(vOut.size() + dtopo.total); // in bloc mode, at most one msg to each dest node
58 
59  unsigned slbsz = blockSize(gd.len, stopo.total, param);
60 
61  if (gd.cyclic == 0) {
62  // that's a standard bloc redistribution
63 
64  unsigned long slow, shigh;
65  computeBlockBounds(&slow, &shigh, gd.len, sd.rank, stopo.total, slbsz, 0);
66 
67  unsigned dlbsz = blockSize(gd.len, dtopo.total, param);
68 
69  unsigned fpid, lpid;
70  fpid = getProcRangeInf(slow, dlbsz);
71  lpid = getProcRangeSup(shigh, dlbsz);
72 
73 #ifdef DEBUG_INTERNAL
74  fprintf(stderr, " loop from %d to %d width dtotal: %ld\n", fpid, lpid, dtopo.total);
75 #endif
76 
77  // for each dest bloc
78  for(unsigned i=fpid; i <= lpid; i++) {
79 
80  vOut.resize(vOut.size()+1);
81  LocalData_t& s = vOut[vOut.size()-1];
82 
83  s.rank = i;
84  unsigned tmp = i*dlbsz;
85  s.start = ( slow >= tmp)?slow:tmp; // max
86 
87  tmp = (i+1)*dlbsz;
88  unsigned end = ( shigh <= tmp)?shigh:tmp; // min
89 
90  s.len = end - s.start;
91 
92  s.base = sd.base + ((s.start - sd.start) * gd.unit_size);
93 
94 #ifdef DEBUG_INTERNAL
95  fprintf(stderr, " s1: to:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
96 #endif
97  }
98 
99  } else {
100  // it is a blockcyclic distribution
101 
102  unsigned stbsz = slbsz * stopo.total;
103  unsigned nbbloc = NumberOfBlockProc(gd.len, stopo.total, slbsz, sd.rank);
104 
105  // for each src bloc, find a dst node
106  for(unsigned b=0; b<nbbloc; b++) {
107  unsigned gb = b * stopo.total + sd.rank; // global bloc id
108  unsigned drank = OwnerBlock(gb, dtopo.total);
109 
110  vOut.resize(vOut.size()+1);
111  LocalData_t& s = vOut[vOut.size()-1];
112 
113  s.rank = drank;
114  s.start = (stbsz*b) + (sd.rank*slbsz);
115  s.len = BlockNumberOfElementProc(gd.len, sd.rank, stopo.total, slbsz, b);
116  s.base = sd.base + ( b * slbsz * gd.unit_size );
117 
118 #ifdef DEBUG_INTERNAL
119  fprintf(stderr, " s2: to:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
120 #endif
121  }
122  }
123  }
124 
125 #ifdef DEBUG_INTERNAL
126  cerr << "\nIn compute Send Schedule-------------------- done\n";
127 #endif
128 }
static unsigned BlockNumberOfElementProc(const unsigned glen, const unsigned rank, const unsigned nbprocs, const unsigned bsz, const unsigned pos)
static unsigned NumberOfBlockProc(const unsigned glen, const unsigned nbprocs, const unsigned bsz, const unsigned rank)
vector< BasicBC > sd
Definition: debug.cc:14
static unsigned OwnerBlock(const unsigned bid, const unsigned nbprocs)
static void computeBlockBounds(unsigned long *low, unsigned long *high, const unsigned glen, const unsigned rank, const unsigned nbprocs, const unsigned bsz, const unsigned pos)
static unsigned getProcRangeInf(unsigned low, unsigned bsz)
Definition: BasicBC.cc:23
static unsigned blockSize(const unsigned glen, const unsigned nbprocs, const ParisBlock_param_t *param)
static unsigned getProcRangeSup(unsigned high, unsigned bsz)
Definition: BasicBC.cc:28
Here is the call graph for this function:

◆ doSchedule()

void doSchedule ( const GlobalData_t &  gd,
const LocalData_t &  ld,
const Topology_t &  ctopo,
vector< LocalData_t > &  sched_send,
vector< LocalData_t > &  sched_recv,
void *  comm 
)

Definition at line 227 of file Schedule.cc.

Referenced by computeReceiveDataBlock1DServer(), and computeSendDataBlock1DClient().

228  {
229 
230 #ifdef DEBUG_INTERNAL
231  cerr << "\nIn doSchedule--------------------\n";
232 #endif
233 
234  MPI_Comm mpi_comm = *(MPI_Comm*) comm;
235 
236 #ifdef DEBUG_COMM
237  fprintf(stderr," MPI_COMM_WORLD=%d mpi_comm=%d\n", MPI_COMM_WORLD, mpi_comm);
238 #endif
239 
240  if (sched_send.size() || sched_recv.size()) {
241 
242  MPI_Request sreq[sched_send.size()];
243  MPI_Request rreq[sched_recv.size()];
244  unsigned si, ri;
245  si=0;
246  ri=0;
247 
248  MPI_Status sstat[sched_send.size()];
249  MPI_Status rstat[sched_recv.size()];
250 
251  vector<LocalData_t*> local_recv;
252  vector<LocalData_t*> local_send;
253 
254  local_recv.clear();
255  local_send.clear();
256 
258  // Sorting data
259 
260  if (sched_send.size()) std::stable_sort(sched_send.begin(), sched_send.end(), cmp_rank);
261  if (sched_recv.size()) std::stable_sort(sched_recv.begin(), sched_recv.end(), cmp_rank);
262 
264  // Sending data
265 
266  // Post Asynchronous MPI receive
267 #ifdef DEBUG_COM
268  cerr << " #sched_recv: " << sched_recv.size() << endl;
269 #endif
270  for(unsigned i=0; i < sched_recv.size(); i++) {
271  unsigned from = getProcId(sched_recv[i].rank, ctopo);
272  if (from == ld.rank) {
273 #ifdef DEBUG_COMM
274  fprintf(stderr, " recv: schedr no=%d start=%d len=%d from=%d LOCAL\n", i,
275  sched_recv[i].start, sched_recv[i].len, from);
276 #endif
277  local_recv.push_back(&sched_recv[i]);
278  } else {
279 #ifdef DEBUG_COMM
280  fprintf(stderr, " recv: schedr no=%d start=%d len=%d from=%d base=%p\n", i,
281  sched_recv[i].start, sched_recv[i].len, from, sched_recv[i].base);
282 #endif
283 
284  int err = MPI_Irecv(sched_recv[i].base, sched_recv[i].len*gd.unit_size,
285  MPI_BYTE, from, 51, mpi_comm, &rreq[ri++]);
286  if (err!= MPI_SUCCESS) {
287  cerr << "EROR IN MPI_Irecv: return value is "<<err<<endl;
288  }
289  }
290  }
291 
292  // Send data via MPI
293 #ifdef DEBUG_COMM
294  cerr << " #sched_send: " << sched_send.size() << endl;
295 #endif
296  for(unsigned i=0; i < sched_send.size(); i++) {
297  unsigned to = getProcId(sched_send[i].rank, ctopo);
298  if (to == ld.rank) {
299 #ifdef DEBUG_COMM
300  fprintf(stderr, " send: scheds no=%d start=%d len=%d to=%d LOCAL\n", i,
301  sched_send[i].start, sched_send[i].len, to);
302 #endif
303  local_send.push_back(&sched_send[i]);
304  } else {
305 #ifdef DEBUG_COMM
306  fprintf(stderr, " send: scheds no=%d start=%d len=%d to=%d base=%p\n", i,
307  sched_send[i].start, sched_send[i].len, to, sched_send[i].base);
308 #endif
309 
310  int err = MPI_Isend(sched_send[i].base, sched_send[i].len*gd.unit_size,
311  MPI_BYTE, to, 51, mpi_comm, &sreq[si++]);
312  if (err!= MPI_SUCCESS) {
313  cerr << "EROR IN MPI_Isend: return value is "<<err<<endl;
314  }
315  }
316  }
317 
318  // Do local communication vie memcpy
319  if (local_recv.size() != local_send.size()) {
320  cerr << "Error: local recv & send have different size: " << local_recv.size() << " " << local_send.size() << endl;
321  }
322  for(unsigned i=0; i < local_recv.size(); i++) {
323  if (local_recv[i]->len != local_send[i]->len) {
324  cerr << "Error: local recv & send have different len for i= "<<i<< " :" << local_recv[i]->len << " " << local_send[i]->len << endl;
325  }
326 #ifdef DEBUG_COMM
327  fprintf(stderr, " local: scheds no=%d start=%d len=%d\n", i,
328  sched_send[i].start, sched_send[i].len);
329 #endif
330  memcpy(local_recv[i]->base, local_send[i]->base, local_send[i]->len*gd.unit_size);
331  }
332 
333 
334  // Wait all receive & send
335 #ifdef DEBUG_INTERNAL
336  cerr << "WAITING local communications to end...\n";
337 #endif
338 
339  int err;
340  err = MPI_Waitall(si, sreq, sstat);
341  if (err!= MPI_SUCCESS) {
342  cerr << "EROR IN MPI_WaitAll for send: return value is "<<err<<endl;
343  }
344  err = MPI_Waitall(ri, rreq, rstat);
345  if (err!= MPI_SUCCESS) {
346  cerr << "EROR IN MPI_WaitAll for recv: return value is "<<err<<endl;
347  }
348 #ifdef DEBUG_INTERNAL
349  cerr << "WAITING local communications to end...ok \n";
350 #endif
351  }
352 }
static unsigned getProcId(unsigned rank, Topology_t topo)
Definition: Internal.h:11
bool cmp_rank(const LocalData_t &a, const LocalData_t &b)
Definition: Schedule.cc:27