
Contents of /branches/split/escriptcore/src/SplitWorld.cpp



Revision 4774
Wed Mar 19 06:45:39 2014 UTC by jfenwick
File size: 8203 byte(s)
Runs a mixture of jobs with no exports
/*****************************************************************************
*
* Copyright (c) 2014 by University of Queensland
* http://www.uq.edu.au
*
* Primary Business: Queensland, Australia
* Licensed under the Open Software License version 3.0
* http://www.opensource.org/licenses/osl-3.0.php
*
* Development until 2012 by Earth Systems Science Computational Center (ESSCC)
* Development 2012-2013 by School of Earth Sciences
* Development from 2014 by Centre for Geoscience Computing (GeoComp)
*
*****************************************************************************/

#include "esysUtils/Esys_MPI.h"
#include "SplitWorld.h"
#include "AbstractDomain.h"
#include "SplitWorldException.h"

#include <iostream>

using namespace boost::python;
using namespace escript;

SplitWorld::SplitWorld(unsigned int numgroups, MPI_Comm global)
    :localworld((SubWorld*)0), swcount(numgroups>0?numgroups:1), jobcounter(1)
{
    globalcom=esysUtils::makeInfo(global);

    int grank=0;
    int wsize=1;        // each world has this many processes
#ifdef ESYS_MPI
    int gsize=globalcom->size;
    grank=globalcom->rank;
    if (gsize%swcount!=0)
    {
        throw SplitWorldException("SplitWorld error: requested number of groups is not a factor of global communicator size.");
    }
    wsize=gsize/swcount;        // each world has this many processes
    MPI_Comm sub;
    // split the communicator we were given; gsize and grank refer to it
    int res=MPI_Comm_split(globalcom->comm, grank/wsize, grank%wsize, &sub);
    if (res!=MPI_SUCCESS)
    {
        throw SplitWorldException("SplitWorld error: Unable to form communicator.");
    }
    subcom=esysUtils::makeInfo(sub,true);
#else
    subcom=esysUtils::makeInfo(0);
#endif
    localworld=SubWorld_ptr(new SubWorld(subcom));
    localid=grank/wsize;
}
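// Example: with 8 global ranks and numgroups==2, wsize==4: ranks 0-3 form
// subworld 0 and ranks 4-7 form subworld 1 (grank/wsize selects the subworld,
// grank%wsize the rank within it).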

SplitWorld::~SplitWorld()
{
    // communicator cleanup is handled by the MPI_Info
}


// The boost wrapper will ensure that there is at least one entry in the tuple
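// A rough Python-side usage sketch (the binding names are assumptions; see
// raw_buildDomains below):
//   sw = SplitWorld(2)
//   buildDomains(sw, SomeDomainFactory, arg1, key=value)
// The factory is then invoked as
//   SomeDomainFactory(arg1, key=value, escriptworld=<local subworld>)
// on each rank, and must build its domain on that subworld's communicator.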
object SplitWorld::buildDomains(tuple t, dict kwargs)
{
    int tsize=len(t);
    // get the callable that we will invoke shortly
    object tocall=t[0];
    // make a new tuple without the first element
    tuple ntup=tuple(t.slice(1,tsize));
    // now add the subworld to the kwargs
    kwargs["escriptworld"]=localworld;

    // pass the whole package to the python call
    object dobj=tocall(*ntup, **kwargs);
    extract<Domain_ptr> ex1(dobj);
    if (!ex1.check())
    {
        throw SplitWorldException("The domain builder must return a Domain object.");
    }
    Domain_ptr dptr=ex1();

    // now do a sanity check to see if the domain has respected the communicator info we passed it
    if (dptr->getMPIComm()!=localworld->getMPI()->comm)
    {
        throw SplitWorldException("The newly constructed domain is not using the correct communicator.");
    }
    localworld->setDomain(dptr);
    return object();    // return None
}


namespace
{

// Gather every rank's value and return the maximum in mres.
// This is not an AllReduce because we need to use tagged communication so as not to interfere with
// other messages / collective ops which the SubWorld's Jobs might be using.
// It is implemented by reducing on rank 0, which then sends the result back to everyone.
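// For example, with per-rank values {3, 0, 1}, every rank ends up with mres==3.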
bool checkResultInt(int res, int& mres, esysUtils::JMPI& info)
{
    const int leader=0;
    const int BIGTAG=esysUtils::getSubWorldTag();
    if (info->size==1)
    {
        mres=res;
        return true;
    }
    if (info->rank!=leader)
    {
        // send our value to the leader, then wait for the combined result
        if (MPI_Send(&res, 1, MPI_INT, leader, BIGTAG, info->comm)!=MPI_SUCCESS)
        {
            return false;
        }
        if (MPI_Recv(&mres, 1, MPI_INT, leader, BIGTAG, info->comm, MPI_STATUS_IGNORE)!=MPI_SUCCESS)
        {
            return false;
        }
        return true;
    }
    // leader: collect one value from each of the other ranks
    MPI_Request* reqs=new MPI_Request[info->size-1];
    int* eres=new int[info->size-1];
    for (int i=0;i<info->size-1;++i)
    {
        MPI_Irecv(eres+i, 1, MPI_INT, i+1, BIGTAG, info->comm, reqs+i);
    }
    if (MPI_Waitall(info->size-1, reqs, MPI_STATUSES_IGNORE)!=MPI_SUCCESS)
    {
        delete[] reqs;
        delete[] eres;
        return false;
    }
    // now we have them all, find the max
    mres=res;
    for (int i=0;i<info->size-1;++i)
    {
        if (mres<eres[i])
        {
            mres=eres[i];
        }
    }
    // now we know what the result should be, send it to the others
    for (int i=0;i<info->size-1;++i)
    {
        MPI_Isend(&mres, 1, MPI_INT, i+1, BIGTAG, info->comm, reqs+i);
    }
    bool ok=(MPI_Waitall(info->size-1, reqs, MPI_STATUSES_IGNORE)==MPI_SUCCESS);
    // release the request and receive buffers on every exit path
    delete[] reqs;
    delete[] eres;
    return ok;
}


}

// Executes all pending jobs on all subworlds
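// Result codes from SubWorld::runJobs, as interpreted below: mres==1 means at
// least one Job wants another iteration; mres==2 means some Job's work()
// returned a non-boolean; mres==3 means some Job raised an exception.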
void SplitWorld::runJobs()
{
    distributeJobs();
    int mres=0;
    std::string err;
    do
    {
        // now we actually need to run the jobs
        // everybody will be executing their localworld's jobs
        int res=localworld->runJobs(err);
        // now we find out about the other worlds
        if (!checkResultInt(res, mres, globalcom))
        {
            throw SplitWorldException("MPI appears to have failed.");
        }
    } while (mres==1);
    if (mres==2)
    {
        throw SplitWorldException("At least one Job's work() function did not return True/False.");
    }
    else if (mres==3)
    {
        char* resultstr=0;
        // now we ship around the error message - this should be safe since
        // everyone must have finished their Jobs to get here
        if (!esysUtils::shipString(err.c_str(), &resultstr, globalcom->comm))
        {
            throw SplitWorldException("MPI appears to have failed.");
        }
        std::string s("At least one Job's work() function raised the following exception:\n");
        s+=resultstr;
        throw SplitWorldException(s);
    }
}

void SplitWorld::registerCrate(escript::crate_ptr c)
{
    protocrates.push_back(c);
}

/**
  Stores the constructor/factory used to make Jobs, together with its parameters.
*/
void SplitWorld::addJob(boost::python::object creator, boost::python::tuple tup, boost::python::dict kw)
{
    create.push_back(creator);
    tupargs.push_back(tup);
    kwargs.push_back(kw);
}

void SplitWorld::clearPendingJobs()
{
    create.clear();
    tupargs.clear();
    kwargs.clear();
}

void SplitWorld::clearActiveJobs()
{
    localworld->clearJobs();
}

// All the job params are known on all the ranks.
void SplitWorld::distributeJobs()
{
    // block-partition the pending jobs across the subworlds; the first
    // create.size()%swcount subworlds each take one extra job
    unsigned int numjobs=create.size()/swcount;
    unsigned int start=create.size()/swcount*localid;
    if (localid<create.size()%swcount)
    {
        numjobs++;
        start+=localid;
    }
    else
    {
        start+=create.size()%swcount;
    }
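    // Example: 7 pending jobs on 3 subworlds gives counts {3,2,2} and
    // start indices {0,3,5}.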
    int errstat=0;
    try
    {
        // No other subworld will be looking at this portion of the array
        // so jobs will only be created on one subworld
        for (unsigned int i=start;i<start+numjobs;++i)
        {
            // we need to add some things to the kw map
            kwargs[i]["domain"]=localworld->getDomain();
            kwargs[i]["jobid"]=object(jobcounter+i);
            object job=create[i](*(tupargs[i]), **(kwargs[i]));
            localworld->addJob(job);
        }
    }
    catch (boost::python::error_already_set&)
    {
        errstat=1;
        PyErr_Clear();  // the failure is reported collectively below
    }
    jobcounter+=create.size();
    clearPendingJobs();

    // MPI check to ensure that it worked for everybody
    int mstat=0;
    if (!esysUtils::checkResult(errstat, mstat, globalcom->comm))
    {
        throw SplitWorldException("MPI appears to have failed.");
    }

    if (mstat==1)
    {
        // drop any jobs that were created before throwing
        clearActiveJobs();
        throw SplitWorldException("distributeJobs: Job creation failed.");
    }
}


namespace escript
{

boost::python::object raw_buildDomains(boost::python::tuple t, boost::python::dict kwargs)
{
    int l=len(t);
    if (l<2)
    {
        throw SplitWorldException("Insufficient parameters to buildDomains.");
    }
    extract<SplitWorld&> exw(t[0]);
    if (!exw.check())
    {
        throw SplitWorldException("First parameter to buildDomains must be a SplitWorld.");
    }
    SplitWorld& ws=exw();
    tuple ntup=tuple(t.slice(1,l));    // strip off the SplitWorld param
    return ws.buildDomains(ntup, kwargs);
}

boost::python::object raw_addJob(boost::python::tuple t, boost::python::dict kwargs)
{
    int l=len(t);
    if (l<2)
    {
        throw SplitWorldException("Insufficient parameters to addJob.");
    }
    extract<SplitWorld&> exw(t[0]);
    if (!exw.check())
    {
        throw SplitWorldException("First parameter to addJob must be a SplitWorld.");
    }
    SplitWorld& ws=exw();
    object creator=t[1];
    tuple ntup=tuple(t.slice(2,l));    // strip off the SplitWorld and creator params
    ws.addJob(creator, ntup, kwargs);
    return object();
}


}
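
// A sketch of the overall Python-side workflow (binding names for the raw_*
// wrappers above are assumptions; the actual module may expose them
// differently):
//   sw = SplitWorld(2)                        # two subworlds
//   buildDomains(sw, SomeDomainFactory, ...)  # one domain per subworld
//   addJob(sw, SomeJobClass, ...)             # queue jobs, then
//   sw.runJobs()                              # distribute and run them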
