PaCO++  0.05
Schedule.cc
Go to the documentation of this file.
1 /* Padico Advanced Examples
2  * author: Christian Pérez
3  */
4 
#include <stdio.h>
#include <string.h>
#include <mpi.h>

#include <algorithm>
#include <iostream>
#include <vector>

#include "Schedule.h"
#include "Internal.h"
#include "DistributionBloc.h"
16 
17 using namespace std;
18 
19 #undef DEBUG_INTERNAL
20 #undef DEBUG_COMM
21 
22 /************************************************************/
23 /************************************************************/
24 /************************************************************/
25 
26 // Ascending rank sorting function for schedule
27 bool cmp_rank(const LocalData_t& a, const LocalData_t& b)
28 {
29  return a.start < b.start;
30 }
31 
32 
33 /************************************************************/
34 /************************************************************/
35 /************************************************************/
36 
37 // vOut represents what localData/stopo have to send to nodes of dtopo (vOut[].rank is in dtopo space)
38 void computeSendBlock1D(const GlobalData_t& gd, const LocalData_t &sd,
39  const Topology_t &stopo, const Topology_t &dtopo,
40  const ParisBlock_param_t* param, vector<LocalData_t>& vOut) {
41 
42 #ifdef DEBUG_INTERNAL
43  cerr << "\nIn compute Send Schedule--------------------\n";
44 
45  fprintf(stderr, "stopo: %ld\tdtopo: %ld\n",stopo.total, dtopo.total);
46  fprintf(stderr, "gd.len %ld\tgd.cyclic: %ld\tsd.start %d\tsd.len %d\n", gd.len, gd.cyclic, sd.start, sd.len);
47 #endif
48 
49  if (stopo.total == dtopo.total) {
50  // vOut.reserve(vOut.size()+dtopo.total); // in bloc mode, at most one msg to each dest node
51  vOut.push_back(sd);
52 #ifdef DEBUG_INTERNAL
53  fprintf(stderr, " rank:%d start:%d len:%d base:%p\n", sd.rank, sd.start, sd.len, sd.base);
54 #endif
55  } else {
56  // Append mode
57  vOut.reserve(vOut.size() + dtopo.total); // in bloc mode, at most one msg to each dest node
58 
59  unsigned slbsz = blockSize(gd.len, stopo.total, param);
60 
61  if (gd.cyclic == 0) {
62  // that's a standard bloc redistribution
63 
64  unsigned long slow, shigh;
65  computeBlockBounds(&slow, &shigh, gd.len, sd.rank, stopo.total, slbsz, 0);
66 
67  unsigned dlbsz = blockSize(gd.len, dtopo.total, param);
68 
69  unsigned fpid, lpid;
70  fpid = getProcRangeInf(slow, dlbsz);
71  lpid = getProcRangeSup(shigh, dlbsz);
72 
73 #ifdef DEBUG_INTERNAL
74  fprintf(stderr, " loop from %d to %d width dtotal: %ld\n", fpid, lpid, dtopo.total);
75 #endif
76 
77  // for each dest bloc
78  for(unsigned i=fpid; i <= lpid; i++) {
79 
80  vOut.resize(vOut.size()+1);
81  LocalData_t& s = vOut[vOut.size()-1];
82 
83  s.rank = i;
84  unsigned tmp = i*dlbsz;
85  s.start = ( slow >= tmp)?slow:tmp; // max
86 
87  tmp = (i+1)*dlbsz;
88  unsigned end = ( shigh <= tmp)?shigh:tmp; // min
89 
90  s.len = end - s.start;
91 
92  s.base = sd.base + ((s.start - sd.start) * gd.unit_size);
93 
94 #ifdef DEBUG_INTERNAL
95  fprintf(stderr, " s1: to:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
96 #endif
97  }
98 
99  } else {
100  // it is a blockcyclic distribution
101 
102  unsigned stbsz = slbsz * stopo.total;
103  unsigned nbbloc = NumberOfBlockProc(gd.len, stopo.total, slbsz, sd.rank);
104 
105  // for each src bloc, find a dst node
106  for(unsigned b=0; b<nbbloc; b++) {
107  unsigned gb = b * stopo.total + sd.rank; // global bloc id
108  unsigned drank = OwnerBlock(gb, dtopo.total);
109 
110  vOut.resize(vOut.size()+1);
111  LocalData_t& s = vOut[vOut.size()-1];
112 
113  s.rank = drank;
114  s.start = (stbsz*b) + (sd.rank*slbsz);
115  s.len = BlockNumberOfElementProc(gd.len, sd.rank, stopo.total, slbsz, b);
116  s.base = sd.base + ( b * slbsz * gd.unit_size );
117 
118 #ifdef DEBUG_INTERNAL
119  fprintf(stderr, " s2: to:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
120 #endif
121  }
122  }
123  }
124 
125 #ifdef DEBUG_INTERNAL
126  cerr << "\nIn compute Send Schedule-------------------- done\n";
127 #endif
128 }
129 
133 
134 // vOut represents what localData/dtopo have to receive from nodes of stopo (vOut[].rank is in stopo space)
135 void computeReceiveBlock1D(const GlobalData_t& gd, const LocalData_t &dd,
136  const Topology_t &stopo, const Topology_t &dtopo,
137  const ParisBlock_param_t* param, vector<LocalData_t>& vOut) {
138 
139 #ifdef DEBUG_INTERNAL
140  cerr << "\nIn compute Receive Schedule--------------------\n";
141 
142  fprintf(stderr, "stopo: %ld\tdtopo: %ld\n",stopo.total, dtopo.total);
143  fprintf(stderr, "gd.len %ld\tdd.start %d\tdd.len %d\n", gd.len, dd.start, dd.len);
144 
145 #endif
146 
147  if (stopo.total == dtopo.total) {
148  vOut.push_back(dd);
149 #ifdef DEBUG_INTERNAL
150  fprintf(stderr, " rank:%d start:%d len:%d base:%p\n", dd.rank, dd.start, dd.len, dd.base);
151 #endif
152  } else {
153  // Apend mode
154  vOut.reserve(vOut.size()+stopo.total); // in bloc mode, at most one msg from each src node
155 
156  unsigned slbsz = blockSize(gd.len, stopo.total, param);
157 
158  if (gd.cyclic == 0) {
159 
160  unsigned long dlow = dd.start;
161  unsigned long dhigh = dlow + dd.len;
162 
163  unsigned fpid, lpid;
164  fpid = getProcRangeInf(dlow, slbsz);
165  lpid = getProcRangeSup(dhigh, slbsz);
166 
167 #ifdef DEBUG_INTERNAL
168  fprintf(stderr, " loop from %d to %d width stotal: %ld\n", fpid, lpid, stopo.total);
169 #endif
170 
171  // for each src bloc
172  for(unsigned i=fpid; i <= lpid; i++) {
173 
174  vOut.resize(vOut.size()+1);
175  LocalData_t& s = vOut[vOut.size()-1];
176 
177  s.rank = i;
178  unsigned tmp = i*slbsz;
179  s.start = ( dlow >= tmp)?dlow:tmp; // max
180 
181  tmp = (i+1)*slbsz;
182  unsigned end = ( dhigh <= tmp)?dhigh:tmp; // min
183 
184  s.len = end - s.start;
185 
186  s.base = dd.base + ((s.start - dd.start) * gd.unit_size);
187 
188 #ifdef DEBUG_INTERNAL
189  fprintf(stderr, " r1: from:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
190 #endif
191  }
192  } else {
193  // it is a blockcyclic distribution
194 
195  unsigned dlbsz = blockSize(gd.len, dtopo.total, param);
196  unsigned dtbsz = dlbsz * dtopo.total;
197  unsigned nbbloc = NumberOfBlockProc(gd.len, dtopo.total, dlbsz, dd.rank);
198 
199  // for each dst bloc, find a src node
200  for(unsigned b=0; b<nbbloc; b++) {
201  unsigned gb = b * dtopo.total + dd.rank; // global bloc id
202  unsigned srank = OwnerBlock(gb, stopo.total);
203 
204  vOut.resize(vOut.size()+1);
205  LocalData_t& s = vOut[vOut.size()-1];
206 
207  s.rank = srank;
208  s.start = dtbsz*b + dd.rank*dlbsz;
209  s.len = BlockNumberOfElementProc(gd.len, dd.rank, dtopo.total, dlbsz, b);
210  s.base = dd.base + b * dlbsz * gd.unit_size;
211 
212 #ifdef DEBUG_INTERNAL
213  fprintf(stderr, " r2: from:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
214 #endif
215  }
216  }
217  }
218 }
219 
220 /************************************************************/
221 /************************************************************/
222 /************************************************************/
223 
224 // ctopo is the current topo
225 // sdata: a pointer to the source data (pointer to local data)
226 // ddata: a pointer to the base address of of reveive (pointer to local data)
227 void doSchedule(const GlobalData_t& gd, const LocalData_t& ld, const Topology_t &ctopo,
228  vector<LocalData_t>& sched_send, vector<LocalData_t>& sched_recv, void* comm) {
229 
230 #ifdef DEBUG_INTERNAL
231  cerr << "\nIn doSchedule--------------------\n";
232 #endif
233 
234  MPI_Comm mpi_comm = *(MPI_Comm*) comm;
235 
236 #ifdef DEBUG_COMM
237  fprintf(stderr," MPI_COMM_WORLD=%d mpi_comm=%d\n", MPI_COMM_WORLD, mpi_comm);
238 #endif
239 
240  if (sched_send.size() || sched_recv.size()) {
241 
242  MPI_Request sreq[sched_send.size()];
243  MPI_Request rreq[sched_recv.size()];
244  unsigned si, ri;
245  si=0;
246  ri=0;
247 
248  MPI_Status sstat[sched_send.size()];
249  MPI_Status rstat[sched_recv.size()];
250 
251  vector<LocalData_t*> local_recv;
252  vector<LocalData_t*> local_send;
253 
254  local_recv.clear();
255  local_send.clear();
256 
258  // Sorting data
259 
260  if (sched_send.size()) std::stable_sort(sched_send.begin(), sched_send.end(), cmp_rank);
261  if (sched_recv.size()) std::stable_sort(sched_recv.begin(), sched_recv.end(), cmp_rank);
262 
264  // Sending data
265 
266  // Post Asynchronous MPI receive
267 #ifdef DEBUG_COM
268  cerr << " #sched_recv: " << sched_recv.size() << endl;
269 #endif
270  for(unsigned i=0; i < sched_recv.size(); i++) {
271  unsigned from = getProcId(sched_recv[i].rank, ctopo);
272  if (from == ld.rank) {
273 #ifdef DEBUG_COMM
274  fprintf(stderr, " recv: schedr no=%d start=%d len=%d from=%d LOCAL\n", i,
275  sched_recv[i].start, sched_recv[i].len, from);
276 #endif
277  local_recv.push_back(&sched_recv[i]);
278  } else {
279 #ifdef DEBUG_COMM
280  fprintf(stderr, " recv: schedr no=%d start=%d len=%d from=%d base=%p\n", i,
281  sched_recv[i].start, sched_recv[i].len, from, sched_recv[i].base);
282 #endif
283 
284  int err = MPI_Irecv(sched_recv[i].base, sched_recv[i].len*gd.unit_size,
285  MPI_BYTE, from, 51, mpi_comm, &rreq[ri++]);
286  if (err!= MPI_SUCCESS) {
287  cerr << "EROR IN MPI_Irecv: return value is "<<err<<endl;
288  }
289  }
290  }
291 
292  // Send data via MPI
293 #ifdef DEBUG_COMM
294  cerr << " #sched_send: " << sched_send.size() << endl;
295 #endif
296  for(unsigned i=0; i < sched_send.size(); i++) {
297  unsigned to = getProcId(sched_send[i].rank, ctopo);
298  if (to == ld.rank) {
299 #ifdef DEBUG_COMM
300  fprintf(stderr, " send: scheds no=%d start=%d len=%d to=%d LOCAL\n", i,
301  sched_send[i].start, sched_send[i].len, to);
302 #endif
303  local_send.push_back(&sched_send[i]);
304  } else {
305 #ifdef DEBUG_COMM
306  fprintf(stderr, " send: scheds no=%d start=%d len=%d to=%d base=%p\n", i,
307  sched_send[i].start, sched_send[i].len, to, sched_send[i].base);
308 #endif
309 
310  int err = MPI_Isend(sched_send[i].base, sched_send[i].len*gd.unit_size,
311  MPI_BYTE, to, 51, mpi_comm, &sreq[si++]);
312  if (err!= MPI_SUCCESS) {
313  cerr << "EROR IN MPI_Isend: return value is "<<err<<endl;
314  }
315  }
316  }
317 
318  // Do local communication vie memcpy
319  if (local_recv.size() != local_send.size()) {
320  cerr << "Error: local recv & send have different size: " << local_recv.size() << " " << local_send.size() << endl;
321  }
322  for(unsigned i=0; i < local_recv.size(); i++) {
323  if (local_recv[i]->len != local_send[i]->len) {
324  cerr << "Error: local recv & send have different len for i= "<<i<< " :" << local_recv[i]->len << " " << local_send[i]->len << endl;
325  }
326 #ifdef DEBUG_COMM
327  fprintf(stderr, " local: scheds no=%d start=%d len=%d\n", i,
328  sched_send[i].start, sched_send[i].len);
329 #endif
330  memcpy(local_recv[i]->base, local_send[i]->base, local_send[i]->len*gd.unit_size);
331  }
332 
333 
334  // Wait all receive & send
335 #ifdef DEBUG_INTERNAL
336  cerr << "WAITING local communications to end...\n";
337 #endif
338 
339  int err;
340  err = MPI_Waitall(si, sreq, sstat);
341  if (err!= MPI_SUCCESS) {
342  cerr << "EROR IN MPI_WaitAll for send: return value is "<<err<<endl;
343  }
344  err = MPI_Waitall(ri, rreq, rstat);
345  if (err!= MPI_SUCCESS) {
346  cerr << "EROR IN MPI_WaitAll for recv: return value is "<<err<<endl;
347  }
348 #ifdef DEBUG_INTERNAL
349  cerr << "WAITING local communications to end...ok \n";
350 #endif
351  }
352 }
353 
357 
static unsigned BlockNumberOfElementProc(const unsigned glen, const unsigned rank, const unsigned nbprocs, const unsigned bsz, const unsigned pos)
static unsigned NumberOfBlockProc(const unsigned glen, const unsigned nbprocs, const unsigned bsz, const unsigned rank)
vector< BasicBC > sd
Definition: debug.cc:14
void computeSendBlock1D(const GlobalData_t &gd, const LocalData_t &sd, const Topology_t &stopo, const Topology_t &dtopo, const ParisBlock_param_t *param, vector< LocalData_t > &vOut)
Definition: Schedule.cc:38
static unsigned OwnerBlock(const unsigned bid, const unsigned nbprocs)
static void computeBlockBounds(unsigned long *low, unsigned long *high, const unsigned glen, const unsigned rank, const unsigned nbprocs, const unsigned bsz, const unsigned pos)
static unsigned getProcId(unsigned rank, Topology_t topo)
Definition: Internal.h:11
static unsigned getProcRangeInf(unsigned low, unsigned bsz)
Definition: BasicBC.cc:23
void doSchedule(const GlobalData_t &gd, const LocalData_t &ld, const Topology_t &ctopo, vector< LocalData_t > &sched_send, vector< LocalData_t > &sched_recv, void *comm)
Definition: Schedule.cc:227
static unsigned blockSize(const unsigned glen, const unsigned nbprocs, const ParisBlock_param_t *param)
vector< BasicBC > dd
Definition: debug.cc:15
static unsigned getProcRangeSup(unsigned high, unsigned bsz)
Definition: BasicBC.cc:28
void computeReceiveBlock1D(const GlobalData_t &gd, const LocalData_t &dd, const Topology_t &stopo, const Topology_t &dtopo, const ParisBlock_param_t *param, vector< LocalData_t > &vOut)
Definition: Schedule.cc:135
bool cmp_rank(const LocalData_t &a, const LocalData_t &b)
Definition: Schedule.cc:27