/[escript]/branches/subworld2/escriptcore/src/SplitWorld.cpp
ViewVC logotype

Contents of /branches/subworld2/escriptcore/src/SplitWorld.cpp

Parent Directory | Revision Log


Revision 5505 - (show annotations)
Wed Mar 4 23:05:47 2015 UTC (3 years, 11 months ago) by jfenwick
File size: 11123 byte(s)
Changes which were sitting in my trunk dir
1 /*****************************************************************************
2 *
3 * Copyright (c) 2014-2015 by University of Queensland
4 * http://www.uq.edu.au
5 *
6 * Primary Business: Queensland, Australia
7 * Licensed under the Open Software License version 3.0
8 * http://www.opensource.org/licenses/osl-3.0.php
9 *
10 * Development until 2012 by Earth Systems Science Computational Center (ESSCC)
11 * Development 2012-2013 by School of Earth Sciences
12 * Development from 2014 by Centre for Geoscience Computing (GeoComp)
13 *
14 *****************************************************************************/
15
16 #define ESNEEDPYTHON
17 #include "esysUtils/first.h"
18
19
20 #include "esysUtils/Esys_MPI.h"
21 #include "SplitWorld.h"
22 #include "AbstractDomain.h"
23 #include "SplitWorldException.h"
24 #include "SplitWorldException.h"
25 #include "esysUtils/pyerr.h"
26
27 #include <iostream>
28 #include <sstream>
29
30 using namespace boost::python;
31 using namespace escript;
32 namespace rs=escript::reducerstatus;
33
34
35 double SplitWorld::getScalarVariable(const std::string& name)
36 {
37 // do we have a variable of that name?
38 return localworld->getScalarVariable(name);
39 }
40
41 SplitWorld::SplitWorld(unsigned int numgroups, MPI_Comm global)
42 :localworld((SubWorld*)0), swcount(numgroups>0?numgroups:1), jobcounter(1), manualimport(false)
43 {
44 globalcom=esysUtils::makeInfo(global);
45
46 int grank=0;
47 int wsize=1; // each world has this many processes
48 esysUtils::JMPI subcom; // communicator linking other processes in this subworld
49 esysUtils::JMPI corrcom; // communicator linking corresponding processes in different subworlds
50 #ifdef ESYS_MPI
51 int gsize=globalcom->size;
52 grank=globalcom->rank;
53 if (gsize%swcount!=0)
54 {
55 throw SplitWorldException("SplitWorld error: requested number of groups is not a factor of global communicator size.");
56 }
57 wsize=gsize/swcount; // each world has this many processes
58 MPI_Comm sub;
59 int res=MPI_Comm_split(global, grank/wsize, grank%wsize, &sub);
60 if (res!=MPI_SUCCESS)
61 {
62 throw SplitWorldException("SplitWorld error: Unable to form communicator.");
63 }
64 subcom=esysUtils::makeInfo(sub,true);
65
66
67 MPI_Comm corrsub;
68 res=MPI_Comm_split(global, grank%wsize, grank/wsize, &corrsub);
69 if (res!=MPI_SUCCESS)
70 {
71 throw SplitWorldException("SplitWorld error: Unable to form communicator.");
72 }
73 corrcom=esysUtils::makeInfo(corrsub,true);
74
75 #else
76 subcom=esysUtils::makeInfo(0);
77 corrcom=esysUtils::makeInfo(0);
78 #endif
79 localworld=SubWorld_ptr(new SubWorld(globalcom, subcom,corrcom, swcount, grank%wsize,manualimport));
80 localid=grank/wsize;
81 }
82
83 SplitWorld::~SplitWorld()
84 {
85 // communicator cleanup handled by the MPI_Info
86 }
87
88
89 // The boost wrapper will ensure that there is at least one entry in the tuple
90 object SplitWorld::buildDomains(tuple t, dict kwargs)
91 {
92 int tsize=len(t);
93 // get the callable that we will invoke in a sec
94 object tocall=t[0];
95 // make a new tuple without the first element
96 tuple ntup=tuple(t.slice(1,tsize));
97 // now add the subworld to the kwargs
98 kwargs["escriptworld"]=localworld;
99
100 // pass the whole package to the python call
101 object dobj=tocall(*ntup, **kwargs);
102 extract<Domain_ptr> ex1(dobj);
103 Domain_ptr dptr=ex1();
104
105 // now do a sanity check to see if the domain has respected the communicator info we passed it.
106 if (dptr->getMPIComm()!=localworld->getMPI()->comm)
107 {
108 throw SplitWorldException("The newly constructed domain is not using the correct communicator.");
109 }
110 localworld->setDomain(dptr);
111 return object(); // return None
112 }
113
114
115 // Executes all pending jobs on all subworlds
116 void SplitWorld::runJobs()
117 {
118 esysUtils::NoCOMM_WORLD ncw; // it's destructor will unset the flag
119 localworld->resetInterest();
120 try
121 {
122 distributeJobs();
123 int mres=0;
124 std::string err;
125 do // only here so I can "break" to the end
126 {
127 // find out which variables each world has and wants (global op)
128 if (!localworld->synchVariableInfo(err))
129 {
130 mres=4;
131 break;
132 }
133 // distribute values to worlds as needed (global op)
134 if (!localworld->synchVariableValues(err))
135 {
136 mres=4;
137 break;
138 }
139 // give values to jobs (local op)
140 if (!localworld->deliverImports(err))
141 {
142 mres=4; // can't jump to the end because this is a local op
143 }
144 else // import delivery was successful
145 {
146 // now we actually need to run the jobs
147 // everybody will be executing their localworld's jobs
148 mres=localworld->runJobs(err);
149
150 if (mres<2)
151 {
152 if (!localworld->localTransport(err))
153 {
154 mres=4; // both running jobs and local reduction are local ops
155 }
156 }
157 }
158
159 } while (false);
160 int res=mres;
161 // now we find out about the other worlds
162 if (!esysUtils::checkResult(res, mres, globalcom))
163 {
164 throw SplitWorldException("MPI appears to have failed.");
165 }
166
167 localworld->clearJobs();
168 // at this point, the remote world has all the reductions done
169 // now we need to do the global merges
170 if (!localworld->checkRemoteCompatibility(err))
171 {
172 mres=4;
173 err="Error in checkRemoteCompatibility.";
174 }
175 if (mres==0)
176 {
177 return;
178 }
179 else if (mres==2)
180 {
181 throw SplitWorldException("At least one Job's work() function did not return True/False.");
182 }
183 else if (mres==3)
184 {
185 char* resultstr=0;
186 // now we ship around the error message - This should be safe since
187 // eveyone must have finished their Jobs to get here
188 if (!esysUtils::shipString(err.c_str(), &resultstr, globalcom->comm))
189 {
190 throw SplitWorldException("MPI appears to have failed.");
191 }
192 //throw SplitWorldException("At least one Job's work() function raised an exception.");
193 std::string s("At least one Job's work() function raised the following exception:\n");
194 s+=resultstr;
195 throw SplitWorldException(s);
196 }
197 else if (mres==4)
198 {
199 throw SplitWorldException("While processing exports: "+err);
200 }
201 else
202 {
203 throw SplitWorldException("Unexpected return value from runJobs.");
204 }
205
206 }
207 catch (SplitWorldException e )
208 {
209 clearAllJobs();
210 throw e;
211 }
212 }
213
214 /**
215 stores the constructor/factory to make Jobs and the parameters.
216 */
217 void SplitWorld::addJob(boost::python::object creator, boost::python::tuple tup, boost::python::dict kw)
218 {
219 create.push_back(creator);
220 tupargs.push_back(tup);
221 kwargs.push_back(kw);
222 }
223
224 // At some point, we may need there to be more isolation here
225 // and trap any python exceptions etc, but for now I'll just call the constructor
226 void SplitWorld::addVariable(std::string name, boost::python::object creator, boost::python::tuple ntup, boost::python::dict kwargs)
227 {
228 object red=creator(*ntup, **kwargs);
229 extract<Reducer_ptr> ex(red);
230 if (!ex.check())
231 {
232 throw SplitWorldException("Creator function did not produce a reducer.");
233 }
234 Reducer_ptr rp=ex();
235 localworld->addVariable(name, rp);
236 }
237
238
239 void SplitWorld::removeVariable(std::string name)
240 {
241 localworld->removeVariable(name);
242 }
243
244
245 void SplitWorld::clearVariable(std::string name)
246 {
247 localworld->clearVariable(name);
248 }
249
250
251
252 void SplitWorld::clearAllJobs()
253 {
254 clearPendingJobs();
255 localworld->clearJobs();
256 }
257
258 void SplitWorld::clearPendingJobs()
259 {
260 create.clear();
261 tupargs.clear();
262 kwargs.clear();
263 }
264
265 // All the job params are known on all the ranks.
266 void SplitWorld::distributeJobs()
267 {
268 std::string errmsg;
269 unsigned int numjobs=create.size()/swcount;
270 unsigned int start=create.size()/swcount*localid;
271 if (localid<create.size()%swcount)
272 {
273 numjobs++;
274 start+=localid;
275 }
276 else
277 {
278 start+=create.size()%swcount;
279 }
280 int errstat=0;
281 try
282 {
283 // No other subworld will be looking at this portion of the array
284 // so jobs will only be created on one subworld
285 for (unsigned int i=start;i<start+numjobs;++i)
286 {
287 // we need to add some things to the kw map
288 kwargs[i]["domain"]=localworld->getDomain();
289 kwargs[i]["jobid"]=object(jobcounter+i);
290 object job=create[i](*(tupargs[i]), **(kwargs[i]));
291 localworld->addJob(job);
292 }
293 }
294 catch (boost::python::error_already_set e)
295 {
296 errstat=1;
297 getStringFromPyException(e, errmsg);
298 }
299 jobcounter+=create.size();
300 clearPendingJobs();
301
302 // MPI check to ensure that it worked for everybody
303 int mstat=0;
304 if (!esysUtils::checkResult(errstat, mstat, globalcom))
305 {
306 throw SplitWorldException("MPI appears to have failed.");
307 }
308
309 // Now we need to find out if anyone else had an error
310 if (!esysUtils::checkResult(errstat, mstat, globalcom))
311 {
312 throw SplitWorldException("MPI appears to have failed.");
313 }
314 errstat=mstat;
315
316 if (errstat==1)
317 {
318 char* resultstr=0;
319 // now we ship around the error message - This should be safe since
320 // eveyone must have finished their Jobs to get here
321 if (!esysUtils::shipString(errmsg.c_str(), &resultstr, globalcom->comm))
322 {
323 throw SplitWorldException("MPI appears to have failed.");
324 }
325 throw SplitWorldException(std::string("(During Job creation/distribution) ")+resultstr);
326 }
327 }
328
329 unsigned int SplitWorld::getWorldCount()
330 {
331 return swcount;
332 }
333
334
335 namespace escript
336 {
337
338 boost::python::object raw_buildDomains(boost::python::tuple t, boost::python::dict kwargs)
339 {
340 int l=len(t);
341 if (l<2)
342 {
343 throw SplitWorldException("Insufficient parameters to buildDomains.");
344 }
345 extract<SplitWorld&> exw(t[0]);
346 if (!exw.check())
347 {
348 throw SplitWorldException("First parameter to buildDomains must be a SplitWorld.");
349 }
350 SplitWorld& ws=exw();
351 tuple ntup=tuple(t.slice(1,l)); // strip off the object param
352 return ws.buildDomains(ntup, kwargs);
353 }
354
355 boost::python::object raw_addJob(boost::python::tuple t, boost::python::dict kwargs)
356 {
357 int l=len(t);
358 if (l<2)
359 {
360 throw SplitWorldException("Insufficient parameters to addJob.");
361 }
362 extract<SplitWorld&> exw(t[0]);
363 if (!exw.check())
364 {
365 throw SplitWorldException("First parameter to addJob must be a SplitWorld.");
366 }
367 SplitWorld& ws=exw();
368 object creator=t[1];
369 tuple ntup=tuple(t.slice(2,l)); // strip off the object param
370 ws.addJob(creator, ntup, kwargs);
371 return object();
372 }
373
374 // expects, splitworld, name of var, constructor function for the reducer, any constructor params
375 boost::python::object raw_addVariable(boost::python::tuple t, boost::python::dict kwargs)
376 {
377 int l=len(t);
378 if (l<3)
379 {
380 throw SplitWorldException("Insufficient parameters to addReducer.");
381 }
382 extract<SplitWorld&> exw(t[0]);
383 if (!exw.check())
384 {
385 throw SplitWorldException("First parameter to addVariable must be a SplitWorld.");
386 }
387 SplitWorld& ws=exw();
388 object pname=t[1];
389 extract<std::string> ex2(pname);
390 if (!ex2.check())
391 {
392 throw SplitWorldException("Second parameter to addVariable must be a string");
393 }
394 std::string name=ex2();
395 object creator=t[2];
396 tuple ntup=tuple(t.slice(3,l)); // strip off the object param
397 ws.addVariable(name, creator, ntup, kwargs);
398 return object();
399 }
400
401 }

  ViewVC Help
Powered by ViewVC 1.1.26