/[escript]/trunk/paso/src/CommBuffer.c
Revision 790 - Wed Jul 26 23:12:34 2006 UTC by bcumming
File MIME type: text/plain
File size: 15850 byte(s)
Changes to escript/py_src/pdetools.py and /escript/src/Data.h/.cpp to
make the Locator work in MPI. escript::Data::mindp now returns a
3-tuple that includes the MPI rank of the process on which the minimum
value occurs. escript::Data::convertToNumArrayFromDPNo also takes the
ProcNo to perform the MPI reduction.

This had to be implemented in both the MPI and non-MPI versions to
allow the necessary changes to the Python code in pdetools.py. In the
non-MPI version ProcNo is set to 0. This works for the explicit
scripts tested so far; if it causes problems in your scripts, contact
Ben or Lutz, or revert the three files (pdetools.py, Data.h and
Data.cpp) to the previous version.


/*******************************************************************

   A buffer type for communication of "boundary" data between domains.
   Send and receive buffers are allocated for each neighbouring domain,
   and can be used for sending any type of data distributed over the
   domains. Looks after communication completion, and safeguards
   against message collision etc.

 *******************************************************************/
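
/*
   A minimal usage sketch (editorial illustration, not part of the
   original file; the neighbour rank, counts, index arrays and data
   pointer are placeholders): exchanging one double per shared index
   with a single neighbouring domain.

       Paso_CommBuffer *buf = Paso_CommBuffer_alloc( MPIInfo, 42 );
       index_t domains[1] = { neighbourRank };
       index_t numForward[1] = { n }, numBackward[1] = { n };
       Paso_CommBuffer_allocTable( buf, sizeof(double), numForward,
                                   numBackward, 1, domains );

       Paso_CommBuffer_recv( buf, neighbourRank, sizeof(double) );
       Paso_CommBuffer_pack( buf, neighbourRank, sendIndex, data,
                             sizeof(double), 0 );
       Paso_CommBuffer_send( buf, neighbourRank, sizeof(double) );
       Paso_CommBuffer_unpack( buf, neighbourRank, recvIndex, data,
                               sizeof(double), 0 );
       Paso_CommBuffer_waitSend( buf, neighbourRank );
       Paso_CommBuffer_dealloc( buf );

   unpack() waits on the posted receive internally, and waitSend()
   completes the send before the buffer is reused or freed.
*/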

#include "Paso.h"

#ifdef PASO_MPI
#include <string.h>

#include "CommBuffer.h"

static index_t Paso_CommBuffer_checkDomain( Paso_CommBuffer *in, index_t dom )
{
    index_t position;

    if( !in || !in->numDomains )
    {
        Paso_setError( VALUE_ERROR, "Paso_CommBuffer_checkDomain() : invalid or uninitialised CommBuffer" );
        return -1;
    }
    if( dom<0 || dom>(in->MPIInfo->size-1) )
    {
        Paso_setError( VALUE_ERROR, "Paso_CommBuffer_checkDomain() : domain not in the communicator" );
        return -1;
    }
    position = in->indexDomains[dom];
    if( position<0 )
    {
        Paso_setError( VALUE_ERROR, "Paso_CommBuffer_checkDomain() : domain not a neighbour of this domain" );
        return -1;
    }

    return position;
}

/***************************************
 * alloc/dealloc for CommBuffer type
 ***************************************/
Paso_CommBuffer *Paso_CommBuffer_alloc( Paso_MPIInfo *MPIInfo, index_t tag )
{
    Paso_CommBuffer *out=NULL;
    index_t i;

    out = MEMALLOC( 1, Paso_CommBuffer );

    if( Paso_checkPtr(out) )
        return NULL;

    out->reference_counter = 0;

    out->MPIInfo = Paso_MPIInfo_getReference( MPIInfo );
    if( !Paso_noError() )
    {
        MEMFREE(out);
        return NULL;
    }

    out->bufferForward = out->bufferBackward = NULL;
    out->numForward = out->numBackward = NULL;
    out->domains = NULL;
    out->indexDomains = MEMALLOC( MPIInfo->size, index_t );
    for( i=0; i<MPIInfo->size; i++ )
        out->indexDomains[i] = -1;
    out->statusForward = NULL;
    out->requestForward = NULL;
    out->statusBackward = NULL;
    out->requestBackward = NULL;
    out->requestedRecvLength = NULL;
    out->tag = tag;
    out->numDomains = 0;
    out->maxItemSize = 0;
    out->reference_counter++;

    return out;
}

void Paso_CommBuffer_dealloc( Paso_CommBuffer *in )
{
    if( in && !(--in->reference_counter) )
    {
        index_t i;

        if( in->bufferForward )
            for( i=0; i<in->numDomains; i++ ) {
                MEMFREE( in->bufferForward[i] );
            }
        MEMFREE( in->bufferForward );

        if( in->bufferBackward )
            for( i=0; i<in->numDomains; i++ ) {
                MEMFREE( in->bufferBackward[i] );
            }
        MEMFREE( in->bufferBackward );
        MEMFREE( in->requestedRecvLength );
        MEMFREE( in->numForward );
        MEMFREE( in->numBackward );
        MEMFREE( in->domains );
        MEMFREE( in->indexDomains );
        MEMFREE( in->statusForward );
        MEMFREE( in->requestForward );
        MEMFREE( in->statusBackward );
        MEMFREE( in->requestBackward );

        Paso_MPIInfo_dealloc( in->MPIInfo );

        MEMFREE( in );
    }
}

Paso_CommBuffer *Paso_CommBuffer_getReference( Paso_CommBuffer *in )
{
    if( !in )
        return NULL;

    in->reference_counter++;

    return in;
}

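/*
   Ownership note (editorial): Paso_CommBuffer_alloc() returns a buffer
   with reference_counter==1; each Paso_CommBuffer_getReference()
   increments the counter, and Paso_CommBuffer_dealloc() decrements it,
   freeing the storage only when the counter reaches zero. Shared
   owners can therefore call dealloc() independently.
*/
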
void Paso_CommBuffer_allocTable( Paso_CommBuffer *in, size_t itemSize, index_t *numForward, index_t *numBackward, dim_t numDomains, index_t *domains )
{
    index_t i;

    if( in && ( !in->domains || itemSize>in->maxItemSize ) )
    {
        /* Set up the domain info if it has not already been done.
           In effect, this says that the domain info is fixed for the lifespan
           of the CommBuffer. For example, one cannot reconfigure the structure
           from DOF data to element data. */
        if( !in->domains )
        {
            if( numDomains>0 )
            {
                in->numForward = MEMALLOC( numDomains, index_t );
                in->numBackward = MEMALLOC( numDomains, index_t );
                in->domains = MEMALLOC( numDomains, index_t );
                in->requestedRecvLength = MEMALLOC( numDomains, dim_t );
                in->requestForward = MEMALLOC( numDomains, MPI_Request );
                in->statusForward = MEMALLOC( numDomains, MPI_Status );
                in->requestBackward = MEMALLOC( numDomains, MPI_Request );
                in->statusBackward = MEMALLOC( numDomains, MPI_Status );

                if( Paso_checkPtr(in->numForward) || Paso_checkPtr(in->requestedRecvLength) || Paso_checkPtr(in->numBackward) || Paso_checkPtr(in->domains) || Paso_checkPtr(in->requestForward) || Paso_checkPtr(in->statusForward) || Paso_checkPtr(in->requestBackward) || Paso_checkPtr(in->statusBackward) )
                {
                    MEMFREE( in->requestedRecvLength );
                    MEMFREE( in->numForward );
                    MEMFREE( in->numBackward );
                    MEMFREE( in->domains );
                    MEMFREE( in->requestForward );
                    MEMFREE( in->statusForward );
                    MEMFREE( in->requestBackward );
                    MEMFREE( in->statusBackward );
                    return;
                }

                memcpy( in->numForward, numForward, numDomains*sizeof(index_t) );
                memcpy( in->numBackward, numBackward, numDomains*sizeof(index_t) );
                memcpy( in->domains, domains, numDomains*sizeof(index_t) );
                for( i=0; i<numDomains; i++ )
                    in->requestForward[i] = in->requestBackward[i] = MPI_REQUEST_NULL;

                for( i=0; i<numDomains; i++ )
                    in->requestedRecvLength[i] = 0;

                for( i=0; i<numDomains; i++ )
                    in->indexDomains[domains[i]] = i;
            }
            in->numDomains = numDomains;
        }

        /* Set up the buffers, which may need to be reallocated in the case
           where the maximum itemSize has been reset. */
        if( in->numDomains>0 )
        {
            if( !in->bufferForward )
            {
                in->bufferForward = MEMALLOC( in->numDomains, void* );
                for( i=0; i<in->numDomains; i++ )
                    in->bufferForward[i] = NULL;
            }
            for( i=0; i<in->numDomains; i++ )
                if( in->numForward[i]>0 && itemSize>0 )
                {
                    MEMREALLOC( in->bufferForward[i], in->numForward[i]*itemSize, char );
                }

            if( !in->bufferBackward )
            {
                in->bufferBackward = MEMALLOC( in->numDomains, void* );
                for( i=0; i<in->numDomains; i++ )
                    in->bufferBackward[i] = NULL;
            }
            for( i=0; i<in->numDomains; i++ )
                if( in->numBackward[i]>0 && itemSize>0 )
                {
                    MEMREALLOC( in->bufferBackward[i], in->numBackward[i]*itemSize, char );
                }

            in->maxItemSize = itemSize;
        }
    }
}

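/*
   Illustrative call (editorial sketch, assuming a 1-D decomposition):
   a domain with a left and a right neighbour sharing nL and nR
   boundary items respectively might build its table as

       index_t domains[2]     = { rank-1, rank+1 };
       index_t numForward[2]  = { nL, nR };
       index_t numBackward[2] = { nL, nR };
       Paso_CommBuffer_allocTable( buf, sizeof(double), numForward,
                                   numBackward, 2, domains );

   Calling allocTable() again with a larger itemSize only regrows the
   per-domain buffers; the domain table itself is fixed for the
   lifetime of the CommBuffer.
*/
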
bool_t Paso_CommBuffer_recv( Paso_CommBuffer *in, index_t dom, size_t itemSize )
{
    index_t position=0;
    int result;

    if( (position=Paso_CommBuffer_checkDomain( in, dom )) == -1 )
        return FALSE;

    /* ensure that the buffers are large enough */
    if( itemSize>in->maxItemSize )
        Paso_CommBuffer_allocTable( in, itemSize, NULL, NULL, -1, NULL );

    /* check that there isn't already a pending receive on the buffer */
    if( in->requestBackward[position]!=MPI_REQUEST_NULL )
    {
        Paso_setError( PASO_MPI_ERROR, "Paso_CommBuffer_recv() : cannot request a receive in a buffer that has an unresolved request" );
        return FALSE;
    }

    result = MPI_Irecv( in->bufferBackward[position], in->numBackward[position]*itemSize, MPI_BYTE, dom, in->tag, in->MPIInfo->comm, in->requestBackward+position );

    if( result!=MPI_SUCCESS )
        return FALSE;

    /* record the size of the requested receive for error checking when
       testing whether the receive completed */
    in->requestedRecvLength[position] = itemSize*in->numBackward[position];

    return TRUE;
}

/* Assumes that there is/will be a message sent */
bool_t Paso_CommBuffer_recvAny( Paso_CommBuffer *in, index_t *dom, size_t itemSize )
{
    MPI_Status status;

    /* ensure that the buffers are large enough */
    if( itemSize>in->maxItemSize )
        Paso_CommBuffer_allocTable( in, itemSize, NULL, NULL, -1, NULL );

    /* probe for a message. Note, this is a blocking call */
    MPI_Probe( MPI_ANY_SOURCE, in->tag, in->MPIInfo->comm, &status );
    *dom = status.MPI_SOURCE;

    return Paso_CommBuffer_recv( in, *dom, itemSize );
}

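/*
   Typical use (editorial sketch; recvIndex is a placeholder per-domain
   index table): service incoming messages in arrival order rather than
   a fixed neighbour order.

       index_t i, dom;
       for( i=0; i<buf->numDomains; i++ )
       {
           Paso_CommBuffer_recvAny( buf, &dom, sizeof(double) );
           Paso_CommBuffer_unpack( buf, dom, recvIndex[dom], data,
                                   sizeof(double), 0 );
       }

   The probe guarantees a matching message has arrived, so the
   non-blocking receive posted by recv() completes promptly and
   unpack()'s internal waitRecv() does not stall.
*/
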
bool_t Paso_CommBuffer_send( Paso_CommBuffer *in, index_t dom, size_t itemSize )
{
    index_t position=0;
    int result;

    if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
        return FALSE;

    /* make sure that the send buffer doesn't already have a pending send */
    if( Paso_CommBuffer_waitSend( in, dom )==FALSE )
        return FALSE;

    /* if the buffer is not large enough, it is too late to resize it
       here and we have to flag an error */
    if( itemSize>in->maxItemSize )
    {
        Paso_setError( VALUE_ERROR, "Paso_CommBuffer_send() : the requested itemSize exceeds in->maxItemSize" );
        return FALSE;
    }

    /* send the buffer */
    result = MPI_Isend( in->bufferForward[position], itemSize*in->numForward[position], MPI_BYTE, dom, in->tag, in->MPIInfo->comm, &in->requestForward[position] );

    if( result!=MPI_SUCCESS )
        return FALSE;
    return TRUE;
}

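/*
   Ordering note (editorial): pack() must fill bufferForward for the
   target domain before send() transmits it. Both calls first wait on
   any outstanding send to the same domain, so back-to-back exchanges
   reuse the buffer safely:

       Paso_CommBuffer_pack( buf, dom, sendIndex, data, sizeof(double), 0 );
       Paso_CommBuffer_send( buf, dom, sizeof(double) );
*/
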
/*
   Waits for all pending sends in the buffer to complete.
   Returns immediately if an error is encountered.
*/
bool_t Paso_CommBuffer_waitSendPending( Paso_CommBuffer *in )
{
    index_t i;

    for( i=0; i<in->numDomains; i++ )
        if( Paso_CommBuffer_waitSend( in, in->domains[i] )==FALSE )
            return FALSE;

    return TRUE;
}

/*
   Waits for a pending send to the given domain to complete before continuing.
*/
bool_t Paso_CommBuffer_waitSend( Paso_CommBuffer *in, index_t dom )
{
    index_t position;
    int success;

    if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
        return FALSE;

    if( in->requestForward[position]!=MPI_REQUEST_NULL )
    {
        success = MPI_Wait( &in->requestForward[position], &in->statusForward[position] );

        if( success!=MPI_SUCCESS )
        {
            Paso_setError( PASO_MPI_ERROR, "Paso_CommBuffer_waitSend() : failed MPI_Isend" );
            return FALSE;
        }
    }

    return TRUE;
}

/*
   Waits for all pending receives in the buffer to complete.
   Returns immediately if an error is encountered.
*/
bool_t Paso_CommBuffer_waitRecvPending( Paso_CommBuffer *in )
{
    index_t i;

    for( i=0; i<in->numDomains; i++ )
        if( Paso_CommBuffer_waitRecv( in, in->domains[i] )==FALSE )
            return FALSE;

    return TRUE;
}

/*
   Waits for a pending receive from the given domain to complete before continuing.
*/
bool_t Paso_CommBuffer_waitRecv( Paso_CommBuffer *in, index_t dom )
{
    index_t position;
    int success;

    if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
        return FALSE;

    if( in->requestBackward[position]!=MPI_REQUEST_NULL )
    {
        success = MPI_Wait( &in->requestBackward[position], &in->statusBackward[position] );

        if( success!=MPI_SUCCESS )
        {
            Paso_setError( PASO_MPI_ERROR, "Paso_CommBuffer_waitRecv() : failed MPI_Irecv" );
            return FALSE;
        }

        /* verify that the received message was the length of the requested buffer */
        MPI_Get_count( in->statusBackward + position, MPI_BYTE, &success );
        if( success!=in->requestedRecvLength[position] )
        {
            Paso_setError( PASO_MPI_ERROR, "Paso_CommBuffer_waitRecv() : size of received buffer and backward count are not equal" );
            return FALSE;
        }
    }

    return TRUE;
}

/*
   Pack information into a send buffer.
*/
void Paso_CommBuffer_pack( Paso_CommBuffer *in, index_t dom, index_t *index, void *data, size_t itemSize, dim_t offset )
{
    long i;
    index_t position;
    unsigned char *from, *to;

    /* wait for pending sends from the buffer */
    if( Paso_CommBuffer_waitSend( in, dom )==FALSE )
        return;

    /* ensure that the buffers are large enough */
    if( itemSize>in->maxItemSize )
        Paso_CommBuffer_allocTable( in, itemSize, NULL, NULL, -1, NULL );

    /* set up pointers to regions for copying */
    if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
        return;
    if( in->numForward[position]==0 )
        return;
    from = (unsigned char*)data;
    to = (unsigned char*)in->bufferForward[position];
    from += offset*itemSize;

    if( index==NULL && in->numForward[position]>0 )
    {
        /* if there is no index, pack the data as is */
        memcpy( to, from, itemSize*in->numForward[position] );
    }
    else
    {
        /* pack the data according to index */
        for( i=0; i<in->numForward[position]; i++, to+=itemSize )
            memcpy( to, from + (index[i]*itemSize), itemSize );
    }
}

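/*
   Packing semantics (editorial illustration): with offset==0 and
   index = {4,1,7}, the send buffer receives data items 4, 1 and 7 in
   that order, i.e. a gather  to[i] = from[index[i]].  With index==NULL
   the first numForward items are copied contiguously.
*/
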
void Paso_CommBuffer_unpack( Paso_CommBuffer *in, index_t dom, index_t *index, void *data, size_t itemSize, dim_t offset )
{
    long i;
    index_t position;
    unsigned char *from, *to;

    /* wait for pending receives to the buffer */
    if( Paso_CommBuffer_waitRecv( in, dom )==FALSE )
        return;

    /* ensure that the buffers are large enough */
    if( itemSize>in->maxItemSize )
        Paso_CommBuffer_allocTable( in, itemSize, NULL, NULL, -1, NULL );

    /* set up pointers to regions for copying */
    if( (position = Paso_CommBuffer_checkDomain( in, dom ))==-1 )
        return;
    if( in->numBackward[position]==0 )
        return;

    from = (unsigned char*)in->bufferBackward[position];
    to = (unsigned char*)data;
    to += offset*itemSize;

    if( index==NULL && in->numBackward[position]>0 )
    {
        /* if there is no index, unpack the data as is */
        memcpy( to, from, itemSize*in->numBackward[position] );
    }
    else
    {
        /* unpack the data according to index */
        for( i=0; i<in->numBackward[position]; i++ )
            memcpy( to + itemSize*index[i], from + (i*itemSize), itemSize );
    }
}

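/*
   Unpacking is the mirror operation (editorial illustration): buffer
   item i is written to data item index[i], i.e. a scatter
   to[index[i]] = from[i] that inverts the gather performed by pack()
   on the sending side.
*/
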
/*
   Verify that the information stored across the processors on the
   communicator over which the CommBuffer is allocated is valid.
*/
bool_t Paso_CommBuffer_validate( Paso_CommBuffer *in )
{
    index_t *tmpForwardMap=NULL, *tmpBackwardMap=NULL, *tmpConnectionMap=NULL, *tmpForward=NULL, *tmpBackward=NULL, *tmpConnection=NULL;
    dim_t size, rank, i;

    if( !in )
        return FALSE;

    if( in->MPIInfo->size>0 )
    {
        size = in->MPIInfo->size;
        rank = in->MPIInfo->rank;

        tmpForwardMap = MEMALLOC( size*size, index_t );
        tmpBackwardMap = MEMALLOC( size*size, index_t );
        tmpConnectionMap = MEMALLOC( size*size, index_t );
        tmpForward = MEMALLOC( size, index_t );
        tmpBackward = MEMALLOC( size, index_t );
        tmpConnection = MEMALLOC( size, index_t );

        /* collect a global image of the distribution */
        for( i=0; i<size; i++ )
            tmpConnection[i] = 0;
        for( i=0; i<in->numDomains; i++ )
            tmpConnection[in->domains[i]] = 1;
        MPI_Allgather( tmpConnection, size, MPI_INT, tmpConnectionMap, size, MPI_INT, in->MPIInfo->comm );

        for( i=0; i<size; i++ )
            tmpForward[i] = 0;
        for( i=0; i<in->numDomains; i++ )
            tmpForward[in->domains[i]] = in->numForward[i];
        MPI_Allgather( tmpForward, size, MPI_INT, tmpForwardMap, size, MPI_INT, in->MPIInfo->comm );

        for( i=0; i<size; i++ )
            tmpBackward[i] = 0;
        for( i=0; i<in->numDomains; i++ )
            tmpBackward[in->domains[i]] = in->numBackward[i];
        MPI_Allgather( tmpBackward, size, MPI_INT, tmpBackwardMap, size, MPI_INT, in->MPIInfo->comm );

        /* verify that the information on different processors is consistent */
        for( i=0; i<size; i++ )
        {
            if( tmpConnection[i] != tmpConnectionMap[rank+i*size] )
            {
                Paso_setError( VALUE_ERROR, "Paso_CommBuffer_validate() : neighbour connection map is inconsistent" );
                goto clean;
            }
        }
        for( i=0; i<size; i++ )
        {
            if( tmpForward[i] != tmpBackwardMap[rank+i*size] )
            {
                Paso_setError( VALUE_ERROR, "Paso_CommBuffer_validate() : neighbour forward map is inconsistent" );
                goto clean;
            }
        }
        for( i=0; i<size; i++ )
        {
            if( tmpBackward[i] != tmpForwardMap[rank+i*size] )
            {
                Paso_setError( VALUE_ERROR, "Paso_CommBuffer_validate() : neighbour backward map is inconsistent" );
                goto clean;
            }
        }
    }

clean:
    MEMFREE( tmpBackwardMap );
    MEMFREE( tmpForwardMap );
    MEMFREE( tmpConnectionMap );
    MEMFREE( tmpConnection );
    MEMFREE( tmpForward );
    MEMFREE( tmpBackward );

    return Paso_MPI_noError( in->MPIInfo );
}

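/*
   Map layout (editorial note): after each Allgather, element
   [r + p*size] of a gathered map holds process p's count for domain r.
   The symmetry being tested is that what this rank expects to receive
   from p equals what p sends to this rank, e.g.

       tmpBackward[p] == tmpForwardMap[rank + p*size]

   and the corresponding forward/backward pair, for every p.
*/
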
#endif
