PaCO++  0.05
PC/Schedule.cc
Go to the documentation of this file.
1 /* Padico Advanced Examples
2  * author: Christian Pérez
3  */
4 
5 #include <stdio.h>
6 #include <Padico/MPCircuit.h>
7 
8 #include "Schedule.h"
9 #include "Internal.h"
10 #include "DistributionBloc.h"
11 
12 //#define NO_COM
13 
14 #undef STANDALONE_FILE
15 
16 #define DEBUG_INTERNAL
17 #define DEBUG_COMM
18 
19 #ifdef NO_COM
// Fake allocator used only when NO_COM is defined.
// Each call returns a new, strictly increasing address: the first is
// 0x1000000, then successive calls step by 0x1000. The addresses are never
// backed by memory; they only let the schedule code be traced without
// allocating real buffers. `sz` is ignored.
inline char* mymalloc(unsigned sz) {
  (void) sz;
  // unsigned long rather than unsigned: converting a 32-bit integer to a
  // 64-bit pointer is an int-to-pointer size mismatch on LP64 platforms.
  static unsigned long base = 0x1000000UL;
  unsigned long ret = base;
  base += 0x1000UL;
  return reinterpret_cast<char*>(ret);
}
26 #endif
27 
28 #ifdef STANDALONE_FILE
29 #include "../Generated/Concret.h"
30 #include "../Generated/XServiceType.h"
31 #endif
32 
33 
34 /************************************************************/
35 /************************************************************/
36 /************************************************************/
37 
38 // vOut represents what localData/stopo have to send to nodes of dtopo (vOut[].rank is in dtopo space)
39 void computeSendBlock1D(const GlobalData_t& gd, const LocalData_t &sd,
40  const Topology_t &stopo, const Topology_t &dtopo,
41  vector<LocalData_t>& vOut) {
42 
43 #ifdef DEBUG_INTERNAL
44  cerr << "\nIn compute Send Schedule--------------------\n";
45 
46  fprintf(stderr, "stopo: %ld\tdtopo: %ld\n",stopo.total, dtopo.total);
47  fprintf(stderr, "gd.len %ld\tsd.start %d\tsd.len %d\n",gd.len, sd.start, sd.len);
48 #endif
49 
50  if (stopo.total == dtopo.total) {
51  // vOut.reserve(vOut.size()+dtopo.total); // in bloc mode, at most one msg to each dest node
52  vOut.push_back(sd);
53 #ifdef DEBUG_INTERNAL
54  fprintf(stderr, " rank:%d start:%d len:%d base:%p\n", sd.rank, sd.start, sd.len, sd.base);
55 #endif
56  } else {
57  // Append mode
58  vOut.reserve(vOut.size()+dtopo.total); // in bloc mode, at most one msg to each dest node
59 
60  unsigned dbsz = blockSize(gd.len, dtopo.total);
61 
62  unsigned long slow = sd.start;
63  unsigned long shigh = slow + sd.len;
64 
65  unsigned fpid, lpid;
66  fpid = getProcRangeInf(slow, dbsz);
67  lpid = getProcRangeSup(shigh, dbsz);
68 
69 #ifdef DEBUG_INTERNAL
70  fprintf(stderr, " loop from %d to %d width dtotal: %ld\n", fpid, lpid, dtopo.total);
71 #endif
72 
73  // for each dest bloc
74  for(unsigned i=fpid; i <= lpid; i++) {
75 
76  vOut.resize(vOut.size()+1);
77  LocalData_t& s = vOut[vOut.size()-1];
78 
79  s.rank = i;
80  unsigned tmp = i*dbsz;
81  s.start = ( slow >= tmp)?slow:tmp; // max
82 
83  tmp = (i+1)*dbsz;
84  unsigned end = ( shigh <= tmp)?shigh:tmp; // min
85 
86  s.len = end - s.start;
87 
88  s.base = sd.base + ((s.start - sd.start) * gd.unit_size);
89 
90 #ifdef DEBUG_INTERNAL
91  fprintf(stderr, " s: to:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
92 #endif
93  }
94  }
95 #ifdef DEBUG_INTERNAL
96  cerr << "\nIn compute Send Schedule-------------------- done\n";
97 #endif
98 }
99 
103 
104 // vOut represents what localData/dtopo have to receive from nodes of stopo (vOut[].rank is in stopo space)
105 void computeReceiveBlock1D(const GlobalData_t& gd, const LocalData_t &dd,
106  const Topology_t &stopo, const Topology_t &dtopo,
107  vector<LocalData_t>& vOut) {
108 
109 #ifdef DEBUG_INTERNAL
110  cerr << "\nIn compute Receive Schedule--------------------\n";
111 
112  fprintf(stderr, "stopo: %ld\tdtopo: %ld\n",stopo.total, dtopo.total);
113  fprintf(stderr, "gd.len %ld\tdd.start %d\tdd.len %d\n", gd.len, dd.start, dd.len);
114 
115 #endif
116 
117  if (stopo.total == dtopo.total) {
118  vOut.push_back(dd);
119 #ifdef DEBUG_INTERNAL
120  fprintf(stderr, " rank:%d start:%d len:%d base:%p\n", dd.rank, dd.start, dd.len, dd.base);
121 #endif
122  } else {
123  // Apend mode
124  vOut.reserve(vOut.size()+stopo.total); // in bloc mode, at most one msg from each src node
125 
126  unsigned sbsz = blockSize(gd.len, stopo.total);
127 
128  unsigned long dlow = dd.start;
129  unsigned long dhigh = dlow + dd.len;
130 
131  unsigned fpid, lpid;
132  fpid = getProcRangeInf(dlow, sbsz);
133  lpid = getProcRangeSup(dhigh, sbsz);
134 
135 #ifdef DEBUG_INTERNAL
136  fprintf(stderr, " loop from %d to %d width stotal: %ld\n", fpid, lpid, stopo.total);
137 #endif
138 
139  // for each dest bloc
140  for(unsigned i=fpid; i <= lpid; i++) {
141 
142  vOut.resize(vOut.size()+1);
143  LocalData_t& s = vOut[vOut.size()-1];
144 
145  s.rank = i;
146  unsigned tmp = i*sbsz;
147  s.start = ( dlow >= tmp)?dlow:tmp; // max
148 
149  tmp = (i+1)*sbsz;
150  unsigned end = ( dhigh <= tmp)?dhigh:tmp; // min
151 
152  s.len = end - s.start;
153 
154  s.base = dd.base + ((s.start - dd.start) * gd.unit_size);
155 
156 #ifdef DEBUG_INTERNAL
157  fprintf(stderr, " r: from:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
158 #endif
159  }
160  }
161 }
162 
163 /************************************************************/
164 /************************************************************/
165 /************************************************************/
166 
167 // ctopo is the current topo
168 // sdata: a pointer to the source data (pointer to local data)
169 // ddata: a pointer to the base address of of reveive (pointer to local data)
170 void doSchedule(const GlobalData_t& gd, const LocalData_t& ld, const Topology_t &ctopo,
171  vector<LocalData_t>& sched_send, vector<LocalData_t>& sched_recv, void* comm) {
172 
173  cerr << "\nIn doSchedule--------------------\n";
174 
175  padico_mpcircuit_t schd_mpc = (padico_mpcircuit_t) comm;
176 
177  if (sched_send.size() || sched_recv.size()) {
178 
179 #ifndef NO_COM
180  void* rreq[sched_recv.size()];
181  unsigned ri;
182  ri=0;
183 #endif
184 
185  vector<LocalData_t*> local_recv;
186  vector<LocalData_t*> local_send;
187 
188  local_recv.clear();
189  local_send.clear();
190 
192  // Sending data
193 
194  // Post Asynchronous MPCircuit receive
195 #ifdef DEBUG_COM
196  cerr << " #sched_recv: " << sched_recv.size() << endl;
197 #endif
198  for(unsigned i=0; i < sched_recv.size(); i++) {
199  unsigned from = getProcId(sched_recv[i].rank, ctopo);
200  if (from == ld.rank) {
201 #ifdef DEBUG_COMM
202  fprintf(stderr, " recv: schedr no=%d start=%d len=%d from=%d LOCAL\n", i,
203  sched_recv[i].start, sched_recv[i].len, from);
204 #endif
205  local_recv.push_back(&sched_recv[i]);
206  } else {
207 #ifdef DEBUG_COMM
208  fprintf(stderr, " recv: schedr no=%d start=%d len=%d from=%d base=%p\n", i,
209  sched_recv[i].start, sched_recv[i].len, from, sched_recv[i].base);
210 #endif
211 
212 #ifndef NO_COM
213  rreq[ri++] = padico_mpcircuit_Irecv(sched_recv[i].base, sched_recv[i].len*gd.unit_size,
214  from, 51, schd_mpc );
215 #endif
216  }
217  }
218 
219  // Send data via MPCircuit
220 #ifdef DEBUG_COMM
221  cerr << " #sched_send: " << sched_send.size() << endl;
222 #endif
223  for(unsigned i=0; i < sched_send.size(); i++) {
224  unsigned to = getProcId(sched_send[i].rank, ctopo);
225  if (to == ld.rank) {
226 #ifdef DEBUG_COMM
227  fprintf(stderr, " send: scheds no=%d start=%d len=%d to=%d LOCAL\n", i,
228  sched_send[i].start, sched_send[i].len, to);
229 #endif
230  local_send.push_back(&sched_send[i]);
231  } else {
232 #ifdef DEBUG_COMM
233  fprintf(stderr, " send: scheds no=%d start=%d len=%d to=%d base=%p\n", i,
234  sched_send[i].start, sched_send[i].len, to, sched_send[i].base);
235 #endif
236 
237 #ifndef NO_COM
238  padico_mpcircuit_send(sched_send[i].base, sched_send[i].len*gd.unit_size,
239  to, 51, schd_mpc);
240 #endif
241  }
242  }
243 
244  // Do local communication vie memcpy
245  if (local_recv.size() != local_send.size()) {
246  cerr << "Error: local recv & send have different size: " << local_recv.size() << " " << local_send.size() << endl;
247  }
248  for(unsigned i=0; i < local_recv.size(); i++) {
249  if (local_recv[i]->len != local_send[i]->len) {
250  cerr << "Error: local recv & send have different len for i= "<<i<< " :" << local_recv[i]->len << " " << local_send[i]->len << endl;
251  }
252 #ifdef DEBUG_COMM
253  fprintf(stderr, " local: scheds no=%d start=%d len=%d\n", i,
254  sched_send[i].start, sched_send[i].len);
255 #endif
256 #ifndef NO_COM
257  memcpy(local_recv[i]->base, local_send[i]->base, local_send[i]->len*gd.unit_size);
258 #endif
259  }
260 
261 
262  // Wait all receive & send
263 #ifndef NO_COM
264 #ifdef DEBUG_INTERNAL
265  cerr << "WAITING local communications to end...\n";
266 #endif
267 
268  padico_mpcircuit_waitAll(rreq, ri);
269 #ifdef DEBUG_INTERNAL
270  cerr << "WAITING local communications to end...ok \n";
271 #endif
272 #endif
273  }
274 }
275 
279 
280 #ifdef STANDALONE_FILE
281 int simSendDataBlock1D(unsigned int glen, int total, int rank, int dtotal, const PaCO::distLoc_t& mode) {
282 
283  GlobalData_t gd;
284  LocalData_t sd;
285  Topology_t stopo, dtopo;
286 
287  vConcret vdarray;
288  vector<unsigned> destid;
289 
290  gd.len = glen;
291  gd.unit_size = sizeof(xservice_data_t);
292 
293  unsigned bsz = blockSize(glen, total);
294 
295  sd.rank = rank;
296  sd.start = computeBlockBoundInf0(bsz, rank);
297  sd.len = localBlockLengthO(glen, rank, total, bsz);
298 #ifdef NO_COM
299  sd.base = (char*) 0x1000;
300 #else
301  sd.base = (char*) malloc(sd.len*gd.unit_size);
302  xservice_data_t* p=(xservice_data_t*) sd.base;
303  for( unsigned i=0; i < sd.len; i++) {
304  *p++ = sd.start+i;
305  }
306 #endif
307 
308 #if 0
309  cerr << "Dumping data: ";
310  for(unsigned k=0; k < sd.len; k++)
311  cerr << " " << ((xservice_data_t*)sd.base)[k];
312  cerr << endl;
313 #endif
314 
315  stopo.total = total;
316  dtopo.total = dtotal;
317 
318  computeSendDataBlock1D(gd, sd, stopo, dtopo, vdarray, destid, mode);
319 
322  // send data to remote nodes
323 
324  padico_mpcircuit_barrier(schd_mpc);
325 
326  cerr << "\n #vdarray: " << vdarray.size() << "\n";
327  for(unsigned i=0; i< vdarray.size(); i++) {
328  cout << "Dumping vdarray["<<i<<"] to " << destid[i] << " :\n";
329  cout << " topo / gd : " << vdarray[i]->topo().total << " / " << vdarray[i]->gd().len << " / " << vdarray[i]->gd().unit_size << endl;
330  cout << " dist # " << vdarray[i]->dist().length() << endl;
331  for(unsigned j=0; j< vdarray[i]->dist().length(); j++) {
332  cout << " rank/low/len (len): " << vdarray[i]->dist()[j].rank << " / " << vdarray[i]->dist()[j].start << " / " << vdarray[i]->dist()[j].len << " ( " << vdarray[i]->getDataLength(j) << " )" <<endl;
333  }
334 
335 #ifndef NO_COM
336  // cerr << " data: "; fprintf(stderr, "%p:", &vdarray[i].Data(j, 0);
337  // for(unsigned k=0; k < vdarray[i].getDataLength(j); k++)
338  // cerr << " " << vdarray[i].Data(j, k);
339 #endif
340  cerr << endl;
341 
342  // pglobal->xserviceList[destid[i]]->sendData(vdarray[i]);
343 
344  }
345  return 0;
346 }
347 
348 int main(int argc, char** argv) {
349 
350  MPI_Init(&argc, &argv);
351 
352  if (argc != 4) {
353  fprintf(stderr,"Usage: %s len source_total dest_total_nodes\n", argv[0]);
354  exit(1);
355  }
356 
357  unsigned glen, stotal, dtotal;
358 
359  sscanf(argv[1],"%d",&glen);
360  sscanf(argv[2],"%d",&stotal);
361  sscanf(argv[3],"%d",&dtotal);
362 
363  fprintf(stderr,"Distribution block 1D with len=%d stotal=%d dtotal=%d\n", glen, stotal, dtotal);
364 
365  int rank;
366  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
367  pid=rank;
368 
369  // for(unsigned rank=0; rank< stotal; rank++) {
370  cerr << endl << "------------------------------- " << rank << " / " << stotal << endl;
371  simSendDataBlock1D(glen, stotal, rank, dtotal, PaCO::ClientSide);
372  // }
373 
374  cerr << pid << ": ENDING !!\n";
375 
376  MPI_Finalize();
377 }
378 #endif
vector< BasicBC > sd
Definition: debug.cc:14
void computeSendBlock1D(const GlobalData_t &gd, const LocalData_t &sd, const Topology_t &stopo, const Topology_t &dtopo, vector< LocalData_t > &vOut)
Definition: PC/Schedule.cc:39
static unsigned getProcId(unsigned rank, Topology_t topo)
Definition: Internal.h:11
void doSchedule(const GlobalData_t &gd, const LocalData_t &ld, const Topology_t &ctopo, vector< LocalData_t > &sched_send, vector< LocalData_t > &sched_recv, void *comm)
Definition: PC/Schedule.cc:170
distLoc_t
Definition: PaCO++.idl:12
void computeSendDataBlock1D(const GlobalData_t &gd, const LocalData_t &sd, const Topology_t &stopo, const Topology_t &dtopo, const ParisBlock_param_t *param, vAbstrait &vdarray, vector< unsigned > &destid, const PaCO::distLoc_t &mode, void *comm)
Definition: ClientSide.cc:215
static unsigned getProcRangeInf(unsigned low, unsigned bsz)
Definition: BasicBC.cc:23
static unsigned computeBlockBoundInf0(unsigned bsz, unsigned rank)
int main(int argc, char *argv[])
Definition: debug.cc:28
static unsigned blockSize(const unsigned glen, const unsigned nbprocs, const ParisBlock_param_t *param)
vector< BasicBC > dd
Definition: debug.cc:15
static unsigned localBlockLengthO(unsigned glen, unsigned rank, unsigned total, unsigned bsz)
static unsigned getProcRangeSup(unsigned high, unsigned bsz)
Definition: BasicBC.cc:28
void computeReceiveBlock1D(const GlobalData_t &gd, const LocalData_t &dd, const Topology_t &stopo, const Topology_t &dtopo, vector< LocalData_t > &vOut)
Definition: PC/Schedule.cc:105