/[escript]/trunk/escriptcore/src/SubWorld.cpp
ViewVC logotype

Contents of /trunk/escriptcore/src/SubWorld.cpp

Parent Directory | Revision Log


Revision 5775 - (show annotations)
Thu Jul 30 08:01:06 2015 UTC (3 years, 10 months ago) by sshaw
File size: 27751 byte(s)
pushing release to trunk
1
2 /*****************************************************************************
3 *
4 * Copyright (c) 2014-2015 by The University of Queensland
5 * http://www.uq.edu.au
6 *
7 * Primary Business: Queensland, Australia
8 * Licensed under the Open Software License version 3.0
9 * http://www.opensource.org/licenses/osl-3.0.php
10 *
11 * Development until 2012 by Earth Systems Science Computational Center (ESSCC)
12 * Development 2012-2013 by School of Earth Sciences
13 * Development from 2014 by Centre for Geoscience Computing (GeoComp)
14 *
15 *****************************************************************************/
16
17 #define ESNEEDPYTHON
18 #include "esysUtils/first.h"
19
20
21 #include "SubWorld.h"
22 #include "SplitWorldException.h"
23 #include "esysUtils/pyerr.h"
24
25 #include "MPIDataReducer.h"
26 #include "MPIScalarReducer.h"
27 #include "NonReducedVariable.h"
28
29 #include <boost/python/import.hpp>
30 #include <boost/python/dict.hpp>
31
32 #include <iostream>
33
34 using namespace escript;
35 namespace bp=boost::python;
36 using namespace esysUtils;
37 namespace rs=escript::reducerstatus;
38
39 using namespace std;
40
41 SubWorld::SubWorld(JMPI& global, JMPI& comm, JMPI& corr, unsigned int subworldcount, unsigned int local_id, bool manualimport)
42 :everyone(global), swmpi(comm), corrmpi(corr), domain((AbstractDomain*)0),
43 swcount(subworldcount), localid(local_id), manualimports(manualimport)
44 #ifdef ESYS_MPI
45 ,globalinfoinvalid(true)
46 #endif
47 {
48 swcount=subworldcount; // redundant to keep clang happy
49 }
50
51 SubWorld::~SubWorld()
52 {
53 }
54
55 JMPI& SubWorld::getMPI()
56 {
57 return swmpi;
58 }
59
60 JMPI& SubWorld::getCorrMPI()
61 {
62 return corrmpi;
63 }
64
65 void SubWorld::setDomain(Domain_ptr d)
66 {
67 domain=d;
68 }
69
70 Domain_ptr SubWorld::getDomain()
71 {
72 return domain;
73 }
74
75 void SubWorld::addJob(boost::python::object j)
76 {
77 jobvec.push_back(j);
78 }
79
80 void SubWorld::clearJobs()
81 {
82 jobvec.clear();
83 }
84
85 void SubWorld::setMyVarState(const std::string& vname, char state)
86 {
87 setVarState(vname, state, localid);
88 }
89
90 void SubWorld::setAllVarsState(const std::string& vname, char state)
91 {
92 #ifdef ESYS_MPI
93 // we need to know where the variable is in the sequence
94 str2char::iterator it=varstate.find(vname);
95 size_t c=0;
96 for (;it!=varstate.end();++it,++c)
97 {
98 if (it->first==vname)
99 {
100 break;
101 }
102 }
103 if (it==varstate.end())
104 {
105 return;
106 }
107 it->second=state;
108 c--; // we now have the sequence position of the variable
109 for (char z=rs::NONE; z<=rs::NEW;++z)
110 {
111 globalvarcounts[vname][z]=0;
112 }
113 globalvarcounts[vname][state]=swcount;
114 if (!globalinfoinvalid) // it will be updated in the next synch
115 {
116 for (size_t p=c;p<globalvarinfo.size();p+=getNumVars())
117 {
118 globalvarinfo[p]=state;
119 }
120 }
121 #else
122 varstate[vname]=state;
123 #endif
124
125
126 }
127
128
129 void SubWorld::setVarState(const std::string& vname, char state, int swid)
130 {
131 #ifdef ESYS_MPI
132 // we need to know where the variable is in thbe sequence
133 str2char::iterator it;
134 size_t c=0;
135 for (it=varstate.begin();it!=varstate.end();++it,++c)
136 {
137 if (it->first==vname)
138 {
139 break;
140 }
141 }
142 if (it==varstate.end())
143 {
144 return;
145 }
146 // we now have the sequence position of the variable
147 if (!globalinfoinvalid) // it will be updated in the next synch
148 {
149 unsigned char ostate=globalvarinfo[c+getNumVars()*swid];
150 globalvarinfo[c+getNumVars()*swid]=state;
151 globalvarcounts[vname][ostate]--;
152 globalvarcounts[vname][state]++;
153 }
154 if (swid==localid) // we are updating our own state so we need to change "varstate"
155 {
156 it->second=state;
157 }
158 #else
159 varstate[vname]=state;
160 #endif
161 }
162
163
164 // this will give the imported values to interested jobs
165 bool SubWorld::deliverImports(std::string& errmsg)
166 {
167 for (size_t i=0;i<jobvec.size();++i)
168 {
169 if (manualimports)
170 {
171 bp::list wanted=bp::extract<bp::list>(jobvec[i].attr("wantedvalues"))();
172 for (size_t j=0;j<len(wanted);++j)
173 {
174 bp::extract<std::string> exs(wanted[j]); // must have been checked by now
175 std::string n=exs();
176 // now we need to check to see if this value is known
177 str2reduce::iterator it=reducemap.find(n);
178 if (it==reducemap.end())
179 {
180 errmsg="Attempt to import variable \""+n+"\". SplitWorld was not told about this variable.";
181 return false;
182 }
183 try
184 {
185 jobvec[i].attr("setImportValue")(it->first, reducemap[it->first]->getPyObj());
186 }
187 catch (boost::python::error_already_set e)
188 {
189 getStringFromPyException(e, errmsg);
190 return false;
191 }
192 }
193 }
194 else
195 {
196 // For automatic imports, we want to import "Everything" into every job.
197 // However, we don't want to import things with no value yet
198 for (str2reduce::iterator it=reducemap.begin();it!=reducemap.end();++it)
199 {
200 if (it->second->hasValue())
201 {
202 try
203 {
204 jobvec[i].attr("setImportValue")(it->first, it->second->getPyObj());
205 }
206 catch (boost::python::error_already_set e)
207 {
208 getStringFromPyException(e, errmsg);
209 return false;
210 }
211 }
212 }
213 }
214 }
215 return true;
216 }
217
218 // Gather exported values from jobs and merge them in the reducer
219 bool SubWorld::localTransport(std::string& errmsg)
220 {
221 for (size_t i=0;i<jobvec.size();++i)
222 {
223 bp::dict expmap=bp::extract<bp::dict>(jobvec[i].attr("exportedvalues"))();
224 bp::list items=expmap.items();
225 size_t l=bp::len(items);
226 for (int j=0;j<l;++j)
227 {
228 bp::object o1=items[j][0];
229 bp::object o2=items[j][1];
230 bp::extract<std::string> ex1(o1);
231 if (!ex1.check())
232 {
233 errmsg="Job attempted export using a name which was not a string.";
234 return false;
235 }
236 std::string name=ex1();
237 std::map<std::string, Reducer_ptr>::iterator it=reducemap.find(name);
238 if (it==reducemap.end())
239 {
240 errmsg="Attempt to export variable \""+name+"\". SplitWorld was not told about this variable.";
241 return false;
242 }
243 // so now we know it is a known name, we check that it is not None and that it is compatible
244 if (o2.is_none())
245 {
246 errmsg="Attempt to export variable \""+name+"\" with value of None, this is not permitted.";
247 return false;
248 }
249 if (!(it->second)->valueCompatible(o2))
250 {
251 errmsg="Attempt to export variable \""+name+"\" with an incompatible value. Using ";
252 errmsg+=(it->second)->description();
253 return false;
254 }
255 if (!(it->second)->reduceLocalValue(o2, errmsg))
256 {
257 return false; // the error string will be set by the reduceLocalValue
258 }
259 setMyVarState(name, rs::NEW);
260 }
261 }
262 return true;
263 }
264
265 void SubWorld::debug()
266 {
267 using namespace std;
268 using namespace escript::reducerstatus;
269 std::cout << "Variables:";
270 #ifdef ESYS_MPI
271 if (!globalinfoinvalid)
272 {
273 cout << "{ NONE INTR OLD OINT NEW }";
274 }
275 else
276 {
277 cout << "(no valid global info)";
278 }
279 #endif
280 std::cout << std::endl;
281 int i=0;
282 for (str2char::iterator it=varstate.begin();it!=varstate.end();++it,++i)
283 {
284 std::cout << it->first << ": ";
285 std::cout << reducemap[it->first]->description() << " ";
286 switch (it->second)
287 {
288 case NONE: cout << "NONE "; break;
289 case INTERESTED: cout << "INTR "; break;
290 case OLDINTERESTED: cout << "OINT "; break;
291 case OLD: cout << "OLD "; break;
292 case NEW: cout << "NEW "; break;
293 }
294 #ifdef ESYS_MPI
295 if (!globalinfoinvalid)
296 {
297 cout << "{ ";
298 for (unsigned char z=rs::NONE;z<=rs::NEW;++z)
299 {
300 cout << globalvarcounts[it->first][z] << ' ';
301 }
302 cout << " } ";
303
304 }
305 else
306 {
307 cout << "(no valid global info)";
308 }
309 #endif
310 cout << endl;
311 }
312
313 #ifdef ESYS_MPI
314
315 if (!globalinfoinvalid)
316 {
317 cout << "[";
318 for (size_t i=0;i<globalvarinfo.size();++i)
319 {
320 if (i%getNumVars()==0)
321 {
322 cout << " ";
323 }
324 cout << (short)globalvarinfo[i];
325 }
326 cout << " ] ";
327
328 }
329
330
331 #endif
332 std::cout << "Debug end\n";
333 std::cout.flush();
334
335 }
336
337
338 // not to be called while running jobs
339 // The tricky bit, is that this could be be called between job runs
340 // this means that the values of variables may not have been synched yet
341 double SubWorld::getScalarVariable(const std::string& name)
342 {
343 str2reduce::iterator it=reducemap.find(name);
344 if (it==reducemap.end())
345 {
346 throw SplitWorldException("No variable of that name.");
347 }
348 // need to indicate we are interested in the variable
349 if (varstate[name]==rs::NONE)
350 {
351 setMyVarState(name, rs::INTERESTED);
352 }
353 else if (varstate[name]==rs::OLD)
354 {
355 setMyVarState(name, rs::OLDINTERESTED);
356 }
357 // anything else, indicates interest anyway
358 #ifdef ESYS_MPI
359 std::string errmsg;
360 if (!synchVariableInfo(errmsg))
361 {
362 throw SplitWorldException(std::string("(Getting scalar --- Variable information) ")+errmsg);
363 }
364 if (!synchVariableValues(errmsg))
365 {
366 throw SplitWorldException(std::string("(Getting scalar --- Variable value) ")+errmsg);
367 }
368 #endif
369 if (dynamic_cast<MPIScalarReducer*>(it->second.get()))
370 {
371 return dynamic_cast<MPIScalarReducer*>(it->second.get())->getDouble();
372 }
373 if (dynamic_cast<NonReducedVariable*>(it->second.get()))
374 {
375 boost::python::extract<double> ex(it->second->getPyObj());
376 if (!ex.check())
377 {
378 throw SplitWorldException("Variable is not scalar.");
379 }
380 return ex();
381 }
382 throw SplitWorldException("Variable is not scalar.");
383 }
384
385 bool SubWorld::checkRemoteCompatibility(std::string& errmsg)
386 {
387 for (str2reduce::iterator it=reducemap.begin();it!=reducemap.end();++it)
388 {
389 if (! it->second->checkRemoteCompatibility(corrmpi, errmsg))
390 {
391 return false;
392 }
393 }
394 return true;
395 }
396
397 #ifdef ESYS_MPI
398
399 bool SubWorld::makeComm(MPI_Comm& sourcecom, JMPI& ncom,std::vector<int>& members)
400 {
401 MPI_Comm subcom;
402 MPI_Group sourceg, g;
403 if (MPI_Comm_group(sourcecom, &sourceg)!=MPI_SUCCESS) {return false;}
404 if (MPI_Group_incl(sourceg, members.size(), &members[0], &g)!=MPI_SUCCESS) {return false;}
405 // then create a communicator with that group
406 if (MPI_Comm_create(sourcecom, g, &subcom)!=MPI_SUCCESS)
407 {
408 return false;
409 }
410 ncom=makeInfo(subcom, true);
411 return true;
412 }
413
414
415 // The mystate, could be computed from vnum, this is just to shortcut
416 // creates two groups, the first contains procs which need to reduce
417 // the second group contains a single process with the new value and
418 // all other interested parties
419 bool SubWorld::makeGroupReduceGroups(MPI_Comm& srccom, int vnum, char mystate, JMPI& red, JMPI& cop, bool& incopy)
420 {
421 incopy=false;
422 if ((mystate==rs::NEW)
423 || (mystate==rs::INTERESTED)
424 || (mystate==rs::OLDINTERESTED))
425 {
426 // first create a group with all the updates in it
427 std::vector<int> redmembers;
428 std::vector<int> copmembers;
429 for (int i=0+vnum;i<globalvarinfo.size();i+=getNumVars())
430 {
431 bool havesrc=false;
432 int world=i/getNumVars();
433 // make a vector of the involved procs with New at the front
434 switch (globalvarinfo[i])
435 {
436 case rs::NEW:
437 if (!havesrc)
438 {
439 copmembers.insert(copmembers.begin(), world);
440 havesrc=true;
441 if (world==localid)
442 {
443 incopy=true;
444 }
445 }
446 redmembers.push_back(world);
447 break;
448 case rs::INTERESTED:
449 case rs::OLDINTERESTED:
450 copmembers.push_back(world);
451 if (world==localid)
452 {
453 incopy=true;
454 }
455 break;
456 }
457 }
458 if (!makeComm(srccom, red, redmembers))
459 {
460 return false;
461 }
462 if (!makeComm(srccom, cop, copmembers))
463 {
464 return false;
465 }
466 return true;
467
468 }
469 else // for people not in involved in the value shipping
470 { // This would be a nice time to use MPI_Comm_create_group
471 // but it does not exist in MPI2.1
472 MPI_Comm temp;
473 if (MPI_Comm_create(srccom, MPI_GROUP_EMPTY, &temp)!=MPI_SUCCESS)
474 {
475 return false;
476 }
477 red=makeInfo(temp, true);
478 if (MPI_Comm_create(srccom, MPI_GROUP_EMPTY, &temp)!=MPI_SUCCESS)
479 {
480 return false;
481 }
482 cop=makeInfo(temp, true);
483 return true;
484 }
485
486 }
487
488
489 // a group with NEW nodes at the front and INT and OLDINT at the back
490 // NONE worlds get an empty communicator
491 bool SubWorld::makeGroupComm1(MPI_Comm& srccom, int vnum, char mystate, JMPI& com)
492 {
493 if ((mystate==rs::NEW)
494 || (mystate==rs::INTERESTED)
495 || (mystate==rs::OLDINTERESTED))
496 {
497 // first create a group with [updates, interested and oldinterested in it]
498 std::vector<int> members;
499 for (int i=0+vnum;i<globalvarinfo.size();i+=getNumVars())
500 {
501 // make a vector of the involved procs with New at the front
502 switch (globalvarinfo[i])
503 {
504 case rs::NEW: members.insert(members.begin(), i/getNumVars()); break;
505 case rs::INTERESTED:
506 case rs::OLDINTERESTED:
507 members.push_back(i/getNumVars());
508 break;
509 }
510 }
511 return makeComm(srccom, com, members);
512 }
513 else // for people not in involved in the value shipping
514 { // This would be a nice time to use MPI_Comm_create_group
515 // but it does not exist in MPI2.1
516 MPI_Comm temp;
517 MPI_Comm_create(srccom, MPI_GROUP_EMPTY, &temp);
518 com=makeInfo(temp, true);
519 return true;
520 }
521 }
522
523 // A group with a single OLD or OLDINT at the front and all the INT worlds
524 // following it
525 bool SubWorld::makeGroupComm2(MPI_Comm& srccom, int vnum, char mystate, JMPI& com, bool& ingroup)
526 {
527 ingroup=false;
528 if ((mystate==rs::OLD)
529 || (mystate==rs::INTERESTED)
530 || (mystate==rs::OLDINTERESTED))
531 {
532 // first create a group with [old, interested and oldinterested in it]
533 std::vector<int> members;
534 bool havesrc=false;
535 for (int i=0+vnum;i<globalvarinfo.size();i+=getNumVars())
536 {
537 int world=i/getNumVars();
538 // make a vector of the involved procs with OLD/OLDINTERESTED at the front
539 switch (globalvarinfo[i])
540 {
541 case rs::NEW: return false; break;
542 case rs::INTERESTED: members.push_back(world);
543 if (world==localid)
544 {
545 ingroup=true;
546 }
547 break;
548 case rs::OLD:
549 case rs::OLDINTERESTED:
550 if (!havesrc)
551 {
552 members.insert(members.begin(), world);
553 havesrc=true;
554 if (world==localid)
555 {
556 ingroup=true;
557 }
558 }
559 break;
560 }
561 }
562 return makeComm(srccom, com, members);
563 }
564 else // for people not in involved in the value shipping
565 { // This would be a nice time to use MPI_Comm_create_group
566 // but it does not exist in MPI2.1
567 MPI_Comm temp;
568 MPI_Comm_create(srccom, MPI_GROUP_EMPTY, &temp);
569 com=makeInfo(temp, true);
570 return true;
571 }
572 }
573
574 #endif
575
576
577 bool SubWorld::synchVariableValues(std::string& err)
578 {
579 #ifdef ESYS_MPI
580 // There are three possibilities here but since all worlds have the same knowledge
581 // we can be sure that they will all make the same choice
582 // 1) No updates are required
583 // 2) There is a single world with a new value so it can broadcast it
584 // 3) There are multiple worlds with updates
585
586 // need to keep track of which vars have updates
587 std::vector<std::string> varswithupdates;
588
589 int vnum=0;
590 for (str2reduce::iterator it=reducemap.begin();it!=reducemap.end();++it, ++vnum)
591 {
592 // check to see if anyone needs it
593 int needcount=0; // who wants a new value
594 int newcount=0; // who has a new version
595 int oldcount=0; // who has an old version
596 int oldintcount=0;
597 newcount=globalvarcounts[it->first][rs::NEW];
598 oldcount=globalvarcounts[it->first][rs::OLD];
599 oldintcount=globalvarcounts[it->first][rs::OLDINTERESTED];
600 needcount=globalvarcounts[it->first][rs::INTERESTED]+oldintcount;
601 if (newcount>0)
602 {
603 varswithupdates.push_back(it->first);
604 }
605 if (needcount+newcount+oldcount==0)
606 {
607 continue; // noone cares about this variable
608 }
609 if (needcount>0 && (oldcount+oldintcount+newcount)==0)
610 {
611 err="Import attempted for a variable \""+(it->first)+"\" with no value.";
612 return false;
613 }
614 // worlds have the variable but noone is interested in it
615 // note that if there are multiple new values, we still want to merge them
616 if ((needcount==0) && (newcount<=1))
617 {
618 continue;
619 }
620 if (swcount==1)
621 { // nobody else to communicate with
622 continue;
623 }
624 // to reach this point, there must be >=1 source and >=1 sink and multiple worlds
625 // first deal updates as source(s)
626 if (newcount==1) // only one update so send from that
627 {
628 JMPI com;
629 if (!makeGroupComm1(corrmpi->comm, vnum, varstate[it->first],com))
630 {
631 err="Error creating group for sharing values,";
632 return false;
633 }
634 if (varstate[it->first]!=rs::NONE && varstate[it->first]!=rs::OLD)
635 {
636 it->second->groupSend(com->comm, (varstate[it->first]==rs::NEW));
637 // Now record the fact that we have the variable now
638 if (varstate[it->first]==rs::INTERESTED)
639 {
640 setMyVarState(it->first, rs::OLDINTERESTED);
641 }
642 }
643 continue;
644 }
645 if (newcount==swcount) // everybody is in on this
646 {
647 if (!it->second->reduceRemoteValues(corrmpi->comm))
648 {
649 it->second->reset();
650 setAllVarsState(it->first, rs::NONE);
651 //setMyVarState(it->first, rs::NONE);
652 err=it->first+"Either MPI failed, or there were multiple simultaneous updates to a variable with the SET operation.";
653 return false;
654 }
655 // Now record the fact that we have the variable now
656 if (varstate[it->first]==rs::INTERESTED)
657 {
658 setMyVarState(it->first, rs::OLDINTERESTED);
659 }
660 continue;
661 }
662 if (newcount>1)
663 {
664 // make groups to reduce and then copy
665 JMPI red;
666 JMPI cop;
667 bool incopy;
668 if (!makeGroupReduceGroups(corrmpi->comm, vnum, varstate[it->first], red, cop, incopy))
669 {
670 err="Error creating groups for sharing values,";
671 return false;
672 }
673 char reduceresult=0;
674 // only new values get reduced
675 if (varstate[it->first]==rs::NEW)
676 {
677 if (!it->second->reduceRemoteValues(red->comm))
678 {
679 char s=1;
680 MPI_Allreduce(&s, &reduceresult, 1, MPI_CHAR, MPI_MAX, corrmpi->comm);
681 reduceresult=1;
682
683 }
684 else
685 {
686 if (it->second->canClash())
687 {
688 char s=0;
689 MPI_Allreduce(&s, &reduceresult, 1, MPI_CHAR, MPI_MAX, corrmpi->comm);
690 }
691 }
692 }
693 else
694 {
695 if (it->second->canClash())
696 {
697 char s=0;
698 MPI_Allreduce(&s, &reduceresult, 1, MPI_CHAR, MPI_MAX, corrmpi->comm);
699 }
700 }
701 // if there was a clash somewhere
702 if (reduceresult!=0)
703 {
704 it->second->reset();
705 setAllVarsState(it->first, rs::NONE);
706 err="Either MPI failed, or there were multiple simultaneous updates to a variable with the SET operation.";
707 return false;
708 }
709
710 // if we are involved in copying the new value around
711 if (incopy)
712 {
713 it->second->groupSend(cop->comm, (varstate[it->first]==rs::NEW));
714 if (varstate[it->first]==rs::INTERESTED)
715 {
716 setMyVarState(it->first, rs::OLDINTERESTED);
717 }
718 }
719 if (varstate[it->first]==rs::NEW)
720 {
721 setMyVarState(it->first, rs::OLDINTERESTED);
722 }
723 continue;
724 }
725 // at this point, we need to ship info around but there are no updates
726 // that is, we are shipping an old copy
727 // picking a source arbitarily (the first one in the array)
728
729 // but first, eliminate the special case where the only interested ones
730 // already have a copy
731 if (oldintcount==needcount)
732 {
733 continue;
734 }
735 JMPI com;
736 bool ingroup=false;
737 if (!makeGroupComm2(corrmpi->comm, vnum, varstate[it->first],com, ingroup))
738 {
739 err="Error creating group for sharing values";
740 return false;
741 }
742 // form group to send to [latestsource and interested]
743
744 if (ingroup) // since only one holder needs to send
745 {
746 bool imsending=(varstate[it->first]==rs::NEW);
747 it->second->groupSend(com->comm, imsending);
748 }
749 }
750 // now we need to age any out of date copies of vars
751 for (size_t i=0;i<varswithupdates.size();++i)
752 {
753 std::string vname=varswithupdates[i];
754 if (varstate[vname]==rs::NEW)
755 {
756 setMyVarState(vname, rs::OLD);
757 }
758 else if (varstate[vname]==rs::OLD)
759 {
760 setMyVarState(vname, rs::NONE);
761 reducemap[vname]->clear();
762 }
763 }
764 #endif
765 return true;
766 }
767
768 bool SubWorld::amLeader()
769 {
770 return swmpi->rank==0;
771 }
772
773 // Find out which variables the local queued jobs are interested in
774 // share that info around
775 bool SubWorld::synchVariableInfo(std::string& err)
776 {
777 if (getNumVars()==0)
778 {
779 return true;
780 }
781 if (manualimports) // manual control over imports
782 {
783 for (size_t i=0;i<jobvec.size();++i)
784 {
785 bp::list wanted=bp::extract<bp::list>(jobvec[i].attr("wantedvalues"))();
786 for (size_t j=0;j<len(wanted);++j)
787 {
788 bp::extract<std::string> exs(wanted[j]);
789 if (!exs.check())
790 {
791 err="names in wantedvalues must be strings";
792 return false;
793 }
794 std::string n=exs();
795 // now we need to check to see if this value is known
796 str2char::iterator it=varstate.find(n);
797 if (it==varstate.end())
798 {
799 err="Attempt to import variable \""+n+"\". SplitWorld was not told about this variable.";
800 return false;
801 }
802 // So at least one job wants this variable
803 switch (it->second)
804 {
805 case rs::NONE: it->second=rs::INTERESTED; break;
806 case rs::INTERESTED: break;
807 case rs::OLD: it->second=rs::OLDINTERESTED; break;
808 case rs::NEW: break;
809 default:
810 err="Unknown variable state";
811 return false;
812 }
813 }
814 }
815 }
816 // Make a vector to hold the info from the map (so we can send it around)
817 std::vector<char> lb(getNumVars(), rs::NONE);
818 size_t i=0;
819 for (str2char::iterator it=varstate.begin();it!=varstate.end();++it,++i)
820 {
821 lb[i]=it->second;
822 }
823
824
825 #ifdef ESYS_MPI
826 // Vector to hold the result
827 globalvarinfo.resize(getNumVars()*swcount, rs::NONE);
828 if (amLeader()) // we only need on representative from each world to send
829 {
830 // The leaders of each world, send their variable information to the proc "0" in
831 // the global world (which will be the leader of subworld "0").
832 // There is an issue here if this operation fails
833 if (MPI_Gather(&lb[0], getNumVars(), MPI_CHAR, &globalvarinfo[0], getNumVars(),
834 MPI_CHAR, 0, getCorrMPI()->comm)!=MPI_SUCCESS)
835 {
836 for (size_t i=0;i<globalvarinfo.size();++i)
837 {
838 globalvarinfo[i]=rs::ERROR;
839 }
840 }
841 }
842 // now share the combined info with all processes
843 if ((MPI_Bcast(&globalvarinfo[0], globalvarinfo.size(), MPI_CHAR, 0, everyone->comm)!=MPI_SUCCESS)
844 || (globalvarinfo[0]==rs::ERROR))
845 {
846 err="Error while gathering variable use information.";
847 return false;
848 }
849 // now we convert that info into a form which is easier to read
850 int p=0;
851 for (str2reduce::iterator it=reducemap.begin();it!=reducemap.end();++it,++p)
852 {
853 globalvarcounts[it->first][rs::NONE]=0;
854 globalvarcounts[it->first][rs::INTERESTED]=0;
855 globalvarcounts[it->first][rs::OLD]=0;
856 globalvarcounts[it->first][rs::OLDINTERESTED]=0;
857 globalvarcounts[it->first][rs::NEW]=0;
858 for (int j=p;j<globalvarinfo.size();j+=getNumVars())
859 {
860 if (globalvarinfo[j]<=rs::NEW)
861 {
862 globalvarcounts[it->first][globalvarinfo[j]]++;
863 }
864 }
865 }
866
867 #endif
868 if (!manualimports)
869 {
870 // import all known variables _BUT_ don't import something if noone has a value
871 // for it
872 int vnum=0;
873 for (str2char::iterator it=varstate.begin();it!=varstate.end();++it, ++vnum)
874 {
875 #ifdef ESYS_MPI
876 // if at least one world has a value for a variable
877 if (globalvarcounts[it->first][rs::OLDINTERESTED]
878 + globalvarcounts[it->first][rs::OLD]
879 + globalvarcounts[it->first][rs::NEW] > 0 )
880 {
881 #endif
882
883 if (it->second==rs::NONE)
884 {
885 it->second=rs::INTERESTED;
886 }
887 else if (it->second==rs::OLD)
888 {
889 it->second=rs::OLDINTERESTED;
890 }
891 #ifdef ESYS_MPI
892 // now we need to update the globalvarinfo to record all the extra interest
893 for (int j=vnum;j<globalvarinfo.size();j+=getNumVars())
894 {
895 if (globalvarinfo[j]==rs::NONE)
896 {
897 globalvarinfo[j]=rs::INTERESTED;
898 globalvarcounts[it->first][rs::NONE]--;
899 globalvarcounts[it->first][rs::INTERESTED]++;
900 }
901 else if (globalvarinfo[j]==rs::OLD)
902 {
903 globalvarinfo[j]=rs::OLDINTERESTED;
904 globalvarcounts[it->first][rs::OLD]--;
905 globalvarcounts[it->first][rs::OLDINTERESTED]++;
906 }
907
908 }
909 }
910 #endif
911 }
912 }
913 #ifdef ESYS_MPI
914 globalinfoinvalid=false;
915 #endif
916
917 return true;
918 }
919
920 // if 4, a Job performed an invalid export
921 // if 3, a Job threw an exception
922 // if 2, a Job did not return a bool
923 // if 1, at least one Job returned False
924 // if 0, all jobs in this world returned True
925 char SubWorld::runJobs(std::string& errormsg)
926 {
927 errormsg.clear();
928 int ret=0;
929 try
930 {
931 for (size_t i=0;i<jobvec.size();++i)
932 {
933 boost::python::object result=jobvec[i].attr("work")();
934 boost::python::extract<bool> ex(result);
935 if (!ex.check() || (result.is_none()))
936 {
937 return 2;
938 }
939 // check to see if we need to keep running
940 if (!ex())
941 {
942 ret=1;
943 }
944
945 }
946 }
947 catch (boost::python::error_already_set e)
948 {
949 getStringFromPyException(e, errormsg);
950 return 3;
951 }
952 return ret;
953 }
954
955 size_t SubWorld::getNumVars()
956 {
957 return reducemap.size();
958 }
959
960 // if manual import is false, add this new variable to all the Jobs in this world
961 void SubWorld::addVariable(std::string& name, Reducer_ptr& rp)
962 {
963 if (reducemap.find(name)!=reducemap.end())
964 {
965 std::ostringstream oss;
966 throw SplitWorldException(oss.str());
967 }
968 if (domain.get()==0)
969 {
970 throw SplitWorldException("No domain has been set yet.");
971 }
972 rp->setDomain(domain);
973 reducemap[name]=rp;
974 varstate[name]=reducerstatus::NONE;
975 if (!manualimports)
976 {
977 for (size_t i=0;i<jobvec.size();++i)
978 {
979 jobvec[i].attr("declareImport")(name);
980 }
981 }
982 #ifdef ESYS_MPI
983 globalinfoinvalid=true; // since we need to regenerate globalvarinfo
984 #endif
985 }
986
987 void SubWorld::removeVariable(std::string& s)
988 {
989 reducemap.erase(s);
990 varstate.erase(s);
991 #ifdef ESYS_MPI
992 globalinfoinvalid=true;
993 globalvarinfo.resize(0);
994 globalvarcounts.erase(s);
995 #endif
996 }
997
998 void SubWorld::clearVariable(std::string& name)
999 {
1000 str2reduce::iterator it=reducemap.find(name);
1001 if (it==reducemap.end())
1002 {
1003 return;
1004 }
1005 it->second->reset();
1006 // if we got here, we must have a valid name so we can change state directly
1007 setAllVarsState(name, rs::NONE);
1008 }
1009
1010 void SubWorld::resetInterest()
1011 {
1012 for (str2char::iterator it=varstate.begin();it!=varstate.end();++it)
1013 {
1014 if (it->second==rs::INTERESTED)
1015 {
1016 it->second=rs::NONE;
1017 }
1018 else if (it->second==rs::OLDINTERESTED)
1019 {
1020 it->second=rs::OLD;
1021 }
1022 }
1023 }
1024
1025 void SubWorld::newRunJobs()
1026 {
1027 for (str2reduce::iterator it=reducemap.begin();it!=reducemap.end();++it)
1028 {
1029 it->second->newRunJobs();
1030 }
1031 }
1032
1033 std::list<std::pair<std::string, bool> > SubWorld::getVarList()
1034 {
1035 std::list<std::pair<std::string,bool> > res;
1036 for (std::map<std::string, Reducer_ptr>::iterator it=reducemap.begin();it!=reducemap.end();++it)
1037 {
1038 res.push_back(std::pair<std::string, bool>(it->first, it->second->hasValue()));
1039 }
1040 return res;
1041 }
1042
1043 void SubWorld::copyVariable(const std::string& src, const std::string& dest)
1044 {
1045 if (reducemap.find(src)==reducemap.end())
1046 {
1047 throw SplitWorldException("Source variable name is not known");
1048 }
1049 if (reducemap.find(dest)==reducemap.end())
1050 {
1051 throw SplitWorldException("Destination variable name is not known");
1052 }
1053 Reducer_ptr sptr=reducemap[src];
1054 Reducer_ptr dptr=reducemap[dest];
1055 dptr->copyValueFrom(sptr);
1056 }
1057
1058
1059
1060
1061

  ViewVC Help
Powered by ViewVC 1.1.26