/[escript]/branches/split/escriptcore/src/SplitWorld.cpp
ViewVC logotype

Contents of /branches/split/escriptcore/src/SplitWorld.cpp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 4813 - (show annotations)
Thu Mar 27 07:57:49 2014 UTC (5 years, 4 months ago) by jfenwick
File size: 10291 byte(s)
Blocking some uses of MPI_COMM_WORLD
1 /*****************************************************************************
2 *
3 * Copyright (c) 2014 by University of Queensland
4 * http://www.uq.edu.au
5 *
6 * Primary Business: Queensland, Australia
7 * Licensed under the Open Software License version 3.0
8 * http://www.opensource.org/licenses/osl-3.0.php
9 *
10 * Development until 2012 by Earth Systems Science Computational Center (ESSCC)
11 * Development 2012-2013 by School of Earth Sciences
12 * Development from 2014 by Centre for Geoscience Computing (GeoComp)
13 *
14 *****************************************************************************/
15
#include "esysUtils/Esys_MPI.h"
#include "SplitWorld.h"
#include "AbstractDomain.h"
#include "SplitWorldException.h"
#include "SplitWorldException.h"

#include <iostream>
#include <sstream>
#include <vector>
24
25 using namespace boost::python;
26 using namespace escript;
27
28 SplitWorld::SplitWorld(unsigned int numgroups, MPI_Comm global)
29 :localworld((SubWorld*)0), swcount(numgroups>0?numgroups:1), jobcounter(1), manualimport(false)
30 {
31 globalcom=esysUtils::makeInfo(global);
32
33 int grank=0;
34 int wsize=1; // each world has this many processes
35 #ifdef ESYS_MPI
36 int gsize=globalcom->size;
37 grank=globalcom->rank;
38 if (gsize%swcount!=0)
39 {
40 throw SplitWorldException("SplitWorld error: requested number of groups is not a factor of global communicator size.");
41 }
42 wsize=gsize/swcount; // each world has this many processes
43 MPI_Comm sub;
44 int res=MPI_Comm_split(MPI_COMM_WORLD, grank/wsize, grank%wsize, &sub);
45 if (res!=MPI_SUCCESS)
46 {
47 throw SplitWorldException("SplitWorld error: Unable to form communicator.");
48 }
49 subcom=esysUtils::makeInfo(sub,true);
50 #else
51 subcom=esysUtils::makeInfo(0);
52 #endif
53 localworld=SubWorld_ptr(new SubWorld(subcom));
54 localid=grank/wsize;
55 }
56
57 SplitWorld::~SplitWorld()
58 {
59 // communicator cleanup handled by the MPI_Info
60 }
61
62
63 // The boost wrapper will ensure that there is at least one entry in the tuple
64 object SplitWorld::buildDomains(tuple t, dict kwargs)
65 {
66 int tsize=len(t);
67 // get the callable that we will invoke in a sec
68 object tocall=t[0];
69 // make a new tuple without the first element
70 tuple ntup=tuple(t.slice(1,tsize));
71 // now add the subworld to the kwargs
72 kwargs["escriptworld"]=localworld;
73
74 // pass the whole package to the python call
75 object dobj=tocall(*ntup, **kwargs);
76 extract<Domain_ptr> ex1(dobj);
77 Domain_ptr dptr=ex1();
78
79 // now do a sanity check to see if the domain has respected the communicator info we passed it.
80 if (dptr->getMPIComm()!=localworld->getMPI()->comm)
81 {
82 throw SplitWorldException("The newly constructed domain is not using the correct communicator.");
83 }
84 localworld->setDomain(dptr);
85 return object(); // return None
86 }
87
88
89 namespace
90 {
91
92 // Throw all values in and get the maximum
93 // This is not an AllReduce because we need to use Tagged Communication so as not to interfere with
94 // other messages / collective ops which the SubWorld's Jobs might be using
95 // This is implemented by sending to rank 0
96 bool checkResultInt(int res, int& mres, esysUtils::JMPI& info)
97 {
98 #ifdef ESYS_MPI
99 const int leader=0;
100 const int BIGTAG=esysUtils::getSubWorldTag();
101 if (info->size==1)
102 {
103 mres=res;
104 return true;
105 }
106 else
107 {
108 if (info->rank!=leader)
109 {
110 if (MPI_Send(&res, 1, MPI_INT, leader, BIGTAG, info->comm)!=MPI_SUCCESS)
111 {
112 return false;
113 }
114 if (MPI_Recv(&mres, 1, MPI_INT, leader, BIGTAG, info->comm,0)!=MPI_SUCCESS)
115 {
116 return false;
117 }
118 }
119 else
120 {
121 MPI_Request* reqs=new MPI_Request[info->size-1];
122 int* eres=new int[info->size-1];
123 for (int i=0;i<info->size-1;++i)
124 {
125 MPI_Irecv(eres+i, 1, MPI_INT, i+1, BIGTAG, info->comm, reqs+i);
126 }
127 if (MPI_Waitall(info->size-1, reqs, 0)!=MPI_SUCCESS)
128 {
129 delete[] reqs;
130 return false;
131 }
132 // now we have them all, find the max
133 mres=res;
134 for (int i=0;i<info->size-1;++i)
135 {
136 if (mres<eres[i])
137 {
138 mres=eres[i];
139 }
140 }
141 // now we know what the result should be
142 // send it to the others
143 for (int i=0;i<info->size-1;++i)
144 {
145 MPI_Isend(&mres, 1, MPI_INT, i+1, BIGTAG, info->comm, reqs+i);
146 }
147 if (MPI_Waitall(info->size-1, reqs,0)!=MPI_SUCCESS)
148 {
149 return false;
150 }
151 }
152
153 }
154 return true;
155 #else
156 mres=res;
157 return true;
158 #endif
159 }
160
161
162 }
163
164 // Executes all pending jobs on all subworlds
165 void SplitWorld::runJobs()
166 {
167 esysUtils::NoCOMM_WORLD ncw; // it's destructor will unset the flag
168 distributeJobs();
169 int mres=0;
170 std::string err;
171 std::vector<char> impexpdetail;
172 do
173 {
174 // now we actually need to run the jobs
175 // everybody will be executing their localworld's jobs
176 int res=localworld->runJobs(err);
177
178 // take this opportunity to clean up
179 localworld->clearImportExports();
180 // now we find out about the other worlds
181 if (!checkResultInt(res, mres, globalcom))
182 {
183 throw SplitWorldException("MPI appears to have failed.");
184 }
185 if (mres>1) // 1 and 0 are normal returns, >1 is some sort of error
186 {
187 break;
188 }
189 if (!localworld->localTransport(impexpdetail, err))
190 {
191 mres=4;
192 break;
193 }
194 } while (mres==1);
195 if (mres==0)
196 {
197 return;
198 }
199 else if (mres==2)
200 {
201 throw SplitWorldException("At least one Job's work() function did not return True/False.");
202 }
203 else if (mres==3)
204 {
205 char* resultstr=0;
206 // now we ship around the error message - This should be safe since
207 // eveyone must have finished their Jobs to get here
208 if (!esysUtils::shipString(err.c_str(), &resultstr, globalcom->comm))
209 {
210 throw SplitWorldException("MPI appears to have failed.");
211 }
212 //throw SplitWorldException("At least one Job's work() function raised an exception.");
213 std::string s("At least one Job's work() function raised the following exception:\n");
214 s+=resultstr;
215 throw SplitWorldException(s);
216 }
217 else if (mres==4)
218 {
219 throw SplitWorldException("While processing exports: "+err);
220
221 }
222 else
223 {
224 throw SplitWorldException("Unexpected return value from runJobs.");
225 }
226 }
227
228 /**
229 stores the constructor/factory to make Jobs and the parameters.
230 */
231 void SplitWorld::addJob(boost::python::object creator, boost::python::tuple tup, boost::python::dict kw)
232 {
233 create.push_back(creator);
234 tupargs.push_back(tup);
235 kwargs.push_back(kw);
236 }
237
238 // At some point, we may need there to be more isolation here
239 // and trap any python exceptions etc, but for now I'll just call the constructor
240 void SplitWorld::addVariable(std::string name, boost::python::object creator, boost::python::tuple ntup, boost::python::dict kwargs)
241 {
242 object red=creator(*ntup, **kwargs);
243 extract<Reducer_ptr> ex(red);
244 if (!ex.check())
245 {
246 throw SplitWorldException("Creator function did not produce a reducer.");
247 }
248 Reducer_ptr rp=ex();
249 localworld->addVariable(name, rp, manualimport);
250 }
251
252
253 void SplitWorld::removeVariable(std::string name)
254 {
255 localworld->removeVariable(name);
256 }
257
258 void SplitWorld::clearPendingJobs()
259 {
260 create.clear();
261 tupargs.clear();
262 kwargs.clear();
263 }
264
265 void SplitWorld::clearActiveJobs()
266 {
267 localworld->clearJobs();
268 }
269
270 // All the job params are known on all the ranks.
271 void SplitWorld::distributeJobs()
272 {
273 unsigned int numjobs=create.size()/swcount;
274 unsigned int start=create.size()/swcount*localid;
275 if (localid<create.size()%swcount)
276 {
277 numjobs++;
278 start+=localid;
279 }
280 else
281 {
282 start+=create.size()%swcount;
283 }
284 int errstat=0;
285 try
286 {
287 // No other subworld will be looking at this portion of the array
288 // so jobs will only be created on one subworld
289 for (unsigned int i=start;i<start+numjobs;++i)
290 {
291 // we need to add some things to the kw map
292 kwargs[i]["domain"]=localworld->getDomain();
293 kwargs[i]["jobid"]=object(jobcounter+i);
294 object job=create[i](*(tupargs[i]), **(kwargs[i]));
295 localworld->addJob(job);
296 }
297 }
298 catch (boost::python::error_already_set e)
299 {
300 errstat=1;
301 }
302 jobcounter+=create.size();
303 clearPendingJobs();
304
305 // MPI check to ensure that it worked for everybody
306 int mstat=0;
307 if (!esysUtils::checkResult(errstat, mstat, globalcom->comm))
308 {
309 throw SplitWorldException("MPI appears to have failed.");
310 }
311
312 if (errstat==1)
313 {
314 throw SplitWorldException("distributeJobs: Job creation failed.");
315 clearActiveJobs();
316 }
317 }
318
319
320 namespace escript
321 {
322
323 boost::python::object raw_buildDomains(boost::python::tuple t, boost::python::dict kwargs)
324 {
325 int l=len(t);
326 if (l<2)
327 {
328 throw SplitWorldException("Insufficient parameters to buildDomains.");
329 }
330 extract<SplitWorld&> exw(t[0]);
331 if (!exw.check())
332 {
333 throw SplitWorldException("First parameter to buildDomains must be a SplitWorld.");
334 }
335 SplitWorld& ws=exw();
336 tuple ntup=tuple(t.slice(1,l)); // strip off the object param
337 return ws.buildDomains(ntup, kwargs);
338 }
339
340 boost::python::object raw_addJob(boost::python::tuple t, boost::python::dict kwargs)
341 {
342 int l=len(t);
343 if (l<2)
344 {
345 throw SplitWorldException("Insufficient parameters to addJob.");
346 }
347 extract<SplitWorld&> exw(t[0]);
348 if (!exw.check())
349 {
350 throw SplitWorldException("First parameter to addJob must be a SplitWorld.");
351 }
352 SplitWorld& ws=exw();
353 object creator=t[1];
354 tuple ntup=tuple(t.slice(2,l)); // strip off the object param
355 ws.addJob(creator, ntup, kwargs);
356 return object();
357 }
358
359 // expects, splitworld, name of var, constructor function for the reducer, any constructor params
360 boost::python::object raw_addVariable(boost::python::tuple t, boost::python::dict kwargs)
361 {
362 int l=len(t);
363 if (l<3)
364 {
365 throw SplitWorldException("Insufficient parameters to addReducer.");
366 }
367 extract<SplitWorld&> exw(t[0]);
368 if (!exw.check())
369 {
370 throw SplitWorldException("First parameter to addVariable must be a SplitWorld.");
371 }
372 SplitWorld& ws=exw();
373 object pname=t[1];
374 extract<std::string> ex2(pname);
375 if (!ex2.check())
376 {
377 throw SplitWorldException("Second parameter to addVariable must be a string");
378 }
379 std::string name=ex2();
380 object creator=t[2];
381 tuple ntup=tuple(t.slice(3,l)); // strip off the object param
382 ws.addVariable(name, creator, ntup, kwargs);
383 return object();
384 }
385
386
387
388 }

  ViewVC Help
Powered by ViewVC 1.1.26