/[escript]/trunk/paso/src/CommBuffer.c
ViewVC logotype

Contents of /trunk/paso/src/CommBuffer.c

Parent Directory Parent Directory | Revision Log Revision Log


Revision 751 - (show annotations)
Mon Jun 26 01:46:34 2006 UTC (13 years, 2 months ago) by bcumming
File MIME type: text/plain
File size: 16425 byte(s)
Changes relating to the MPI version of escript
The standard OpenMP version of escript is unchanged

- updated data types (Finley_Mesh, Finley_NodeFile, etc) to store meshes
  over multiple MPI processes.
- added CommBuffer code in Paso for communication of Data associated
  with distributed meshes
- updates in Finley and Escript to support distributed data and operations
  on distributed data (such as interpolation).
- construction of RHS in MPI, so that simple explicit schemes (such as
  /docs/examples/wave.py without IO and the Locator) can run in MPI.
- updated mesh generation for first order line, rectangle and brick
  meshes and second order line meshes in MPI.        
- small changes to trunk/SConstruct and trunk/scons/ess_options.py to
  build the MPI version, these changes are turned off by default.

1 /*******************************************************************
2
3 a buffer type for communication of "boundary" data between domains.
4 send and receive buffers are allocated for each neighbouring domain,
5 and can be used for sending any type of data distributed over the domains.
6 Looks after communication completion, and safeguards against message collision
7 etc.
8
9 *******************************************************************/
10
11 #include "Paso.h"
12
13 #ifdef PASO_MPI
14 #include <string.h>
15
16 #include "CommBuffer.h"
17
18 static index_t Paso_CommBuffer_checkDomain( Paso_CommBuffer *in, index_t dom )
19 {
20 index_t position;
21
22 if( !in || !in->numDomains )
23 {
24 Paso_setError( VALUE_ERROR, "Paso_CommBuffer_checkDomain() : invalid or improperly uninitialised CommBuffer");
25 return -1;
26 }
27 if( dom<0 || dom>(in->MPIInfo->size-1) )
28 {
29 Paso_setError( VALUE_ERROR, "Paso_CommBuffer_checkDomain() : domain not in the communicator");
30 return -1;
31 }
32 position = in->indexDomains[dom];
33 if( position<0 )
34 {
35 Paso_setError( VALUE_ERROR,"Paso_CommBuffer_checkDomain() : domain not a neighbour of this domain" );
36 return -1;
37 }
38
39 return position;
40 }
41
42 /***************************************
43 * alloc/dealloc for CommBuffer type
44 ***************************************/
45 Paso_CommBuffer *Paso_CommBuffer_alloc( Paso_MPIInfo *MPIInfo, index_t tag )
46 {
47 Paso_CommBuffer *out=NULL;
48 index_t i;
49
50 out = MEMALLOC( 1, Paso_CommBuffer );
51
52 if( Paso_checkPtr(out) )
53 return NULL;
54
55 out->reference_counter = 0;
56
57 out->MPIInfo = Paso_MPIInfo_getReference( MPIInfo );
58 if( !Paso_noError() )
59 {
60 MEMFREE(out);
61 return NULL;
62 }
63
64 out->bufferForward = out->bufferBackward = NULL;
65 out->numForward = out->numBackward = NULL;
66 out->domains = NULL;
67 out->indexDomains = MEMALLOC( MPIInfo->size, index_t );
68 for( i=0; i<MPIInfo->size; i++ )
69 out->indexDomains[i] = -1;
70 out->statusForward = NULL;
71 out->requestForward = NULL;
72 out->statusBackward = NULL;
73 out->requestBackward = NULL;
74 out->requestedRecvLength = NULL;
75 out->tag = tag;
76 out->numDomains = 0;
77 out->maxItemSize = 0;
78 out->reference_counter++;
79
80 return out;
81 }
82
83 void Paso_CommBuffer_dealloc( Paso_CommBuffer *in )
84 {
85 if( in && !(--in->reference_counter) )
86 {
87 index_t i;
88
89 if( in->bufferForward )
90 for( i=0; i<in->numDomains; i++ ) {
91 MEMFREE( in->bufferForward[i] );
92 }
93 MEMFREE( in->bufferForward );
94
95 if( in->bufferBackward )
96 for( i=0; i<in->numDomains; i++ ) {
97 MEMFREE( in->bufferBackward[i] );
98 }
99 MEMFREE( in->bufferBackward );
100 MEMFREE( in->requestedRecvLength);
101 MEMFREE( in->numForward );
102 MEMFREE( in->numBackward );
103 MEMFREE( in->domains );
104 MEMFREE( in->indexDomains );
105 MEMFREE( in->statusForward );
106 MEMFREE( in->requestForward );
107 MEMFREE( in->statusBackward );
108 MEMFREE( in->requestBackward );
109
110 Paso_MPIInfo_dealloc( in->MPIInfo );
111
112 MEMFREE( in );
113 }
114 }
115
116 Paso_CommBuffer *Paso_CommBuffer_getReference( Paso_CommBuffer *in )
117 {
118 if( !in )
119 return NULL;
120
121 in->reference_counter++;
122
123 return in;
124 }
125
126 void Paso_CommBuffer_allocTable( Paso_CommBuffer *in, size_t itemSize, index_t *numForward, index_t *numBackward, dim_t numDomains, index_t *domains )
127 {
128 index_t i;
129
130 if( in && ( !in->domains || itemSize>in->maxItemSize ) )
131 {
132 /* Setup the domain info if it has not already been done.
133 In effect, this says that the domain info is fixed for the lifespan of the
134 CommBuffer. For example, one cannot reconfigure the structure for DOF
135 data to element data. */
136 if( !in->domains )
137 {
138 if( numDomains>0 )
139 {
140 in->numForward = MEMALLOC( numDomains, index_t );
141 in->numBackward = MEMALLOC( numDomains, index_t );
142 in->domains = MEMALLOC( numDomains, index_t );
143 in->requestedRecvLength= MEMALLOC( numDomains, dim_t);
144 in->requestForward = MEMALLOC( numDomains, MPI_Request );
145 in->statusForward = MEMALLOC( numDomains, MPI_Status );
146 in->requestBackward = MEMALLOC( numDomains, MPI_Request );
147 in->statusBackward = MEMALLOC( numDomains, MPI_Status );
148
149 if( ( Paso_checkPtr(in->numForward) || Paso_checkPtr(in->requestedRecvLength) || Paso_checkPtr(in->numBackward) || Paso_checkPtr(in->domains) || Paso_checkPtr(in->requestForward) || Paso_checkPtr(in->statusForward) ) )
150 {
151 MEMFREE( in->requestedRecvLength );
152 MEMFREE( in->numForward );
153 MEMFREE( in->numBackward );
154 MEMFREE( in->domains );
155 MEMFREE( in->requestForward );
156 MEMFREE( in->statusForward );
157 MEMFREE( in->requestBackward );
158 MEMFREE( in->statusBackward );
159 return;
160 }
161
162 memcpy( in->numForward, numForward, numDomains*sizeof(index_t) );
163 memcpy( in->numBackward, numBackward, numDomains*sizeof(index_t) );
164 memcpy( in->domains, domains, numDomains*sizeof(index_t) );
165 for( i=0; i<numDomains; i++ )
166 in->requestForward[i] = in->requestBackward[i] = MPI_REQUEST_NULL;
167
168 for( i=0; i<numDomains; i++ )
169 in->requestedRecvLength[i] =0;
170
171 for( i=0; i<numDomains; i++ )
172 in->indexDomains[domains[i]] = i;
173
174 }
175 in->numDomains = numDomains;
176 }
177
178 /* setup the buffers, which may need to be reallocated in the case where
179 the maximum itemSize has been reset */
180 if( in->numDomains>0 )
181 {
182 if( !in->bufferForward )
183 {
184 in->bufferForward = MEMALLOC( in->numDomains, void* );
185 for( i=0; i<in->numDomains; i++ )
186 in->bufferForward[i] = NULL;
187 }
188 for( i=0; i<in->numDomains; i++ )
189 if( in->numForward[i]>0 && itemSize>0 )
190 {
191 MEMREALLOC( in->bufferForward[i], in->numForward[i]*itemSize, char );
192 }
193
194 if( !in->bufferBackward )
195 {
196 in->bufferBackward = MEMALLOC( numDomains, void* );
197 for( i=0; i<in->numDomains; i++ )
198 in->bufferBackward[i] = NULL;
199 }
200 for( i=0; i<in->numDomains; i++ )
201 if( in->numBackward[i]>0 && itemSize>0 )
202 {
203 MEMREALLOC( in->bufferBackward[i], in->numBackward[i]*itemSize, char );
204 }
205
206 in->maxItemSize = itemSize;
207 }
208 }
209 }
210
211 bool_t Paso_CommBuffer_recv( Paso_CommBuffer *in, index_t dom, size_t itemSize )
212 {
213 index_t position=0;
214 int result;
215 MPI_Status status;
216 MPI_Request request;
217
218 if( (position=Paso_CommBuffer_checkDomain( in, dom )) == -1 )
219 return FALSE;
220
221 /* ensure that the buffers are large enough */
222 if( itemSize>in->maxItemSize )
223 Paso_CommBuffer_allocTable( in, itemSize, NULL, NULL, -1, NULL );
224
225 /* check that there isn't already a pending receive on the buffer */
226 if( in->requestBackward[position]!=MPI_REQUEST_NULL )
227 {
228 Paso_setError( PASO_MPI_ERROR, "Paso_CommBuffer_recv() : cannot request a receive in a buffer that has an unresolved request" );
229 return FALSE;
230 }
231
232 result = MPI_Irecv( in->bufferBackward[position], in->numBackward[position]*itemSize, MPI_BYTE, dom, in->tag, in->MPIInfo->comm, in->requestBackward+position );
233
234 if( result!=MPI_SUCCESS )
235 return FALSE;
236
237 /* set the size of the requested receive for error checking when testing if the receive completed */
238 in->requestedRecvLength[position] = itemSize*in->numBackward[position];
239
240 return TRUE;
241 }
242
243 /* Assumes that there is/will be a message sent */
244 bool_t Paso_CommBuffer_recvAny( Paso_CommBuffer *in, index_t *dom, size_t itemSize )
245 {
246 int result;
247 MPI_Status status;
248
249 /* ensure that the buffers are large enough */
250 if( itemSize>in->maxItemSize )
251 Paso_CommBuffer_allocTable( in, itemSize, NULL, NULL, -1, NULL );
252
253
254 /* probe for a message. Note, this is a blocking call */
255 MPI_Probe( MPI_ANY_SOURCE, in->tag, in->MPIInfo->comm, &status);
256 *dom = status.MPI_SOURCE;
257
258 return Paso_CommBuffer_recv( in, *dom, itemSize );
259 }
260
261 bool_t Paso_CommBuffer_send( Paso_CommBuffer *in, index_t dom, size_t itemSize )
262 {
263 index_t position=0;
264 int result, i;
265
266 if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
267 return FALSE;
268
269 /* check to make sure that the send buffer doesn't already have a pending send */
270 if( Paso_CommBuffer_waitSend( in, dom )==FALSE )
271 return FALSE;
272
273 /* is the buffer large enough? if so, it is too late to resize it
274 and we have to flag an error. */
275 if( itemSize>in->maxItemSize )
276 {
277 Paso_setError( VALUE_ERROR, "Paso_CommBuffer_send() : the requested itemSize exceeds in->maxItemSize" );
278 return FALSE;
279 }
280
281 /* send the buffer */
282 result = MPI_Isend( in->bufferForward[position], itemSize*in->numForward[position], MPI_BYTE, dom, in->tag, in->MPIInfo->comm, &in->requestForward[position] );
283
284 if( result!=MPI_SUCCESS )
285 return FALSE;
286 return TRUE;
287 }
288
289 /*
290 waits for all pending sends in the buffer to complete.
291 returns immediately if an error is encountered.
292 */
293 bool_t Paso_CommBuffer_waitSendPending( Paso_CommBuffer *in )
294 {
295 index_t i;
296 int success;
297
298 for( i=0; i<in->numDomains; i++ )
299 if( Paso_CommBuffer_waitSend( in, in->domains[i] )==FALSE )
300 return FALSE;
301
302 return TRUE;
303 }
304
305 /*
306 waits for all a pending send in the buffer before continueing
307 */
308 bool_t Paso_CommBuffer_waitSend( Paso_CommBuffer *in, index_t dom )
309 {
310 index_t position;
311 int success;
312
313 if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
314 return FALSE;
315
316 if( in->requestForward[position]!=MPI_REQUEST_NULL )
317 {
318 success = MPI_Wait( &in->requestForward[position], &in->statusForward[position] );
319
320 if( success!=MPI_SUCCESS )
321 {
322 Paso_setError( PASO_MPI_ERROR, "Paso_CommBuffer_waitSend() : Error : failed MPI_Isend" );
323 return FALSE;
324 }
325 }
326
327 return TRUE;
328 }
329
330 /*
331 waits for all pending receives in the buffer to complete.
332 returns immediately if an error is encountered.
333 */
334 bool_t Paso_CommBuffer_waitRecvPending( Paso_CommBuffer *in )
335 {
336 index_t i;
337 int success;
338
339 for( i=0; i<in->numDomains; i++ )
340 if( Paso_CommBuffer_waitRecv( in, in->domains[i] )==FALSE )
341 return FALSE;
342
343 return TRUE;
344 }
345
346 /*
347 waits for all a pending receives in the buffer before continueing
348 */
349 bool_t Paso_CommBuffer_waitRecv( Paso_CommBuffer *in, index_t dom )
350 {
351 index_t position;
352 int success;
353
354 if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
355 return FALSE;
356
357 if( in->requestBackward[position]!=MPI_REQUEST_NULL )
358 {
359 success = MPI_Wait( &in->requestBackward[position], &in->statusBackward[position] );
360
361 if( success!=MPI_SUCCESS )
362 {
363 Paso_setError( PASO_MPI_ERROR, "Paso_CommBuffer_waitRecv() : Error : failed MPI_Irecv" );
364 return FALSE;
365 }
366
367 /* verify that the received buffer was the length of the requested buffer */
368 MPI_Get_count( in->statusBackward + position, MPI_BYTE, &success );
369 if( success!=in->requestedRecvLength[position] )
370 {
371 Paso_setError( PASO_MPI_ERROR, "Paso_CommBuffer_waitRecv() : Error : size of received buffer and backward count are not equal" );
372 return FALSE;
373 }
374 }
375
376 return TRUE;
377 }
378
379
380 /* return a pointer to the forward (send) buffer, so that the user can pack it directly */
381 /* TODO */
382 /* This function should not be allowed in the release library, it is better to use the auxillery pack
383 funcitonality provided by Passo_CommBuffer_pack/unpack() */
384 void *Paso_CommBuffer_getBufferForward( Paso_CommBuffer *in, index_t dom )
385 {
386 index_t position=0;
387
388 if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
389 return NULL;
390
391 if( !Paso_CommBuffer_waitRecv( in, dom ) )
392 return NULL;
393
394 return in->bufferForward[position];
395 }
396
397 /*
398 pack information into a send buffer
399 */
400 void Paso_CommBuffer_pack( Paso_CommBuffer *in, index_t dom, index_t *index, void *data, size_t itemSize, dim_t offset )
401 {
402 long i;
403 index_t position;
404 unsigned char *from, *to;
405
406 /* wait for pending sends from the buffer */
407 if( Paso_CommBuffer_waitSend( in, dom )==FALSE )
408 return;
409
410 /* ensure that the buffers are large enough */
411 if( itemSize>in->maxItemSize )
412 Paso_CommBuffer_allocTable( in, itemSize, NULL, NULL, -1, NULL );
413
414 /* setup pointers to regions for copying */
415 if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
416 return;
417 if( in->numForward[position]==0 )
418 return;
419 from = (unsigned char*)data;
420 to = (unsigned char*)in->bufferForward[position];
421 from += offset*itemSize;
422
423 if( index==NULL && in->numForward[position]>0 )
424 {
425 /* if there is no index, pack the data as is */
426 memcpy( to, from, itemSize*in->numForward[position] );
427 }
428 else
429 {
430 /* pack the data according to index */
431 for( i=0; i<in->numForward[position]; i++, to+=itemSize )
432 memcpy( to , from + (i*itemSize), itemSize );
433 }
434 }
435
436 void Paso_CommBuffer_unpack( Paso_CommBuffer *in, index_t dom, index_t *index, void *data, size_t itemSize, dim_t offset )
437 {
438 long i;
439 index_t position;
440 unsigned char *from, *to;
441
442 /* wait for pending receives to the buffer */
443 if( Paso_CommBuffer_waitRecv( in, dom )==FALSE )
444 return;
445
446 /* ensure that the buffers are large enough */
447 if( itemSize>in->maxItemSize )
448 Paso_CommBuffer_allocTable( in, itemSize, NULL, NULL, -1, NULL );
449
450 /* setup pointers to regions for copying */
451 if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
452 return;
453 if( in->numBackward[position]==0 )
454 return;
455
456 from = (unsigned char*)in->bufferBackward[position];
457 to = (unsigned char*)data;
458 to += offset*itemSize;
459
460 if( index==NULL && in->numBackward[position]>0 )
461 {
462 /* if there is no index, unpack the data as is */
463 memcpy( to, from, itemSize*in->numBackward[position] );
464 }
465 else
466 {
467 /* unpack the data according to index */
468 for( i=0; i<in->numBackward[position]; i++ )
469 memcpy( to + itemSize*index[i], from + (i*itemSize), itemSize );
470 }
471 }
472
473 /*
474 verify that the information stored accross the processors on the communicator
475 over which the CommBuffer is allocated is valid
476 */
477 bool_t Paso_CommBuffer_validate( Paso_CommBuffer *in )
478 {
479 index_t *tmpForwardMap=NULL, *tmpBackwardMap=NULL, *tmpConnectionMap=NULL, *tmpForward=NULL, *tmpBackward=NULL, *tmpConnection=NULL;
480 dim_t size, rank, i;
481
482 if( in && in->MPIInfo->size>0 )
483 {
484 size = in->MPIInfo->size;
485 rank = in->MPIInfo->rank;
486
487 tmpForwardMap = MEMALLOC( size*size, index_t );
488 tmpBackwardMap = MEMALLOC( size*size, index_t );
489 tmpConnectionMap = MEMALLOC( size*size, index_t );
490 tmpForward = MEMALLOC( size, index_t );
491 tmpBackward = MEMALLOC( size, index_t );
492 tmpConnection = MEMALLOC( size, index_t );
493
494 /* collect global image of the distribution */
495 for( i=0; i<size; i++ )
496 tmpConnection[i] = 0;
497 for( i=0; i<in->numDomains; i++ )
498 tmpConnection[in->domains[i]] = 1;
499 MPI_Allgather( tmpConnection, size, MPI_INT, tmpConnectionMap, size, MPI_INT, in->MPIInfo->comm );
500
501 for( i=0; i<size; i++ )
502 tmpForward[i] = 0;
503 for( i=0; i<in->numDomains; i++ )
504 tmpForward[in->domains[i]] = in->numForward[i];
505 MPI_Allgather( tmpForward, size, MPI_INT, tmpForwardMap, size, MPI_INT, in->MPIInfo->comm );
506
507
508 for( i=0; i<size; i++ )
509 tmpBackward[i] = 0;
510 for( i=0; i<in->numDomains; i++ )
511 tmpBackward[in->domains[i]] = in->numBackward[i];
512 MPI_Allgather( tmpBackward, size, MPI_INT, tmpBackwardMap, size, MPI_INT, in->MPIInfo->comm );
513
514 /* verify that information on different processors is consisent */
515 for( i=0; i<size; i++ )
516 {
517 if( tmpConnection[i] != tmpConnectionMap[rank+i*size] )
518 {
519 Paso_setError( VALUE_ERROR, "Paso_CommBuffer_validate() : neighbour connection map is inconsistent" );
520 goto clean;
521 }
522 }
523 for( i=0; i<size; i++ )
524 {
525 if( tmpForward[i] != tmpBackwardMap[rank+i*size] )
526 {
527 Paso_setError( VALUE_ERROR, "Paso_CommBuffer_validate() : neighbour forward map is inconsistent" );
528 goto clean;
529 }
530 }
531 for( i=0; i<size; i++ )
532 {
533 if( tmpBackward[i] != tmpForwardMap[rank+i*size] )
534 {
535 Paso_setError( VALUE_ERROR, "Paso_CommBuffer_validate() : neighbour backward map is inconsistent" );
536 goto clean;
537 }
538 }
539
540 }
541
542 clean :
543 MEMFREE( tmpBackwardMap );
544 MEMFREE( tmpForwardMap );
545 MEMFREE( tmpConnectionMap );
546 MEMFREE( tmpConnection );
547 MEMFREE( tmpForward );
548 MEMFREE( tmpBackward );
549
550 return Paso_MPI_noError( in->MPIInfo );
551 }
552
553 #endif

  ViewVC Help
Powered by ViewVC 1.1.26