/[escript]/trunk/escriptcore/src/MPIDataReducer.cpp
ViewVC logotype

Contents of /trunk/escriptcore/src/MPIDataReducer.cpp

Parent Directory | Revision Log


Revision 5683 - (show annotations)
Wed Jun 24 02:00:12 2015 UTC (4 years ago) by jfenwick
File size: 13712 byte(s)
Removing debug output.
SUM variables now reset properly.
More forms of reduction supported.

1 /*****************************************************************************
2 *
3 * Copyright (c) 2014-2015 by The University of Queensland
4 * http://www.uq.edu.au
5 *
6 * Primary Business: Queensland, Australia
7 * Licensed under the Open Software License version 3.0
8 * http://www.opensource.org/licenses/osl-3.0.php
9 *
10 * Development until 2012 by Earth Systems Science Computational Center (ESSCC)
11 * Development 2012-2013 by School of Earth Sciences
12 * Development from 2014 by Centre for Geoscience Computing (GeoComp)
13 *
14 *****************************************************************************/
15
16 #define ESNEEDPYTHON
17 #include "esysUtils/first.h"
18
19
20 #include <sstream>
21 #include <limits>
22 #include <boost/python/extract.hpp>
23 #include <boost/scoped_array.hpp>
24
25 #include "MPIDataReducer.h"
26 #include "SplitWorldException.h"
27
28 using namespace boost::python;
29 using namespace escript;
30
31
32 namespace escript
33 {
34 Reducer_ptr makeDataReducer(std::string type)
35 {
36 MPI_Op op;
37 if (type=="SUM")
38 {
39 op=MPI_SUM;
40 }
41 else if (type=="SET")
42 {
43 op=MPI_OP_NULL;
44 }
45 else
46 {
47 throw SplitWorldException("Unsupported operation for makeDataReducer.");
48 }
49 MPIDataReducer* m=new MPIDataReducer(op);
50 return Reducer_ptr(m);
51 }
52
53 }
54
55 namespace
56 {
57
58 void combineData(Data& d1, const Data& d2, MPI_Op op)
59 {
60 if (op==MPI_SUM)
61 {
62 d1+=d2;
63 }
64 else if (op==MPI_OP_NULL)
65 {
66 throw SplitWorldException("Multiple 'simultaneous' attempts to export a 'SET' variable.");
67 }
68 }
69
70 }
71
72 MPIDataReducer::MPIDataReducer(MPI_Op op)
73 : reduceop(op), had_an_export_this_round(false)
74 {
75 valueadded=false;
76 if ((op==MPI_SUM) || (op==MPI_OP_NULL))
77 {
78 // deliberately left blank
79 }
80 else
81 {
82 throw SplitWorldException("Unsupported MPI_Op");
83 }
84 }
85
86 void MPIDataReducer::newRunJobs()
87 {
88 had_an_export_this_round=false;
89 }
90
91 void MPIDataReducer::setDomain(escript::Domain_ptr d)
92 {
93 dom=d;
94 }
95
96 std::string MPIDataReducer::description()
97 {
98 std::string op="SUM";
99 if (reduceop==MPI_OP_NULL)
100 {
101 op="SET";
102 }
103 return "Reducer("+op+") for Data objects";
104 }
105
106 bool MPIDataReducer::valueCompatible(boost::python::object v)
107 {
108 extract<Data&> ex(v);
109 if (!ex.check())
110 {
111 return false;
112 }
113 if (dom.get()!=0)
114 {
115 const Data& d=ex();
116 if (d.getDomain().get()!=dom.get())
117 {
118 return false; // the domains don't match
119 }
120 }
121 return true;
122 }
123
124
125 bool MPIDataReducer::reduceLocalValue(boost::python::object v, std::string& errstring)
126 {
127 extract<Data&> ex(v);
128 if (!ex.check())
129 {
130 errstring="reduceLocalValue: expected Data object. Got something else.";
131 return false;
132 }
133 Data& d=ex();
134 if (d.isEmpty())
135 {
136 errstring="reduceLocalValue: Got an empty Data object. Not allowed to reduce those.";
137 return false;
138 }
139 if ((d.getDomain()!=dom) && (dom.get()!=0))
140 {
141 errstring="reduceLocalValue: Got a Data object, but it was not using the SubWorld's domain.";
142 return false;
143 }
144 d.expand(); // because I don't want to mess about with types of Data
145 if (!valueadded || !had_an_export_this_round) // first value so answer becomes this one
146 {
147 value=d;
148 dom=d.getDomain();
149 had_an_export_this_round=true;
150 valueadded=true;
151 }
152 else
153 {
154 if (reduceop==MPI_OP_NULL)
155 {
156 if (had_an_export_this_round)
157 {
158 errstring="reduceLocalValue: Multiple 'simultaneous' attempts to export a 'SET' variable.";
159 return false;
160 }
161 value=d;
162 dom=d.getDomain();
163 had_an_export_this_round=true;
164 }
165 else
166 {
167 had_an_export_this_round=true;
168 if (d.getFunctionSpace()!=value.getFunctionSpace())
169 {
170 errstring="reduceLocalValue: FunctionSpaces for Data objects being combined must match.";
171 return false;
172 }
173 combineData(value, d, reduceop);
174 }
175 }
176 return true;
177 }
178
179 void MPIDataReducer::reset()
180 {
181 valueadded=false;
182 value=Data();
183 }
184
185 bool MPIDataReducer::checkRemoteCompatibility(esysUtils::JMPI& mpi_info, std::string& errstring)
186 {
187 #ifdef ESYS_MPI
188 // since they can't add it unless it is using the proper domain, we need to check
189
190 std::vector<unsigned> compat(6);
191 getCompatibilityInfo(compat);
192
193 // still need to incorporate domain version into this
194 // or are domains not mutable in any way that matters?
195 int* rbuff=new int[mpi_info->size*compat.size()];
196 boost::scoped_array<int> dummy(rbuff); // to ensure cleanup
197 for (int i=0;i<mpi_info->size;++i)
198 {
199 rbuff[i]=0; // since this won't match any valid value we can use it as a failure check
200 }
201 if (MPI_Allgather(&compat[0], compat.size(), MPI_UNSIGNED, rbuff,
202 compat.size(), MPI_UNSIGNED, mpi_info->comm)!=MPI_SUCCESS)
203 {
204 errstring="MPI failure in checkRemoteCompatibility.";
205 return false;
206 }
207 for (int i=0;i<(mpi_info->size-1);++i)
208 {
209 if ((rbuff[i*compat.size()]==1) || (rbuff[(i+1)*compat.size()]==1)) // one of them doesn't have a value
210 {
211 continue;
212 }
213 for (int j=0;j<compat.size();++j)
214 {
215 if (rbuff[i*compat.size()+j]!=rbuff[(i+1)*compat.size()+j])
216 {
217 std::ostringstream oss;
218 oss << "Incompatible value found for SubWorld " << i+1 << '.';
219 errstring=oss.str();
220 return false;
221 }
222 }
223 }
224 return true;
225 #else
226 return true;
227 #endif
228 }
229
230 // By the time this function is called, we know that all the values
231 // are compatible
232 bool MPIDataReducer::reduceRemoteValues(esysUtils::JMPI& mpi_info, bool active)
233 {
234 if (!active)
235 {
236 return false; // shutting down this option until I implement it
237 }
238 #ifdef ESYS_MPI
239 DataTypes::ValueType& vr=value.getExpandedVectorReference();
240 Data result(0, value.getDataPointShape(), value.getFunctionSpace(), true);
241 DataTypes::ValueType& rr=result.getExpandedVectorReference();
242 if (reduceop==MPI_OP_NULL)
243 {
244 return false; // this will stop bad things happening but won't give an informative error message
245 }
246 if (MPI_Allreduce(&(vr[0]), &(rr[0]), vr.size(), MPI_DOUBLE, reduceop, mpi_info->comm)!=MPI_SUCCESS)
247 {
248 return false;
249 }
250 value=result;
251 return true;
252 #else
253 return true;
254 #endif
255 }
256
257 // populate a vector of ints with enough information to ensure two values are compatible
258 // or to construct a container for incomming data
259 // Format for this:
260 // [0] Type of Data: {0 : error, 1:no value, 10: constant, 11:tagged, 12:expanded}
261 // [1] Functionspace type code
262 // [2] Only used for tagged --- gives the number of tags (which exist in the data object)
263 // [3..6] Components of the shape
264 void MPIDataReducer::getCompatibilityInfo(std::vector<unsigned>& params)
265 {
266 params.resize(7);
267 for (int i=0;i<7;++i)
268 {
269 params[0]=0;
270 }
271 if (!valueadded)
272 {
273 params[0]=1;
274 return;
275 }
276 if (value.isConstant())
277 {
278 params[0]=10;
279 }
280 else if (value.isTagged())
281 {
282 params[0]=11;
283 }
284 else if (value.isExpanded())
285 {
286 params[0]=12;
287 }
288 else // This could be DataEmpty or some other weirdness but we won't allow that
289 {
290 params[0]=0; // invalid type to send
291 return;
292 }
293 params[1]=value.getFunctionSpace().getTypeCode();
294 params[2]=static_cast<unsigned>(value.getNumberOfTaggedValues());
295 const DataTypes::ShapeType& s=value.getDataPointShape();
296 for (int i=0;i<s.size();++i)
297 {
298 params[3+i]=s[i];
299 }
300 }
301
302
303 // Get a value for this variable from another process
304 // This is not a reduction and will replace any existing value
305 bool MPIDataReducer::recvFrom(Esys_MPI_rank localid, Esys_MPI_rank source, esysUtils::JMPI& mpiinfo)
306 {
307 #ifdef ESYS_MPI
308 // first we need to find out what we are expecting
309 unsigned params[7];
310 MPI_Status stat;
311 if (MPI_Recv(params, 7, MPI_UNSIGNED, source, PARAMTAG, mpiinfo->comm, &stat)!=MPI_SUCCESS)
312 {
313 return false;
314 }
315 if (params[0]<10) // the sender somehow tried to send something invalid
316 {
317 return false;
318 }
319 // now we put the shape object together
320 escript::DataTypes::ShapeType s;
321 for (int i=0;i<4;++i)
322 {
323 if (params[3+i]>0)
324 {
325 s.push_back(params[3+i]);
326 }
327 else
328 {
329 break;
330 }
331 }
332 // Now we need the FunctionSpace
333 FunctionSpace fs=FunctionSpace(dom, static_cast<int>(params[1]));
334 value=Data(0, s, fs, params[0]==12);
335 if (params[0]==11) // The Data is tagged so we need to work out what tags we need
336 {
337 // TODO: Need to ship the tags and names over but for now just make sure there
338 // are the same number of tags
339 value.tag();
340
341 DataVector dv(DataTypes::noValues(s), 0, 1);
342 for (unsigned i=0;i<params[2];++i)
343 {
344 value.setTaggedValueFromCPP(static_cast<int>(i)+1, s, dv, 0);
345 }
346 return false; // because I don't trust this yet
347 }
348 #endif
349 return true;
350 }
351
352 // Send a value to this variable to another process
353 // This is not a reduction and will replace any existing value
354 bool MPIDataReducer::sendTo(Esys_MPI_rank localid, Esys_MPI_rank target, esysUtils::JMPI& mpiinfo)
355 {
356 if (!valueadded)
357 {
358 return false; // May be misinterpreted as an MPI failure
359 }
360 #ifdef ESYS_MPI
361 // first step is to let the other world know what sort of thing it needs to make
362 if (value.isLazy())
363 {
364 value.resolve();
365 }
366 std::vector<unsigned> params;
367 getCompatibilityInfo(params);
368 if (MPI_Send(&params[0], 6, MPI_UNSIGNED, target, PARAMTAG, mpiinfo->comm)!=MPI_SUCCESS)
369 {
370 return false;
371 }
372 // now we have informed the other end of what happened
373 // are we done or is there actually data to send
374 if (params[0]<10)
375 {
376 return false;
377 }
378 // at this point, we know there is data to send
379 const DataAbstract::ValueType::value_type* vect=value.getDataRO();
380 // now the receiver knows how much data it should be receive
381 // need to make sure that we aren't trying to send data with no local samples
382 if (vect!=0)
383 {
384 // MPI v3 has this first param as a const void* (as it should be)
385 // Version on my machine expects void*
386 if (MPI_Send(const_cast<DataAbstract::ValueType::value_type*>(vect), value.getLength(), MPI_DOUBLE, target, PARAMTAG, mpiinfo->comm)!=MPI_SUCCESS)
387 {
388 return false;
389 }
390 }
391 #endif
392 return true;
393 }
394
395 boost::python::object MPIDataReducer::getPyObj()
396 {
397 boost::python::object o(value);
398 return o;
399 }
400
401
402 // send from proc 0 in the communicator to all others
403 // second argument is true if this rank is sending
404 bool MPIDataReducer::groupSend(MPI_Comm& comm, bool imsending)
405 {
406 if (dom.get()==0)
407 {
408 return 0; // trying to avoid throwing here
409 // this will still cause a lockup if it happens
410 }
411 #ifdef ESYS_MPI
412 if (imsending)
413 {
414 // first step is to let the other world know what sort of thing it needs to make
415 if (value.isLazy())
416 {
417 value.resolve();
418 }
419 std::vector<unsigned> params;
420 getCompatibilityInfo(params);
421 if (MPI_Bcast(&params[0], params.size(), MPI_UNSIGNED, 0,comm)!=MPI_SUCCESS)
422 {
423 return false;
424 }
425 // now we have informed the other end of what happened
426 // are we done or is there actually data to send
427 if (params[0]<10)
428 {
429 return false;
430 }
431 // at this point, we know there is data to send
432 const DataAbstract::ValueType::value_type* vect=value.getDataRO();
433 // now the receiver knows how much data it should be receive
434 // need to make sure that we aren't trying to send data with no local samples
435 if (vect!=0)
436 {
437 if (MPI_Bcast(const_cast<DataAbstract::ValueType::value_type*>(vect), value.getLength(), MPI_DOUBLE, 0, comm)!=MPI_SUCCESS)
438 {
439 return false;
440 }
441 }
442 }
443 else // we are receiving
444 {
445 // first we need to find out what we are expecting
446 unsigned params[7];
447 if (MPI_Bcast(params, 7, MPI_UNSIGNED, 0, comm)!=MPI_SUCCESS)
448 {
449 return false;
450 }
451 if (params[0]<10) // the sender somehow tried to send something invalid
452 {
453 return false;
454 }
455 // now we put the shape object together
456 escript::DataTypes::ShapeType s;
457 for (int i=0;i<4;++i)
458 {
459 if (params[3+i]>0)
460 {
461 s.push_back(params[3+i]);
462 }
463 else
464 {
465 break;
466 }
467 }
468 // Now we need the FunctionSpace
469 FunctionSpace fs=FunctionSpace(dom, static_cast<int>(params[1]));
470 value=Data(0, s, fs, params[0]==12);
471 if (params[0]==11) // The Data is tagged so we need to work out what tags we need
472 {
473 // TODO: Need to ship the tags and names over but for now just make sure there
474 // are the same number of tags
475 value.tag();
476
477 DataVector dv(DataTypes::noValues(s), 0, 1);
478 for (unsigned i=0;i<params[2];++i)
479 {
480 value.setTaggedValueFromCPP(static_cast<int>(i)+1, s, dv, 0);
481 }
482 return false; // because I don't trust this yet
483 }
484 DataAbstract::ValueType::value_type* vect=&(value.getExpandedVectorReference()[0]);
485 if (MPI_Bcast(const_cast<DataAbstract::ValueType::value_type*>(vect), value.getLength(), MPI_DOUBLE, 0, comm)!=MPI_SUCCESS)
486 {
487 return false;
488 }
489 valueadded=true;
490 }
491 #endif
492 return true;
493 }
494
495 bool MPIDataReducer::groupReduce(MPI_Comm& com, char mystate)
496 {
497 throw SplitWorldException("groupReduce Not implemented yet.");
498 }
499
500 void MPIDataReducer::copyValueFrom(boost::shared_ptr<AbstractReducer>& src)
501 {
502 MPIDataReducer* sr=dynamic_cast<MPIDataReducer*>(src.get());
503 if (sr==0)
504 {
505 throw SplitWorldException("Source and destination need to be the same reducer types.");
506 }
507 if (sr->value.isEmpty())
508 {
509 throw SplitWorldException("Attempt to copy DataEmpty.");
510 }
511 if (sr==this)
512 {
513 throw SplitWorldException("Source and destination can not be the same variable.");
514 }
515 value.copy(sr->value);
516 valueadded=true;
517 }
518

  ViewVC Help
Powered by ViewVC 1.1.26