
Contents of /branches/split/escriptcore/src/SplitWorld.cpp



Revision 4773 - Wed Mar 19 06:19:42 2014 UTC by jfenwick
File size: 8651 bytes
Fixing Data to use the domain's communicator.
Like it should!


/*****************************************************************************
*
* Copyright (c) 2014 by University of Queensland
* http://www.uq.edu.au
*
* Primary Business: Queensland, Australia
* Licensed under the Open Software License version 3.0
* http://www.opensource.org/licenses/osl-3.0.php
*
* Development until 2012 by Earth Systems Science Computational Center (ESSCC)
* Development 2012-2013 by School of Earth Sciences
* Development from 2014 by Centre for Geoscience Computing (GeoComp)
*
*****************************************************************************/

#include "esysUtils/Esys_MPI.h"
#include "SplitWorld.h"
#include "AbstractDomain.h"
#include "SplitWorldException.h"

#include <iostream>
#include <vector>

using namespace boost::python;
using namespace escript;
SplitWorld::SplitWorld(unsigned int numgroups, MPI_Comm global)
    :localworld((SubWorld*)0), swcount(numgroups>0?numgroups:1), jobcounter(1)
{
    globalcom=esysUtils::makeInfo(global);

    int grank=0;
    int wsize=1;        // each world has this many processes
#ifdef ESYS_MPI
    int gsize=globalcom->size;
    grank=globalcom->rank;
    if (gsize%swcount!=0)
    {
        throw SplitWorldException("SplitWorld error: requested number of groups is not a factor of global communicator size.");
    }
    wsize=gsize/swcount;
    MPI_Comm sub;
    // split the supplied communicator (rather than MPI_COMM_WORLD) so the caller's
    // choice of communicator is respected; colour grank/wsize groups consecutive
    // ranks into worlds, key grank%wsize preserves their order within each world
    int res=MPI_Comm_split(globalcom->comm, grank/wsize, grank%wsize, &sub);
    if (res!=MPI_SUCCESS)
    {
        throw SplitWorldException("SplitWorld error: Unable to form communicator.");
    }
    subcom=esysUtils::makeInfo(sub,true);
#else
    subcom=esysUtils::makeInfo(0);
#endif
    localworld=SubWorld_ptr(new SubWorld(subcom));
    localid=grank/wsize;
}

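The colour/key pair passed to MPI_Comm_split is what carves the global communicator into swcount equal worlds: colour grank/wsize puts consecutive global ranks into the same world, and key grank%wsize preserves their order inside it. A minimal standalone sketch of the same scheme (independent of escript; two processes per world is an arbitrary choice for the demo):

#include <mpi.h>
#include <cstdio>

// Run with e.g. "mpirun -np 6 ./a.out": ranks {0,1}, {2,3}, {4,5} land in
// worlds 0, 1, 2, each with local ranks {0,1}.
int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    int grank;
    MPI_Comm_rank(MPI_COMM_WORLD, &grank);

    const int wsize=2;    // assumed processes per world, for the demo only
    MPI_Comm sub;
    MPI_Comm_split(MPI_COMM_WORLD, grank/wsize, grank%wsize, &sub);

    int srank;
    MPI_Comm_rank(sub, &srank);
    std::printf("global rank %d -> world %d, local rank %d\n",
                grank, grank/wsize, srank);

    MPI_Comm_free(&sub);
    MPI_Finalize();
    return 0;
}
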
SplitWorld::~SplitWorld()
{
    // communicator cleanup is handled by the reference-counted JMPI object
}


// The boost wrapper will ensure that there is at least one entry in the tuple
object SplitWorld::buildDomains(tuple t, dict kwargs)
{
    int tsize=len(t);
    // get the callable that we will invoke shortly
    object tocall=t[0];
    // make a new tuple without the first element
    tuple ntup=tuple(t.slice(1,tsize));
    // now add the subworld to the kwargs
    kwargs["escriptworld"]=localworld;

    // pass the whole package to the python call
    object dobj=tocall(*ntup, **kwargs);
    extract<Domain_ptr> ex1(dobj);
    if (!ex1.check())
    {
        throw SplitWorldException("buildDomains: the factory did not return a Domain.");
    }
    Domain_ptr dptr=ex1();

    // sanity check: has the domain respected the communicator info we passed it?
    if (dptr->getMPIComm()!=localworld->getMPI()->comm)
    {
        throw SplitWorldException("The newly constructed domain is not using the correct communicator.");
    }
    localworld->setDomain(dptr);
    return object();    // return None
}

namespace
{

// Gather every world's status value and return the maximum to everyone.
// This is not an AllReduce because we need to use tagged communication so as
// not to interfere with other messages / collective ops which the SubWorld's
// Jobs might be using.
// It is implemented by funnelling all values through rank 0.
bool checkResultInt(int res, int& mres, esysUtils::JMPI& info)
{
    const int leader=0;
    const int BIGTAG=esysUtils::getSubWorldTag();
    if (info->size==1)
    {
        mres=res;
        return true;
    }
    if (info->rank!=leader)
    {
        if (MPI_Send(&res, 1, MPI_INT, leader, BIGTAG, info->comm)!=MPI_SUCCESS)
        {
            return false;
        }
        if (MPI_Recv(&mres, 1, MPI_INT, leader, BIGTAG, info->comm, MPI_STATUS_IGNORE)!=MPI_SUCCESS)
        {
            return false;
        }
    }
    else
    {
        // vectors rather than new[] so that no early return can leak the buffers
        std::vector<MPI_Request> reqs(info->size-1);
        std::vector<int> eres(info->size-1);
        for (int i=0;i<info->size-1;++i)
        {
            MPI_Irecv(&eres[i], 1, MPI_INT, i+1, BIGTAG, info->comm, &reqs[i]);
        }
        if (MPI_Waitall(info->size-1, &reqs[0], MPI_STATUSES_IGNORE)!=MPI_SUCCESS)
        {
            return false;
        }
        // now we have them all, find the max
        mres=res;
        for (int i=0;i<info->size-1;++i)
        {
            if (mres<eres[i])
            {
                mres=eres[i];
            }
        }
        // now we know what the result should be, send it to the others
        for (int i=0;i<info->size-1;++i)
        {
            MPI_Isend(&mres, 1, MPI_INT, i+1, BIGTAG, info->comm, &reqs[i]);
        }
        if (MPI_Waitall(info->size-1, &reqs[0], MPI_STATUSES_IGNORE)!=MPI_SUCCESS)
        {
            return false;
        }
    }
    return true;
}

}
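The tagged point-to-point scheme above exists only because the SubWorlds' Jobs may be issuing their own messages and collectives on the same communicator; MPI collectives are matched by call order, not by tag, so a collective here could interleave with theirs. For contrast, this is the single call the function would otherwise reduce to (a sketch, not used by escript):

#include <mpi.h>

// The reduction checkResultInt performs, written as one collective.
// Unsafe in SplitWorld's setting for the reason given above.
bool checkResultIntCollective(int res, int& mres, MPI_Comm comm)
{
    return MPI_Allreduce(&res, &mres, 1, MPI_INT, MPI_MAX, comm)==MPI_SUCCESS;
}
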
// Executes all pending jobs on all subworlds
void SplitWorld::runJobs()
{
    distributeJobs();
    int mres=0;
    std::string err;
    do
    {
        // now we actually need to run the jobs
        // everybody will be executing their localworld's jobs
        int res=localworld->runJobs(err);
        // now we find out about the other worlds
        if (!checkResultInt(res, mres, globalcom))
        {
            throw SplitWorldException("MPI appears to have failed.");
        }
    } while (mres==1);    // 1 => at least one Job still has work to do
    if (mres==2)
    {
        throw SplitWorldException("At least one Job's work() function did not return True/False.");
    }
    else if (mres==3)
    {
        char* resultstr=0;
        // now we ship around the error message - this should be safe since
        // everyone must have finished their Jobs to get here
        if (!esysUtils::shipString(err.c_str(), &resultstr, globalcom->comm))
        {
            throw SplitWorldException("MPI appears to have failed.");
        }
        std::string s("At least one Job's work() function raised the following exception:\n");
        s+=resultstr;
        throw SplitWorldException(s);
    }
}

void SplitWorld::registerCrate(escript::crate_ptr c)
{
    protocrates.push_back(c);
}

/**
  Stores the constructor/factory used to make Jobs, along with its parameters.
*/
void SplitWorld::addJob(boost::python::object creator, boost::python::tuple tup, boost::python::dict kw)
{
    create.push_back(creator);
    tupargs.push_back(tup);
    kwargs.push_back(kw);
}

void SplitWorld::clearPendingJobs()
{
    create.clear();
    tupargs.clear();
    kwargs.clear();
}

void SplitWorld::clearActiveJobs()
{
    localworld->clearJobs();
}

// All the job params are known on all the ranks.
void SplitWorld::distributeJobs()
{
    // block distribution: every world gets create.size()/swcount jobs, and the
    // first create.size()%swcount worlds each take one of the leftovers
    unsigned int numjobs=create.size()/swcount;
    unsigned int start=create.size()/swcount*localid;
    if (localid<create.size()%swcount)
    {
        numjobs++;
        start+=localid;
    }
    else
    {
        start+=create.size()%swcount;
    }
    int errstat=0;
    try
    {
        // No other subworld will be looking at this portion of the array
        // so jobs will only be created on one subworld
        for (unsigned int i=start;i<start+numjobs;++i)
        {
            // we need to add some things to the kw map
            kwargs[i]["domain"]=localworld->getDomain();
            kwargs[i]["jobid"]=object(jobcounter+i);
            object job=create[i](*(tupargs[i]), **(kwargs[i]));
            localworld->addJob(job);
        }
    }
    catch (boost::python::error_already_set&)
    {
        PyErr_Clear();    // leave the interpreter clean; the failure is reported below
        errstat=1;
    }
    jobcounter+=create.size();
    clearPendingJobs();

    // MPI check to ensure that job creation worked for everybody
    int mstat=0;
    if (!esysUtils::checkResult(errstat, mstat, globalcom->comm))
    {
        throw SplitWorldException("MPI appears to have failed.");
    }

    if (mstat==1)    // check the global status so that every world throws together
    {
        clearActiveJobs();    // must run before the throw or it is unreachable
        throw SplitWorldException("distributeJobs: Job creation failed.");
    }
}
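
The index arithmetic at the top of distributeJobs is a standard block distribution with the remainder spread across the leading worlds. A standalone sketch with a worked example (blockShare is a hypothetical name, not part of escript):

#include <cstdio>

// With 7 jobs over 3 worlds: world 0 -> start 0, 3 jobs;
// world 1 -> start 3, 2 jobs; world 2 -> start 5, 2 jobs.
void blockShare(unsigned int total, unsigned int worlds, unsigned int id,
                unsigned int& start, unsigned int& count)
{
    count=total/worlds;
    start=count*id;
    if (id<total%worlds)
    {
        count++;       // this world takes one of the leftover jobs
        start+=id;     // shifted once for each earlier world with an extra job
    }
    else
    {
        start+=total%worlds;
    }
}

int main()
{
    for (unsigned int id=0; id<3; ++id)
    {
        unsigned int start, count;
        blockShare(7, 3, id, start, count);
        std::printf("world %u: start=%u count=%u\n", id, start, count);
    }
    return 0;
}
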

namespace escript
{

boost::python::object raw_buildDomains(boost::python::tuple t, boost::python::dict kwargs)
{
    int l=len(t);
    if (l<2)
    {
        throw SplitWorldException("Insufficient parameters to buildDomains.");
    }
    extract<SplitWorld&> exw(t[0]);
    if (!exw.check())
    {
        throw SplitWorldException("First parameter to buildDomains must be a SplitWorld.");
    }
    SplitWorld& ws=exw();
    tuple ntup=tuple(t.slice(1,l));    // strip off the SplitWorld param
    return ws.buildDomains(ntup, kwargs);
}

boost::python::object raw_addJob(boost::python::tuple t, boost::python::dict kwargs)
{
    int l=len(t);
    if (l<2)
    {
        throw SplitWorldException("Insufficient parameters to addJob.");
    }
    extract<SplitWorld&> exw(t[0]);
    if (!exw.check())
    {
        throw SplitWorldException("First parameter to addJob must be a SplitWorld.");
    }
    SplitWorld& ws=exw();
    object creator=t[1];
    tuple ntup=tuple(t.slice(2,l));    // strip off the SplitWorld and creator params
    ws.addJob(creator, ntup, kwargs);
    return object();
}


}
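
Both raw_* wrappers receive the whole positional tuple and keyword dict, which is exactly the calling convention boost::python::raw_function expects. The real bindings are registered elsewhere in the module; the following is only an illustrative sketch of how such functions are typically exposed (the module name here is made up):

#include <boost/python.hpp>
#include <boost/python/raw_function.hpp>

// Hypothetical registration, for illustration only. raw_function hands the
// full (*args, **kwargs) through to the wrapped C++ function; its second
// argument is the minimum number of positional arguments accepted.
BOOST_PYTHON_MODULE(splitworld_sketch)
{
    boost::python::def("buildDomains",
                       boost::python::raw_function(escript::raw_buildDomains, 2));
    boost::python::def("addJob",
                       boost::python::raw_function(escript::raw_addJob, 2));
}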
