/[escript]/trunk/paso/src/CommBuffer.c

Revision 751
Mon Jun 26 01:46:34 2006 UTC by bcumming
File MIME type: text/plain
File size: 16425 byte(s)
Changes relating to the MPI version of escript.
The standard OpenMP version of escript is unchanged.

- updated data types (Finley_Mesh, Finley_NodeFile, etc.) to store meshes
  over multiple MPI processes.
- added CommBuffer code in Paso for communication of Data associated
  with distributed meshes
- updates in Finley and Escript to support distributed data and operations
  on distributed data (such as interpolation).
- construction of RHS in MPI, so that simple explicit schemes (such as
  /docs/examples/wave.py without IO and the Locator) can run in MPI.
- updated mesh generation for first order line, rectangle and brick
  meshes and second order line meshes in MPI.        
- small changes to trunk/SConstruct and trunk/scons/ess_options.py to
  build the MPI version; these changes are turned off by default.

/*******************************************************************

    A buffer type for communication of "boundary" data between domains.
    Send and receive buffers are allocated for each neighbouring domain,
    and can be used for sending any type of data distributed over the
    domains. Looks after communication completion, and safeguards
    against message collision etc.

*******************************************************************/
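
/*
    Illustrative usage sketch (not part of the library; buffer, dom, index
    and data are hypothetical names for objects assumed to have been set up
    elsewhere, e.g. via Paso_CommBuffer_allocTable). A typical exchange of
    double values with a neighbouring domain dom might look like:

        Paso_CommBuffer_recv( buffer, dom, sizeof(double) );                   post non-blocking receive
        Paso_CommBuffer_pack( buffer, dom, index, data, sizeof(double), 0 );   fill the send buffer
        Paso_CommBuffer_send( buffer, dom, sizeof(double) );                   non-blocking send
        Paso_CommBuffer_unpack( buffer, dom, index, data, sizeof(double), 0 ); waits on the receive
        Paso_CommBuffer_waitSendPending( buffer );                             complete the send
*/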

#include "Paso.h"

#ifdef PASO_MPI
#include <string.h>

#include "CommBuffer.h"

static index_t Paso_CommBuffer_checkDomain( Paso_CommBuffer *in, index_t dom )
{
    index_t position;

    if( !in || !in->numDomains )
    {
        Paso_setError( VALUE_ERROR, "Paso_CommBuffer_checkDomain() : invalid or improperly initialised CommBuffer" );
        return -1;
    }
    if( dom<0 || dom>(in->MPIInfo->size-1) )
    {
        Paso_setError( VALUE_ERROR, "Paso_CommBuffer_checkDomain() : domain not in the communicator" );
        return -1;
    }
    position = in->indexDomains[dom];
    if( position<0 )
    {
        Paso_setError( VALUE_ERROR, "Paso_CommBuffer_checkDomain() : domain not a neighbour of this domain" );
        return -1;
    }

    return position;
}

/***************************************
 * alloc/dealloc for CommBuffer type
 ***************************************/
Paso_CommBuffer *Paso_CommBuffer_alloc( Paso_MPIInfo *MPIInfo, index_t tag )
{
    Paso_CommBuffer *out=NULL;
    index_t i;

    out = MEMALLOC( 1, Paso_CommBuffer );

    if( Paso_checkPtr(out) )
        return NULL;

    out->reference_counter = 0;

    out->MPIInfo = Paso_MPIInfo_getReference( MPIInfo );
    if( !Paso_noError() )
    {
        MEMFREE(out);
        return NULL;
    }

    out->bufferForward = out->bufferBackward = NULL;
    out->numForward = out->numBackward = NULL;
    out->domains = NULL;
    out->indexDomains = MEMALLOC( MPIInfo->size, index_t );
    for( i=0; i<MPIInfo->size; i++ )
        out->indexDomains[i] = -1;
    out->statusForward = NULL;
    out->requestForward = NULL;
    out->statusBackward = NULL;
    out->requestBackward = NULL;
    out->requestedRecvLength = NULL;
    out->tag = tag;
    out->numDomains = 0;
    out->maxItemSize = 0;
    out->reference_counter++;

    return out;
}

void Paso_CommBuffer_dealloc( Paso_CommBuffer *in )
{
    if( in && !(--in->reference_counter) )
    {
        index_t i;

        if( in->bufferForward )
            for( i=0; i<in->numDomains; i++ ) {
                MEMFREE( in->bufferForward[i] );
            }
        MEMFREE( in->bufferForward );

        if( in->bufferBackward )
            for( i=0; i<in->numDomains; i++ ) {
                MEMFREE( in->bufferBackward[i] );
            }
        MEMFREE( in->bufferBackward );
        MEMFREE( in->requestedRecvLength );
        MEMFREE( in->numForward );
        MEMFREE( in->numBackward );
        MEMFREE( in->domains );
        MEMFREE( in->indexDomains );
        MEMFREE( in->statusForward );
        MEMFREE( in->requestForward );
        MEMFREE( in->statusBackward );
        MEMFREE( in->requestBackward );

        Paso_MPIInfo_dealloc( in->MPIInfo );

        MEMFREE( in );
    }
}

Paso_CommBuffer *Paso_CommBuffer_getReference( Paso_CommBuffer *in )
{
    if( !in )
        return NULL;

    in->reference_counter++;

    return in;
}
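
/*
    Lifecycle sketch (illustrative only): the CommBuffer is reference
    counted, so each holder takes its own reference and releases it with
    a matching dealloc:

        Paso_CommBuffer *A = Paso_CommBuffer_alloc( MPIInfo, tag );
        Paso_CommBuffer *B = Paso_CommBuffer_getReference( A );
        Paso_CommBuffer_dealloc( A );    storage stays live, B still holds a reference
        Paso_CommBuffer_dealloc( B );    last reference released, storage freed
*/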

void Paso_CommBuffer_allocTable( Paso_CommBuffer *in, size_t itemSize, index_t *numForward, index_t *numBackward, dim_t numDomains, index_t *domains )
{
    index_t i;

    if( in && ( !in->domains || itemSize>in->maxItemSize ) )
    {
        /* Set up the domain info if it has not already been done.
           In effect, this says that the domain info is fixed for the lifespan
           of the CommBuffer. For example, one cannot reconfigure the structure
           from DOF data to element data. */
        if( !in->domains )
        {
            if( numDomains>0 )
            {
                in->numForward = MEMALLOC( numDomains, index_t );
                in->numBackward = MEMALLOC( numDomains, index_t );
                in->domains = MEMALLOC( numDomains, index_t );
                in->requestedRecvLength = MEMALLOC( numDomains, dim_t );
                in->requestForward = MEMALLOC( numDomains, MPI_Request );
                in->statusForward = MEMALLOC( numDomains, MPI_Status );
                in->requestBackward = MEMALLOC( numDomains, MPI_Request );
                in->statusBackward = MEMALLOC( numDomains, MPI_Status );

                if( Paso_checkPtr(in->numForward) || Paso_checkPtr(in->requestedRecvLength) || Paso_checkPtr(in->numBackward) || Paso_checkPtr(in->domains) || Paso_checkPtr(in->requestForward) || Paso_checkPtr(in->statusForward) || Paso_checkPtr(in->requestBackward) || Paso_checkPtr(in->statusBackward) )
                {
                    MEMFREE( in->requestedRecvLength );
                    MEMFREE( in->numForward );
                    MEMFREE( in->numBackward );
                    MEMFREE( in->domains );
                    MEMFREE( in->requestForward );
                    MEMFREE( in->statusForward );
                    MEMFREE( in->requestBackward );
                    MEMFREE( in->statusBackward );
                    return;
                }

                memcpy( in->numForward, numForward, numDomains*sizeof(index_t) );
                memcpy( in->numBackward, numBackward, numDomains*sizeof(index_t) );
                memcpy( in->domains, domains, numDomains*sizeof(index_t) );
                for( i=0; i<numDomains; i++ )
                    in->requestForward[i] = in->requestBackward[i] = MPI_REQUEST_NULL;

                for( i=0; i<numDomains; i++ )
                    in->requestedRecvLength[i] = 0;

                for( i=0; i<numDomains; i++ )
                    in->indexDomains[domains[i]] = i;
            }
            in->numDomains = numDomains;
        }

        /* set up the buffers, which may need to be reallocated in the case
           where the maximum itemSize has been reset */
        if( in->numDomains>0 )
        {
            if( !in->bufferForward )
            {
                in->bufferForward = MEMALLOC( in->numDomains, void* );
                for( i=0; i<in->numDomains; i++ )
                    in->bufferForward[i] = NULL;
            }
            for( i=0; i<in->numDomains; i++ )
                if( in->numForward[i]>0 && itemSize>0 )
                {
                    MEMREALLOC( in->bufferForward[i], in->numForward[i]*itemSize, char );
                }

            if( !in->bufferBackward )
            {
                /* use in->numDomains here: the numDomains argument may be -1
                   when this function is called only to grow the buffers */
                in->bufferBackward = MEMALLOC( in->numDomains, void* );
                for( i=0; i<in->numDomains; i++ )
                    in->bufferBackward[i] = NULL;
            }
            for( i=0; i<in->numDomains; i++ )
                if( in->numBackward[i]>0 && itemSize>0 )
                {
                    MEMREALLOC( in->bufferBackward[i], in->numBackward[i]*itemSize, char );
                }

            in->maxItemSize = itemSize;
        }
    }
}
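
/*
    Illustrative sketch (hypothetical numbers): a domain with two
    neighbours, domains 0 and 3, sending 4 and 2 items to them and
    receiving 3 and 5 items from them respectively, would build its
    table as:

        index_t domains[2]     = { 0, 3 };
        index_t numForward[2]  = { 4, 2 };
        index_t numBackward[2] = { 3, 5 };

        Paso_CommBuffer_allocTable( buffer, sizeof(double), numForward,
                                    numBackward, 2, domains );
*/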

bool_t Paso_CommBuffer_recv( Paso_CommBuffer *in, index_t dom, size_t itemSize )
{
    index_t position=0;
    int result;

    if( (position=Paso_CommBuffer_checkDomain( in, dom )) == -1 )
        return FALSE;

    /* ensure that the buffers are large enough */
    if( itemSize>in->maxItemSize )
        Paso_CommBuffer_allocTable( in, itemSize, NULL, NULL, -1, NULL );

    /* check that there isn't already a pending receive on the buffer */
    if( in->requestBackward[position]!=MPI_REQUEST_NULL )
    {
        Paso_setError( PASO_MPI_ERROR, "Paso_CommBuffer_recv() : cannot request a receive in a buffer that has an unresolved request" );
        return FALSE;
    }

    result = MPI_Irecv( in->bufferBackward[position], in->numBackward[position]*itemSize, MPI_BYTE, dom, in->tag, in->MPIInfo->comm, in->requestBackward+position );

    if( result!=MPI_SUCCESS )
        return FALSE;

    /* record the size of the requested receive, for error checking when testing whether the receive completed */
    in->requestedRecvLength[position] = itemSize*in->numBackward[position];

    return TRUE;
}

/* assumes that there is/will be a message sent */
bool_t Paso_CommBuffer_recvAny( Paso_CommBuffer *in, index_t *dom, size_t itemSize )
{
    MPI_Status status;

    /* ensure that the buffers are large enough */
    if( itemSize>in->maxItemSize )
        Paso_CommBuffer_allocTable( in, itemSize, NULL, NULL, -1, NULL );

    /* probe for a message. Note that this is a blocking call */
    MPI_Probe( MPI_ANY_SOURCE, in->tag, in->MPIInfo->comm, &status );
    *dom = status.MPI_SOURCE;

    return Paso_CommBuffer_recv( in, *dom, itemSize );
}
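
/*
    Illustrative sketch (hypothetical variables): recvAny is useful when
    the arrival order of neighbour messages is unknown. Assuming every
    neighbour sends exactly one message with the buffer's tag, messages
    can be unpacked in arrival order:

        index_t dom;
        dim_t n;
        for( n=0; n<buffer->numDomains; n++ )
        {
            Paso_CommBuffer_recvAny( buffer, &dom, sizeof(double) );
            Paso_CommBuffer_unpack( buffer, dom, index, data, sizeof(double), 0 );
        }
*/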

bool_t Paso_CommBuffer_send( Paso_CommBuffer *in, index_t dom, size_t itemSize )
{
    index_t position=0;
    int result;

    if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
        return FALSE;

    /* make sure that the send buffer doesn't already have a pending send */
    if( Paso_CommBuffer_waitSend( in, dom )==FALSE )
        return FALSE;

    /* if the requested itemSize exceeds the buffer size, it is too late
       to resize the buffer and we have to flag an error */
    if( itemSize>in->maxItemSize )
    {
        Paso_setError( VALUE_ERROR, "Paso_CommBuffer_send() : the requested itemSize exceeds in->maxItemSize" );
        return FALSE;
    }

    /* send the buffer */
    result = MPI_Isend( in->bufferForward[position], itemSize*in->numForward[position], MPI_BYTE, dom, in->tag, in->MPIInfo->comm, &in->requestForward[position] );

    if( result!=MPI_SUCCESS )
        return FALSE;
    return TRUE;
}

/*
    waits for all pending sends in the buffer to complete.
    returns immediately if an error is encountered.
*/
bool_t Paso_CommBuffer_waitSendPending( Paso_CommBuffer *in )
{
    index_t i;

    for( i=0; i<in->numDomains; i++ )
        if( Paso_CommBuffer_waitSend( in, in->domains[i] )==FALSE )
            return FALSE;

    return TRUE;
}

/*
    waits for a pending send to domain dom to complete before continuing
*/
bool_t Paso_CommBuffer_waitSend( Paso_CommBuffer *in, index_t dom )
{
    index_t position;
    int success;

    if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
        return FALSE;

    if( in->requestForward[position]!=MPI_REQUEST_NULL )
    {
        success = MPI_Wait( &in->requestForward[position], &in->statusForward[position] );

        if( success!=MPI_SUCCESS )
        {
            Paso_setError( PASO_MPI_ERROR, "Paso_CommBuffer_waitSend() : Error : failed MPI_Isend" );
            return FALSE;
        }
    }

    return TRUE;
}

/*
    waits for all pending receives in the buffer to complete.
    returns immediately if an error is encountered.
*/
bool_t Paso_CommBuffer_waitRecvPending( Paso_CommBuffer *in )
{
    index_t i;

    for( i=0; i<in->numDomains; i++ )
        if( Paso_CommBuffer_waitRecv( in, in->domains[i] )==FALSE )
            return FALSE;

    return TRUE;
}

/*
    waits for a pending receive from domain dom to complete before continuing
*/
bool_t Paso_CommBuffer_waitRecv( Paso_CommBuffer *in, index_t dom )
{
    index_t position;
    int success, count;

    if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
        return FALSE;

    if( in->requestBackward[position]!=MPI_REQUEST_NULL )
    {
        success = MPI_Wait( &in->requestBackward[position], &in->statusBackward[position] );

        if( success!=MPI_SUCCESS )
        {
            Paso_setError( PASO_MPI_ERROR, "Paso_CommBuffer_waitRecv() : Error : failed MPI_Irecv" );
            return FALSE;
        }

        /* verify that the received message was the length of the requested buffer */
        MPI_Get_count( in->statusBackward + position, MPI_BYTE, &count );
        if( count!=in->requestedRecvLength[position] )
        {
            Paso_setError( PASO_MPI_ERROR, "Paso_CommBuffer_waitRecv() : Error : size of received buffer and backward count are not equal" );
            return FALSE;
        }
    }

    return TRUE;
}


/* return a pointer to the forward (send) buffer, so that the user can pack it directly */
/* TODO */
/* This function should not be allowed in the release library; it is better to use the
   auxiliary pack functionality provided by Paso_CommBuffer_pack/unpack() */
void *Paso_CommBuffer_getBufferForward( Paso_CommBuffer *in, index_t dom )
{
    index_t position=0;

    if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
        return NULL;

    /* wait for any pending send from this buffer before handing it out */
    if( !Paso_CommBuffer_waitSend( in, dom ) )
        return NULL;

    return in->bufferForward[position];
}

/*
    pack information into a send buffer
*/
void Paso_CommBuffer_pack( Paso_CommBuffer *in, index_t dom, index_t *index, void *data, size_t itemSize, dim_t offset )
{
    long i;
    index_t position;
    unsigned char *from, *to;

    /* wait for pending sends from the buffer */
    if( Paso_CommBuffer_waitSend( in, dom )==FALSE )
        return;

    /* ensure that the buffers are large enough */
    if( itemSize>in->maxItemSize )
        Paso_CommBuffer_allocTable( in, itemSize, NULL, NULL, -1, NULL );

    /* set up pointers to regions for copying */
    if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
        return;
    if( in->numForward[position]==0 )
        return;
    from = (unsigned char*)data;
    to = (unsigned char*)in->bufferForward[position];
    from += offset*itemSize;

    if( index==NULL )
    {
        /* if there is no index, pack the data as is */
        memcpy( to, from, itemSize*in->numForward[position] );
    }
    else
    {
        /* pack the data according to index : buffer item i comes from data item index[i] */
        for( i=0; i<in->numForward[position]; i++, to+=itemSize )
            memcpy( to, from + (index[i]*itemSize), itemSize );
    }
}
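
/*
    Worked example (hypothetical values): with index = { 5, 2, 9 },
    offset = 0 and numForward[position] = 3, the indexed branch above
    gathers, where each assignment copies itemSize bytes:

        buffer[0] = data[5]
        buffer[1] = data[2]
        buffer[2] = data[9]
*/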

/*
    unpack information from a receive buffer
*/
void Paso_CommBuffer_unpack( Paso_CommBuffer *in, index_t dom, index_t *index, void *data, size_t itemSize, dim_t offset )
{
    long i;
    index_t position;
    unsigned char *from, *to;

    /* wait for pending receives into the buffer */
    if( Paso_CommBuffer_waitRecv( in, dom )==FALSE )
        return;

    /* ensure that the buffers are large enough */
    if( itemSize>in->maxItemSize )
        Paso_CommBuffer_allocTable( in, itemSize, NULL, NULL, -1, NULL );

    /* set up pointers to regions for copying */
    if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
        return;
    if( in->numBackward[position]==0 )
        return;

    from = (unsigned char*)in->bufferBackward[position];
    to = (unsigned char*)data;
    to += offset*itemSize;

    if( index==NULL )
    {
        /* if there is no index, unpack the data as is */
        memcpy( to, from, itemSize*in->numBackward[position] );
    }
    else
    {
        /* unpack the data according to index : buffer item i goes to data item index[i] */
        for( i=0; i<in->numBackward[position]; i++ )
            memcpy( to + itemSize*index[i], from + (i*itemSize), itemSize );
    }
}
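
/*
    Worked example (same hypothetical values as for pack): unpack performs
    the inverse scatter, data[5] = buffer[0], data[2] = buffer[1],
    data[9] = buffer[2], so an unpack with the same index restores items
    to the positions they were gathered from.
*/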

/*
    verify that the information stored across the processors on the communicator
    over which the CommBuffer is allocated is consistent
*/
bool_t Paso_CommBuffer_validate( Paso_CommBuffer *in )
{
    index_t *tmpForwardMap=NULL, *tmpBackwardMap=NULL, *tmpConnectionMap=NULL, *tmpForward=NULL, *tmpBackward=NULL, *tmpConnection=NULL;
    dim_t size, rank, i;

    if( !in )
        return FALSE;

    if( in->MPIInfo->size>0 )
    {
        size = in->MPIInfo->size;
        rank = in->MPIInfo->rank;

        tmpForwardMap = MEMALLOC( size*size, index_t );
        tmpBackwardMap = MEMALLOC( size*size, index_t );
        tmpConnectionMap = MEMALLOC( size*size, index_t );
        tmpForward = MEMALLOC( size, index_t );
        tmpBackward = MEMALLOC( size, index_t );
        tmpConnection = MEMALLOC( size, index_t );

        /* collect a global image of the distribution
           (note: assumes index_t is int, to match MPI_INT) */
        for( i=0; i<size; i++ )
            tmpConnection[i] = 0;
        for( i=0; i<in->numDomains; i++ )
            tmpConnection[in->domains[i]] = 1;
        MPI_Allgather( tmpConnection, size, MPI_INT, tmpConnectionMap, size, MPI_INT, in->MPIInfo->comm );

        for( i=0; i<size; i++ )
            tmpForward[i] = 0;
        for( i=0; i<in->numDomains; i++ )
            tmpForward[in->domains[i]] = in->numForward[i];
        MPI_Allgather( tmpForward, size, MPI_INT, tmpForwardMap, size, MPI_INT, in->MPIInfo->comm );

        for( i=0; i<size; i++ )
            tmpBackward[i] = 0;
        for( i=0; i<in->numDomains; i++ )
            tmpBackward[in->domains[i]] = in->numBackward[i];
        MPI_Allgather( tmpBackward, size, MPI_INT, tmpBackwardMap, size, MPI_INT, in->MPIInfo->comm );

        /* verify that information on different processors is consistent */
        for( i=0; i<size; i++ )
        {
            if( tmpConnection[i] != tmpConnectionMap[rank+i*size] )
            {
                Paso_setError( VALUE_ERROR, "Paso_CommBuffer_validate() : neighbour connection map is inconsistent" );
                goto clean;
            }
        }
        for( i=0; i<size; i++ )
        {
            if( tmpForward[i] != tmpBackwardMap[rank+i*size] )
            {
                Paso_setError( VALUE_ERROR, "Paso_CommBuffer_validate() : neighbour forward map is inconsistent" );
                goto clean;
            }
        }
        for( i=0; i<size; i++ )
        {
            if( tmpBackward[i] != tmpForwardMap[rank+i*size] )
            {
                Paso_setError( VALUE_ERROR, "Paso_CommBuffer_validate() : neighbour backward map is inconsistent" );
                goto clean;
            }
        }
    }

clean:
    MEMFREE( tmpBackwardMap );
    MEMFREE( tmpForwardMap );
    MEMFREE( tmpConnectionMap );
    MEMFREE( tmpConnection );
    MEMFREE( tmpForward );
    MEMFREE( tmpBackward );

    return Paso_MPI_noError( in->MPIInfo );
}
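
/*
    Illustrative check (hypothetical two-process run): if rank 0 registers
    numForward = 4 for neighbour 1, the gathered maps above are consistent
    only if rank 1 registers numBackward = 4 for neighbour 0: each forward
    count must match the corresponding backward count on the other side,
    and the neighbour connection map must be symmetric.
*/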

#endif
