/[escript]/trunk/paso/src/CommBuffer.c
ViewVC logotype

Contents of /trunk/paso/src/CommBuffer.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 790 - (show annotations)
Wed Jul 26 23:12:34 2006 UTC (12 years, 11 months ago) by bcumming
File MIME type: text/plain
File size: 15850 byte(s)
changes to escript/py_src/pdetools.py and /escript/src/Data.h/.cpp to
make the Locator work in MPI. escript::Data::mindp now returns a 3 tuple,
with the MPI rank of the process on which the minimum value occurs
included. escript::Data::convertToNumArrayFromDPNo also takes the ProcNo
to perform the MPI reduction.

This had to be implemented in both the MPI and non-MPI versions to allow
the necessary changes to the Python code in pdetools.py. In the non-MPI
version ProcNo is set to 0. This works for the explicit scripts tested
thus far, however if it causes problems in your scripts contact Ben or
Lutz, or revert the three files (pdetools.py, Data.h and Data.cpp) to
the previous version.  


1 /*******************************************************************
2
3 a buffer type for communication of "boundary" data between domains.
4 send and receive buffers are allocated for each neighbouring domain,
5 and can be used for sending any type of data distributed over the domains.
6 Looks after communication completion, and safeguards against message collision
7 etc.
8
9 *******************************************************************/
10
11 #include "Paso.h"
12
13 #ifdef PASO_MPI
14 #include <string.h>
15
16 #include "CommBuffer.h"
17
18 static index_t Paso_CommBuffer_checkDomain( Paso_CommBuffer *in, index_t dom )
19 {
20 index_t position;
21
22 if( !in || !in->numDomains )
23 {
24 Paso_setError( VALUE_ERROR, "Paso_CommBuffer_checkDomain() : invalid or improperly uninitialised CommBuffer");
25 return -1;
26 }
27 if( dom<0 || dom>(in->MPIInfo->size-1) )
28 {
29 Paso_setError( VALUE_ERROR, "Paso_CommBuffer_checkDomain() : domain not in the communicator");
30 return -1;
31 }
32 position = in->indexDomains[dom];
33 if( position<0 )
34 {
35 Paso_setError( VALUE_ERROR,"Paso_CommBuffer_checkDomain() : domain not a neighbour of this domain" );
36 return -1;
37 }
38
39 return position;
40 }
41
42 /***************************************
43 * alloc/dealloc for CommBuffer type
44 ***************************************/
45 Paso_CommBuffer *Paso_CommBuffer_alloc( Paso_MPIInfo *MPIInfo, index_t tag )
46 {
47 Paso_CommBuffer *out=NULL;
48 index_t i;
49
50 out = MEMALLOC( 1, Paso_CommBuffer );
51
52 if( Paso_checkPtr(out) )
53 return NULL;
54
55 out->reference_counter = 0;
56
57 out->MPIInfo = Paso_MPIInfo_getReference( MPIInfo );
58 if( !Paso_noError() )
59 {
60 MEMFREE(out);
61 return NULL;
62 }
63
64 out->bufferForward = out->bufferBackward = NULL;
65 out->numForward = out->numBackward = NULL;
66 out->domains = NULL;
67 out->indexDomains = MEMALLOC( MPIInfo->size, index_t );
68 for( i=0; i<MPIInfo->size; i++ )
69 out->indexDomains[i] = -1;
70 out->statusForward = NULL;
71 out->requestForward = NULL;
72 out->statusBackward = NULL;
73 out->requestBackward = NULL;
74 out->requestedRecvLength = NULL;
75 out->tag = tag;
76 out->numDomains = 0;
77 out->maxItemSize = 0;
78 out->reference_counter++;
79
80 return out;
81 }
82
83 void Paso_CommBuffer_dealloc( Paso_CommBuffer *in )
84 {
85 if( in && !(--in->reference_counter) )
86 {
87 index_t i;
88
89 if( in->bufferForward )
90 for( i=0; i<in->numDomains; i++ ) {
91 MEMFREE( in->bufferForward[i] );
92 }
93 MEMFREE( in->bufferForward );
94
95 if( in->bufferBackward )
96 for( i=0; i<in->numDomains; i++ ) {
97 MEMFREE( in->bufferBackward[i] );
98 }
99 MEMFREE( in->bufferBackward );
100 MEMFREE( in->requestedRecvLength);
101 MEMFREE( in->numForward );
102 MEMFREE( in->numBackward );
103 MEMFREE( in->domains );
104 MEMFREE( in->indexDomains );
105 MEMFREE( in->statusForward );
106 MEMFREE( in->requestForward );
107 MEMFREE( in->statusBackward );
108 MEMFREE( in->requestBackward );
109
110 Paso_MPIInfo_dealloc( in->MPIInfo );
111
112 MEMFREE( in );
113 }
114 }
115
116 Paso_CommBuffer *Paso_CommBuffer_getReference( Paso_CommBuffer *in )
117 {
118 if( !in )
119 return NULL;
120
121 in->reference_counter++;
122
123 return in;
124 }
125
126 void Paso_CommBuffer_allocTable( Paso_CommBuffer *in, size_t itemSize, index_t *numForward, index_t *numBackward, dim_t numDomains, index_t *domains )
127 {
128 index_t i;
129
130 if( in && ( !in->domains || itemSize>in->maxItemSize ) )
131 {
132 /* Setup the domain info if it has not already been done.
133 In effect, this says that the domain info is fixed for the lifespan of the
134 CommBuffer. For example, one cannot reconfigure the structure for DOF
135 data to element data. */
136 if( !in->domains )
137 {
138 if( numDomains>0 )
139 {
140 in->numForward = MEMALLOC( numDomains, index_t );
141 in->numBackward = MEMALLOC( numDomains, index_t );
142 in->domains = MEMALLOC( numDomains, index_t );
143 in->requestedRecvLength= MEMALLOC( numDomains, dim_t);
144 in->requestForward = MEMALLOC( numDomains, MPI_Request );
145 in->statusForward = MEMALLOC( numDomains, MPI_Status );
146 in->requestBackward = MEMALLOC( numDomains, MPI_Request );
147 in->statusBackward = MEMALLOC( numDomains, MPI_Status );
148
149 if( ( Paso_checkPtr(in->numForward) || Paso_checkPtr(in->requestedRecvLength) || Paso_checkPtr(in->numBackward) || Paso_checkPtr(in->domains) || Paso_checkPtr(in->requestForward) || Paso_checkPtr(in->statusForward) ) )
150 {
151 MEMFREE( in->requestedRecvLength );
152 MEMFREE( in->numForward );
153 MEMFREE( in->numBackward );
154 MEMFREE( in->domains );
155 MEMFREE( in->requestForward );
156 MEMFREE( in->statusForward );
157 MEMFREE( in->requestBackward );
158 MEMFREE( in->statusBackward );
159 return;
160 }
161
162 memcpy( in->numForward, numForward, numDomains*sizeof(index_t) );
163 memcpy( in->numBackward, numBackward, numDomains*sizeof(index_t) );
164 memcpy( in->domains, domains, numDomains*sizeof(index_t) );
165 for( i=0; i<numDomains; i++ )
166 in->requestForward[i] = in->requestBackward[i] = MPI_REQUEST_NULL;
167
168 for( i=0; i<numDomains; i++ )
169 in->requestedRecvLength[i] =0;
170
171 for( i=0; i<numDomains; i++ )
172 in->indexDomains[domains[i]] = i;
173
174 }
175 in->numDomains = numDomains;
176 }
177
178 /* setup the buffers, which may need to be reallocated in the case where
179 the maximum itemSize has been reset */
180 if( in->numDomains>0 )
181 {
182 if( !in->bufferForward )
183 {
184 in->bufferForward = MEMALLOC( in->numDomains, void* );
185 for( i=0; i<in->numDomains; i++ )
186 in->bufferForward[i] = NULL;
187 }
188 for( i=0; i<in->numDomains; i++ )
189 if( in->numForward[i]>0 && itemSize>0 )
190 {
191 MEMREALLOC( in->bufferForward[i], in->numForward[i]*itemSize, char );
192 }
193
194 if( !in->bufferBackward )
195 {
196 in->bufferBackward = MEMALLOC( numDomains, void* );
197 for( i=0; i<in->numDomains; i++ )
198 in->bufferBackward[i] = NULL;
199 }
200 for( i=0; i<in->numDomains; i++ )
201 if( in->numBackward[i]>0 && itemSize>0 )
202 {
203 MEMREALLOC( in->bufferBackward[i], in->numBackward[i]*itemSize, char );
204 }
205
206 in->maxItemSize = itemSize;
207 }
208 }
209 }
210
211 bool_t Paso_CommBuffer_recv( Paso_CommBuffer *in, index_t dom, size_t itemSize )
212 {
213 index_t position=0;
214 int result;
215 MPI_Status status;
216 MPI_Request request;
217
218 if( (position=Paso_CommBuffer_checkDomain( in, dom )) == -1 )
219 return FALSE;
220
221 /* ensure that the buffers are large enough */
222 if( itemSize>in->maxItemSize )
223 Paso_CommBuffer_allocTable( in, itemSize, NULL, NULL, -1, NULL );
224
225 /* check that there isn't already a pending receive on the buffer */
226 if( in->requestBackward[position]!=MPI_REQUEST_NULL )
227 {
228 Paso_setError( PASO_MPI_ERROR, "Paso_CommBuffer_recv() : cannot request a receive in a buffer that has an unresolved request" );
229 return FALSE;
230 }
231
232 result = MPI_Irecv( in->bufferBackward[position], in->numBackward[position]*itemSize, MPI_BYTE, dom, in->tag, in->MPIInfo->comm, in->requestBackward+position );
233
234 if( result!=MPI_SUCCESS )
235 return FALSE;
236
237 /* set the size of the requested receive for error checking when testing if the receive completed */
238 in->requestedRecvLength[position] = itemSize*in->numBackward[position];
239
240 return TRUE;
241 }
242
243 /* Assumes that there is/will be a message sent */
244 bool_t Paso_CommBuffer_recvAny( Paso_CommBuffer *in, index_t *dom, size_t itemSize )
245 {
246 int result;
247 MPI_Status status;
248
249 /* ensure that the buffers are large enough */
250 if( itemSize>in->maxItemSize )
251 Paso_CommBuffer_allocTable( in, itemSize, NULL, NULL, -1, NULL );
252
253
254 /* probe for a message. Note, this is a blocking call */
255 MPI_Probe( MPI_ANY_SOURCE, in->tag, in->MPIInfo->comm, &status);
256 *dom = status.MPI_SOURCE;
257
258 return Paso_CommBuffer_recv( in, *dom, itemSize );
259 }
260
261 bool_t Paso_CommBuffer_send( Paso_CommBuffer *in, index_t dom, size_t itemSize )
262 {
263 index_t position=0;
264 int result, i;
265
266 if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
267 return FALSE;
268
269 /* check to make sure that the send buffer doesn't already have a pending send */
270 if( Paso_CommBuffer_waitSend( in, dom )==FALSE )
271 return FALSE;
272
273 /* is the buffer large enough? if so, it is too late to resize it
274 and we have to flag an error. */
275 if( itemSize>in->maxItemSize )
276 {
277 Paso_setError( VALUE_ERROR, "Paso_CommBuffer_send() : the requested itemSize exceeds in->maxItemSize" );
278 return FALSE;
279 }
280
281 /* send the buffer */
282 result = MPI_Isend( in->bufferForward[position], itemSize*in->numForward[position], MPI_BYTE, dom, in->tag, in->MPIInfo->comm, &in->requestForward[position] );
283
284 if( result!=MPI_SUCCESS )
285 return FALSE;
286 return TRUE;
287 }
288
289 /*
290 waits for all pending sends in the buffer to complete.
291 returns immediately if an error is encountered.
292 */
293 bool_t Paso_CommBuffer_waitSendPending( Paso_CommBuffer *in )
294 {
295 index_t i;
296 int success;
297
298 for( i=0; i<in->numDomains; i++ )
299 if( Paso_CommBuffer_waitSend( in, in->domains[i] )==FALSE )
300 return FALSE;
301
302 return TRUE;
303 }
304
305 /*
306 waits for all a pending send in the buffer before continueing
307 */
308 bool_t Paso_CommBuffer_waitSend( Paso_CommBuffer *in, index_t dom )
309 {
310 index_t position;
311 int success;
312
313 if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
314 return FALSE;
315
316 if( in->requestForward[position]!=MPI_REQUEST_NULL )
317 {
318 success = MPI_Wait( &in->requestForward[position], &in->statusForward[position] );
319
320 if( success!=MPI_SUCCESS )
321 {
322 Paso_setError( PASO_MPI_ERROR, "Paso_CommBuffer_waitSend() : failed MPI_Isend" );
323 return FALSE;
324 }
325 }
326
327 return TRUE;
328 }
329
330 /*
331 waits for all pending receives in the buffer to complete.
332 returns immediately if an error is encountered.
333 */
334 bool_t Paso_CommBuffer_waitRecvPending( Paso_CommBuffer *in )
335 {
336 index_t i;
337 int success;
338
339 for( i=0; i<in->numDomains; i++ )
340 if( Paso_CommBuffer_waitRecv( in, in->domains[i] )==FALSE )
341 return FALSE;
342
343 return TRUE;
344 }
345
346 /*
347 waits for all a pending receives in the buffer before continueing
348 */
349 bool_t Paso_CommBuffer_waitRecv( Paso_CommBuffer *in, index_t dom )
350 {
351 index_t position;
352 int success;
353
354 if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
355 return FALSE;
356
357 if( in->requestBackward[position]!=MPI_REQUEST_NULL )
358 {
359 success = MPI_Wait( &in->requestBackward[position], &in->statusBackward[position] );
360
361 if( success!=MPI_SUCCESS )
362 {
363 Paso_setError( PASO_MPI_ERROR, "Paso_CommBuffer_waitRecv() : failed MPI_Irecv" );
364 return FALSE;
365 }
366
367 /* verify that the received buffer was the length of the requested buffer */
368 MPI_Get_count( in->statusBackward + position, MPI_BYTE, &success );
369 if( success!=in->requestedRecvLength[position] )
370 {
371 Paso_setError( PASO_MPI_ERROR, "Paso_CommBuffer_waitRecv() : size of received buffer and backward count are not equal" );
372 return FALSE;
373 }
374 }
375
376 return TRUE;
377 }
378
379
380 /*
381 pack information into a send buffer
382 */
383 void Paso_CommBuffer_pack( Paso_CommBuffer *in, index_t dom, index_t *index, void *data, size_t itemSize, dim_t offset )
384 {
385 long i;
386 index_t position;
387 unsigned char *from, *to;
388
389 /* wait for pending sends from the buffer */
390 if( Paso_CommBuffer_waitSend( in, dom )==FALSE )
391 return;
392
393 /* ensure that the buffers are large enough */
394 if( itemSize>in->maxItemSize )
395 Paso_CommBuffer_allocTable( in, itemSize, NULL, NULL, -1, NULL );
396
397 /* setup pointers to regions for copying */
398 if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
399 return;
400 if( in->numForward[position]==0 )
401 return;
402 from = (unsigned char*)data;
403 to = (unsigned char*)in->bufferForward[position];
404 from += offset*itemSize;
405
406 if( index==NULL && in->numForward[position]>0 )
407 {
408 /* if there is no index, pack the data as is */
409 memcpy( to, from, itemSize*in->numForward[position] );
410 }
411 else
412 {
413 /* pack the data according to index */
414 for( i=0; i<in->numForward[position]; i++, to+=itemSize )
415 memcpy( to , from + (index[i]*itemSize), itemSize );
416 }
417 }
418
419 void Paso_CommBuffer_unpack( Paso_CommBuffer *in, index_t dom, index_t *index, void *data, size_t itemSize, dim_t offset )
420 {
421 long i;
422 index_t position;
423 unsigned char *from, *to;
424
425 /* wait for pending receives to the buffer */
426 if( Paso_CommBuffer_waitRecv( in, dom )==FALSE )
427 return;
428
429 /* ensure that the buffers are large enough */
430 if( itemSize>in->maxItemSize )
431 Paso_CommBuffer_allocTable( in, itemSize, NULL, NULL, -1, NULL );
432
433 /* setup pointers to regions for copying */
434 if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
435 return;
436 if( in->numBackward[position]==0 )
437 return;
438
439 from = (unsigned char*)in->bufferBackward[position];
440 to = (unsigned char*)data;
441 to += offset*itemSize;
442
443 if( index==NULL && in->numBackward[position]>0 )
444 {
445 /* if there is no index, unpack the data as is */
446 memcpy( to, from, itemSize*in->numBackward[position] );
447 }
448 else
449 {
450 /* unpack the data according to index */
451 for( i=0; i<in->numBackward[position]; i++ )
452 memcpy( to + itemSize*index[i], from + (i*itemSize), itemSize );
453 }
454 }
455
456 /*
457 verify that the information stored accross the processors on the communicator
458 over which the CommBuffer is allocated is valid
459 */
460 bool_t Paso_CommBuffer_validate( Paso_CommBuffer *in )
461 {
462 index_t *tmpForwardMap=NULL, *tmpBackwardMap=NULL, *tmpConnectionMap=NULL, *tmpForward=NULL, *tmpBackward=NULL, *tmpConnection=NULL;
463 dim_t size, rank, i;
464
465 if( in && in->MPIInfo->size>0 )
466 {
467 size = in->MPIInfo->size;
468 rank = in->MPIInfo->rank;
469
470 tmpForwardMap = MEMALLOC( size*size, index_t );
471 tmpBackwardMap = MEMALLOC( size*size, index_t );
472 tmpConnectionMap = MEMALLOC( size*size, index_t );
473 tmpForward = MEMALLOC( size, index_t );
474 tmpBackward = MEMALLOC( size, index_t );
475 tmpConnection = MEMALLOC( size, index_t );
476
477 /* collect global image of the distribution */
478 for( i=0; i<size; i++ )
479 tmpConnection[i] = 0;
480 for( i=0; i<in->numDomains; i++ )
481 tmpConnection[in->domains[i]] = 1;
482 MPI_Allgather( tmpConnection, size, MPI_INT, tmpConnectionMap, size, MPI_INT, in->MPIInfo->comm );
483
484 for( i=0; i<size; i++ )
485 tmpForward[i] = 0;
486 for( i=0; i<in->numDomains; i++ )
487 tmpForward[in->domains[i]] = in->numForward[i];
488 MPI_Allgather( tmpForward, size, MPI_INT, tmpForwardMap, size, MPI_INT, in->MPIInfo->comm );
489
490
491 for( i=0; i<size; i++ )
492 tmpBackward[i] = 0;
493 for( i=0; i<in->numDomains; i++ )
494 tmpBackward[in->domains[i]] = in->numBackward[i];
495 MPI_Allgather( tmpBackward, size, MPI_INT, tmpBackwardMap, size, MPI_INT, in->MPIInfo->comm );
496
497 /* verify that information on different processors is consisent */
498 for( i=0; i<size; i++ )
499 {
500 if( tmpConnection[i] != tmpConnectionMap[rank+i*size] )
501 {
502 Paso_setError( VALUE_ERROR, "Paso_CommBuffer_validate() : neighbour connection map is inconsistent" );
503 goto clean;
504 }
505 }
506 for( i=0; i<size; i++ )
507 {
508 if( tmpForward[i] != tmpBackwardMap[rank+i*size] )
509 {
510 Paso_setError( VALUE_ERROR, "Paso_CommBuffer_validate() : neighbour forward map is inconsistent" );
511 goto clean;
512 }
513 }
514 for( i=0; i<size; i++ )
515 {
516 if( tmpBackward[i] != tmpForwardMap[rank+i*size] )
517 {
518 Paso_setError( VALUE_ERROR, "Paso_CommBuffer_validate() : neighbour backward map is inconsistent" );
519 goto clean;
520 }
521 }
522
523 }
524
525 clean :
526 MEMFREE( tmpBackwardMap );
527 MEMFREE( tmpForwardMap );
528 MEMFREE( tmpConnectionMap );
529 MEMFREE( tmpConnection );
530 MEMFREE( tmpForward );
531 MEMFREE( tmpBackward );
532
533 return Paso_MPI_noError( in->MPIInfo );
534 }
535
536 #endif

  ViewVC Help
Powered by ViewVC 1.1.26