/[escript]/branches/split/escriptcore/src/SplitWorld.cpp
ViewVC logotype

Annotation of /branches/split/escriptcore/src/SplitWorld.cpp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 4813 - (hide annotations)
Thu Mar 27 07:57:49 2014 UTC (5 years, 4 months ago) by jfenwick
File size: 10291 byte(s)
Blocking some uses of MPI_COMM_WORLD
1 jfenwick 4730 /*****************************************************************************
2     *
3     * Copyright (c) 2014 by University of Queensland
4     * http://www.uq.edu.au
5     *
6     * Primary Business: Queensland, Australia
7     * Licensed under the Open Software License version 3.0
8     * http://www.opensource.org/licenses/osl-3.0.php
9     *
10     * Development until 2012 by Earth Systems Science Computational Center (ESSCC)
11     * Development 2012-2013 by School of Earth Sciences
12     * Development from 2014 by Centre for Geoscience Computing (GeoComp)
13     *
14     *****************************************************************************/
15    
#include "esysUtils/Esys_MPI.h"
#include "SplitWorld.h"
#include "AbstractDomain.h"
#include "SplitWorldException.h"
#include "SplitWorldException.h"

#include <iostream>
#include <sstream>
#include <vector>
24 jfenwick 4731
25 jfenwick 4730 using namespace boost::python;
26     using namespace escript;
27    
28 jfenwick 4734 SplitWorld::SplitWorld(unsigned int numgroups, MPI_Comm global)
29 jfenwick 4808 :localworld((SubWorld*)0), swcount(numgroups>0?numgroups:1), jobcounter(1), manualimport(false)
30 jfenwick 4730 {
31 jfenwick 4747 globalcom=esysUtils::makeInfo(global);
32    
33 jfenwick 4746 int grank=0;
34     int wsize=1; // each world has this many processes
35     #ifdef ESYS_MPI
36 jfenwick 4747 int gsize=globalcom->size;
37     grank=globalcom->rank;
38 jfenwick 4746 if (gsize%swcount!=0)
39     {
40     throw SplitWorldException("SplitWorld error: requested number of groups is not a factor of global communicator size.");
41     }
42     wsize=gsize/swcount; // each world has this many processes
43 jfenwick 4747 MPI_Comm sub;
44     int res=MPI_Comm_split(MPI_COMM_WORLD, grank/wsize, grank%wsize, &sub);
45 jfenwick 4746 if (res!=MPI_SUCCESS)
46     {
47     throw SplitWorldException("SplitWorld error: Unable to form communicator.");
48     }
49 jfenwick 4747 subcom=esysUtils::makeInfo(sub,true);
50     #else
51     subcom=esysUtils::makeInfo(0);
52 jfenwick 4746 #endif
53 jfenwick 4730 localworld=SubWorld_ptr(new SubWorld(subcom));
54 jfenwick 4731 localid=grank/wsize;
55 jfenwick 4730 }
56    
57 jfenwick 4734 SplitWorld::~SplitWorld()
58 jfenwick 4730 {
59 jfenwick 4747 // communicator cleanup handled by the MPI_Info
60 jfenwick 4730 }
61    
62    
63     // The boost wrapper will ensure that there is at least one entry in the tuple
64 jfenwick 4734 object SplitWorld::buildDomains(tuple t, dict kwargs)
65 jfenwick 4730 {
66     int tsize=len(t);
67     // get the callable that we will invoke in a sec
68     object tocall=t[0];
69     // make a new tuple without the first element
70     tuple ntup=tuple(t.slice(1,tsize));
71     // now add the subworld to the kwargs
72     kwargs["escriptworld"]=localworld;
73    
74     // pass the whole package to the python call
75     object dobj=tocall(*ntup, **kwargs);
76     extract<Domain_ptr> ex1(dobj);
77     Domain_ptr dptr=ex1();
78    
79     // now do a sanity check to see if the domain has respected the communicator info we passed it.
80 jfenwick 4747 if (dptr->getMPIComm()!=localworld->getMPI()->comm)
81 jfenwick 4730 {
82 jfenwick 4734 throw SplitWorldException("The newly constructed domain is not using the correct communicator.");
83 jfenwick 4730 }
84     localworld->setDomain(dptr);
85     return object(); // return None
86     }
87    
88 jfenwick 4745
89 jfenwick 4773 namespace
90     {
91    
92     // Throw all values in and get the maximum
93     // This is not an AllReduce because we need to use Tagged Communication so as not to interfere with
94     // other messages / collective ops which the SubWorld's Jobs might be using
95     // This is implemented by sending to rank 0
96     bool checkResultInt(int res, int& mres, esysUtils::JMPI& info)
97     {
98 jfenwick 4802 #ifdef ESYS_MPI
99 jfenwick 4773 const int leader=0;
100     const int BIGTAG=esysUtils::getSubWorldTag();
101     if (info->size==1)
102     {
103     mres=res;
104     return true;
105     }
106     else
107     {
108     if (info->rank!=leader)
109     {
110     if (MPI_Send(&res, 1, MPI_INT, leader, BIGTAG, info->comm)!=MPI_SUCCESS)
111     {
112     return false;
113     }
114     if (MPI_Recv(&mres, 1, MPI_INT, leader, BIGTAG, info->comm,0)!=MPI_SUCCESS)
115     {
116     return false;
117     }
118     }
119     else
120     {
121     MPI_Request* reqs=new MPI_Request[info->size-1];
122     int* eres=new int[info->size-1];
123     for (int i=0;i<info->size-1;++i)
124     {
125     MPI_Irecv(eres+i, 1, MPI_INT, i+1, BIGTAG, info->comm, reqs+i);
126     }
127     if (MPI_Waitall(info->size-1, reqs, 0)!=MPI_SUCCESS)
128     {
129     delete[] reqs;
130     return false;
131     }
132     // now we have them all, find the max
133     mres=res;
134     for (int i=0;i<info->size-1;++i)
135     {
136     if (mres<eres[i])
137     {
138     mres=eres[i];
139     }
140     }
141     // now we know what the result should be
142     // send it to the others
143     for (int i=0;i<info->size-1;++i)
144     {
145     MPI_Isend(&mres, 1, MPI_INT, i+1, BIGTAG, info->comm, reqs+i);
146     }
147     if (MPI_Waitall(info->size-1, reqs,0)!=MPI_SUCCESS)
148     {
149     return false;
150     }
151     }
152    
153     }
154     return true;
155 jfenwick 4802 #else
156     mres=res;
157     return true;
158     #endif
159 jfenwick 4773 }
160    
161    
162     }
163    
// Executes all pending jobs on all subworlds
//
// Distributes the queued job descriptions to the subworlds, then repeatedly
// runs each subworld's jobs and moves exports around until every job
// reports it is finished.  Status codes (established by the throw sites
// below and the checks on mres): 0 = all jobs done, 1 = at least one job
// wants another iteration, 2 = a work() returned something other than
// True/False, 3 = a work() raised an exception, 4 = export processing
// failed.  The maximum status across all worlds decides the outcome, so
// every rank takes the same path.
void SplitWorld::runJobs()
{
    // suppress direct use of MPI_COMM_WORLD while jobs run;
    // its destructor will clear the flag again (even if we throw)
    esysUtils::NoCOMM_WORLD ncw;
    distributeJobs();
    int mres=0;                         // status reduced over all worlds
    std::string err;                    // error text from the local world
    std::vector<char> impexpdetail;
    do
    {
        // now we actually need to run the jobs
        // everybody will be executing their localworld's jobs
        int res=localworld->runJobs(err);

        // take this opportunity to clean up
        localworld->clearImportExports();
        // now we find out about the other worlds
        if (!checkResultInt(res, mres, globalcom))
        {
            throw SplitWorldException("MPI appears to have failed.");
        }
        if (mres>1)     // 1 and 0 are normal returns, >1 is some sort of error
        {
            break;
        }
        // move exported values to where the next iteration needs them
        if (!localworld->localTransport(impexpdetail, err))
        {
            mres=4;
            break;
        }
    } while (mres==1);                  // 1 == some job wants another pass
    if (mres==0)
    {
        return;
    }
    else if (mres==2)
    {
        throw SplitWorldException("At least one Job's work() function did not return True/False.");
    }
    else if (mres==3)
    {
        char* resultstr=0;
        // now we ship around the error message - This should be safe since
        // eveyone must have finished their Jobs to get here
        if (!esysUtils::shipString(err.c_str(), &resultstr, globalcom->comm))
        {
            throw SplitWorldException("MPI appears to have failed.");
        }
        //throw SplitWorldException("At least one Job's work() function raised an exception.");
        std::string s("At least one Job's work() function raised the following exception:\n");
        s+=resultstr;
        throw SplitWorldException(s);
    }
    else if (mres==4)
    {
        throw SplitWorldException("While processing exports: "+err);

    }
    else
    {
        throw SplitWorldException("Unexpected return value from runJobs.");
    }
}
227    
228 jfenwick 4745 /**
229     stores the constructor/factory to make Jobs and the parameters.
230     */
231     void SplitWorld::addJob(boost::python::object creator, boost::python::tuple tup, boost::python::dict kw)
232     {
233     create.push_back(creator);
234     tupargs.push_back(tup);
235     kwargs.push_back(kw);
236     }
237    
238 jfenwick 4796 // At some point, we may need there to be more isolation here
239     // and trap any python exceptions etc, but for now I'll just call the constructor
240     void SplitWorld::addVariable(std::string name, boost::python::object creator, boost::python::tuple ntup, boost::python::dict kwargs)
241     {
242     object red=creator(*ntup, **kwargs);
243     extract<Reducer_ptr> ex(red);
244     if (!ex.check())
245     {
246     throw SplitWorldException("Creator function did not produce a reducer.");
247     }
248     Reducer_ptr rp=ex();
249 jfenwick 4808 localworld->addVariable(name, rp, manualimport);
250 jfenwick 4796 }
251    
252    
// Remove the named variable from this process' subworld.
void SplitWorld::removeVariable(std::string name)
{
    localworld->removeVariable(name);
}
257    
258 jfenwick 4745 void SplitWorld::clearPendingJobs()
259     {
260     create.clear();
261     tupargs.clear();
262     kwargs.clear();
263     }
264    
// Drop the jobs already created on the local subworld.
void SplitWorld::clearActiveJobs()
{
    localworld->clearJobs();
}
269    
270     // All the job params are known on all the ranks.
271     void SplitWorld::distributeJobs()
272     {
273     unsigned int numjobs=create.size()/swcount;
274     unsigned int start=create.size()/swcount*localid;
275     if (localid<create.size()%swcount)
276     {
277     numjobs++;
278     start+=localid;
279     }
280     else
281     {
282     start+=create.size()%swcount;
283     }
284 jfenwick 4746 int errstat=0;
285 jfenwick 4745 try
286     {
287     // No other subworld will be looking at this portion of the array
288     // so jobs will only be created on one subworld
289     for (unsigned int i=start;i<start+numjobs;++i)
290     {
291     // we need to add some things to the kw map
292     kwargs[i]["domain"]=localworld->getDomain();
293     kwargs[i]["jobid"]=object(jobcounter+i);
294     object job=create[i](*(tupargs[i]), **(kwargs[i]));
295     localworld->addJob(job);
296     }
297     }
298     catch (boost::python::error_already_set e)
299     {
300     errstat=1;
301     }
302     jobcounter+=create.size();
303     clearPendingJobs();
304    
305     // MPI check to ensure that it worked for everybody
306 jfenwick 4746 int mstat=0;
307 jfenwick 4747 if (!esysUtils::checkResult(errstat, mstat, globalcom->comm))
308 jfenwick 4745 {
309     throw SplitWorldException("MPI appears to have failed.");
310     }
311    
312     if (errstat==1)
313     {
314     throw SplitWorldException("distributeJobs: Job creation failed.");
315     clearActiveJobs();
316     }
317     }
318    
319    
320 jfenwick 4730 namespace escript
321     {
322    
323     boost::python::object raw_buildDomains(boost::python::tuple t, boost::python::dict kwargs)
324     {
325     int l=len(t);
326     if (l<2)
327     {
328 jfenwick 4734 throw SplitWorldException("Insufficient parameters to buildDomains.");
329 jfenwick 4730 }
330 jfenwick 4734 extract<SplitWorld&> exw(t[0]);
331 jfenwick 4730 if (!exw.check())
332     {
333 jfenwick 4734 throw SplitWorldException("First parameter to buildDomains must be a SplitWorld.");
334 jfenwick 4730 }
335 jfenwick 4734 SplitWorld& ws=exw();
336 jfenwick 4730 tuple ntup=tuple(t.slice(1,l)); // strip off the object param
337     return ws.buildDomains(ntup, kwargs);
338     }
339    
340 jfenwick 4745 boost::python::object raw_addJob(boost::python::tuple t, boost::python::dict kwargs)
341     {
342     int l=len(t);
343     if (l<2)
344     {
345     throw SplitWorldException("Insufficient parameters to addJob.");
346     }
347     extract<SplitWorld&> exw(t[0]);
348     if (!exw.check())
349     {
350     throw SplitWorldException("First parameter to addJob must be a SplitWorld.");
351     }
352     SplitWorld& ws=exw();
353     object creator=t[1];
354     tuple ntup=tuple(t.slice(2,l)); // strip off the object param
355     ws.addJob(creator, ntup, kwargs);
356     return object();
357 jfenwick 4730 }
358 jfenwick 4745
359 jfenwick 4796 // expects, splitworld, name of var, constructor function for the reducer, any constructor params
360     boost::python::object raw_addVariable(boost::python::tuple t, boost::python::dict kwargs)
361     {
362     int l=len(t);
363     if (l<3)
364     {
365     throw SplitWorldException("Insufficient parameters to addReducer.");
366     }
367     extract<SplitWorld&> exw(t[0]);
368     if (!exw.check())
369     {
370     throw SplitWorldException("First parameter to addVariable must be a SplitWorld.");
371     }
372     SplitWorld& ws=exw();
373     object pname=t[1];
374     extract<std::string> ex2(pname);
375     if (!ex2.check())
376     {
377     throw SplitWorldException("Second parameter to addVariable must be a string");
378     }
379     std::string name=ex2();
380     object creator=t[2];
381     tuple ntup=tuple(t.slice(3,l)); // strip off the object param
382     ws.addVariable(name, creator, ntup, kwargs);
383     return object();
384     }
385 jfenwick 4745
386 jfenwick 4796
387    
388 jfenwick 4745 }

  ViewVC Help
Powered by ViewVC 1.1.26