/[escript]/trunk/ripley/src/Rectangle.cpp
ViewVC logotype

Diff of /trunk/ripley/src/Rectangle.cpp

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

branches/ripleygmg_from_3668/ripley/src/Rectangle.cpp revision 3691 by caltinay, Wed Nov 23 23:07:37 2011 UTC trunk/ripley/src/Rectangle.cpp revision 4705 by jfenwick, Fri Feb 21 02:36:15 2014 UTC
# Line 1  Line 1 
1    
2  /*******************************************************  /*****************************************************************************
3  *  *
4  * Copyright (c) 2003-2011 by University of Queensland  * Copyright (c) 2003-2014 by University of Queensland
5  * Earth Systems Science Computational Center (ESSCC)  * http://www.uq.edu.au
 * http://www.uq.edu.au/esscc  
6  *  *
7  * Primary Business: Queensland, Australia  * Primary Business: Queensland, Australia
8  * Licensed under the Open Software License version 3.0  * Licensed under the Open Software License version 3.0
9  * http://www.opensource.org/licenses/osl-3.0.php  * http://www.opensource.org/licenses/osl-3.0.php
10  *  *
11  *******************************************************/  * Development until 2012 by Earth Systems Science Computational Center (ESSCC)
12    * Development 2012-2013 by School of Earth Sciences
13    * Development from 2014 by Centre for Geoscience Computing (GeoComp)
14    *
15    *****************************************************************************/
16    
17  #include <ripley/Rectangle.h>  #include <ripley/Rectangle.h>
18  extern "C" {  #include <paso/SystemMatrix.h>
19  #include "paso/SystemMatrixPattern.h"  #include <esysUtils/esysFileWriter.h>
20  }  #include <ripley/DefaultAssembler2D.h>
21    #include <ripley/WaveAssembler2D.h>
22    #include <boost/scoped_array.hpp>
23    #include "esysUtils/EsysRandom.h"
24    #include "blocktools.h"
25    
26    #ifdef USE_NETCDF
27    #include <netcdfcpp.h>
28    #endif
29    
30  #if USE_SILO  #if USE_SILO
31  #include <silo.h>  #include <silo.h>
# Line 26  extern "C" { Line 37  extern "C" {
37  #include <iomanip>  #include <iomanip>
38    
39  using namespace std;  using namespace std;
40    using esysUtils::FileWriter;
41    
42  namespace ripley {  namespace ripley {
43    
44  Rectangle::Rectangle(int n0, int n1, double l0, double l1, int d0, int d1) :  Rectangle::Rectangle(int n0, int n1, double x0, double y0, double x1,
45      RipleyDomain(2),                       double y1, int d0, int d1,
46      m_gNE0(n0),                       const std::vector<double>& points,
47      m_gNE1(n1),                       const std::vector<int>& tags,
48      m_l0(l0),                       const simap_t& tagnamestonums) :
49      m_l1(l1),      RipleyDomain(2)
     m_NX(d0),  
     m_NY(d1)  
50  {  {
51        // ignore subdivision parameters for serial run
52        if (m_mpiInfo->size == 1) {
53            d0=1;
54            d1=1;
55        }
56    
57        bool warn=false;
58        // if number of subdivisions is non-positive, try to subdivide by the same
59        // ratio as the number of elements
60        if (d0<=0 && d1<=0) {
61            warn=true;
62            d0=max(1, (int)(sqrt(m_mpiInfo->size*(n0+1)/(float)(n1+1))));
63            d1=m_mpiInfo->size/d0;
64            if (d0*d1 != m_mpiInfo->size) {
65                // ratios not the same so subdivide side with more elements only
66                if (n0>n1) {
67                    d0=0;
68                    d1=1;
69                } else {
70                    d0=1;
71                    d1=0;
72                }
73            }
74        }
75        if (d0<=0) {
76            // d1 is preset, determine d0 - throw further down if result is no good
77            d0=m_mpiInfo->size/d1;
78        } else if (d1<=0) {
79            // d0 is preset, determine d1 - throw further down if result is no good
80            d1=m_mpiInfo->size/d0;
81        }
82    
83      // ensure number of subdivisions is valid and nodes can be distributed      // ensure number of subdivisions is valid and nodes can be distributed
84      // among number of ranks      // among number of ranks
85      if (m_NX*m_NY != m_mpiInfo->size)      if (d0*d1 != m_mpiInfo->size)
86          throw RipleyException("Invalid number of spatial subdivisions");          throw RipleyException("Invalid number of spatial subdivisions");
87    
88      if (n0%m_NX > 0 || n1%m_NY > 0)      if (warn) {
89          throw RipleyException("Number of elements must be separable into number of ranks in each dimension");          cout << "Warning: Automatic domain subdivision (d0=" << d0 << ", d1="
90                << d1 << "). This may not be optimal!" << endl;
91        }
92    
93        double l0 = x1-x0;
94        double l1 = y1-y0;
95        m_dx[0] = l0/n0;
96        m_dx[1] = l1/n1;
97    
98        if ((n0+1)%d0 > 0) {
99            n0=(int)round((float)(n0+1)/d0+0.5)*d0-1;
100            l0=m_dx[0]*n0;
101            cout << "Warning: Adjusted number of elements and length. N0="
102                << n0 << ", l0=" << l0 << endl;
103        }
104        if ((n1+1)%d1 > 0) {
105            n1=(int)round((float)(n1+1)/d1+0.5)*d1-1;
106            l1=m_dx[1]*n1;
107            cout << "Warning: Adjusted number of elements and length. N1="
108                << n1 << ", l1=" << l1 << endl;
109        }
110    
111        if ((d0 > 1 && (n0+1)/d0<2) || (d1 > 1 && (n1+1)/d1<2))
112            throw RipleyException("Too few elements for the number of ranks");
113    
114        m_gNE[0] = n0;
115        m_gNE[1] = n1;
116        m_origin[0] = x0;
117        m_origin[1] = y0;
118        m_length[0] = l0;
119        m_length[1] = l1;
120        m_NX[0] = d0;
121        m_NX[1] = d1;
122    
123        // local number of elements (with and without overlap)
124        m_NE[0] = m_ownNE[0] = (d0>1 ? (n0+1)/d0 : n0);
125        if (m_mpiInfo->rank%d0>0 && m_mpiInfo->rank%d0<d0-1)
126            m_NE[0]++;
127        else if (d0>1 && m_mpiInfo->rank%d0==d0-1)
128            m_ownNE[0]--;
129    
130        m_NE[1] = m_ownNE[1] = (d1>1 ? (n1+1)/d1 : n1);
131        if (m_mpiInfo->rank/d0>0 && m_mpiInfo->rank/d0<d1-1)
132            m_NE[1]++;
133        else if (d1>1 && m_mpiInfo->rank/d0==d1-1)
134            m_ownNE[1]--;
135    
136        // local number of nodes
137        m_NN[0] = m_NE[0]+1;
138        m_NN[1] = m_NE[1]+1;
139    
     // local number of elements  
     m_NE0 = n0/m_NX;  
     m_NE1 = n1/m_NY;  
     // local number of nodes (not necessarily owned)  
     m_N0 = m_NE0+1;  
     m_N1 = m_NE1+1;  
140      // bottom-left node is at (offset0,offset1) in global mesh      // bottom-left node is at (offset0,offset1) in global mesh
141      m_offset0 = m_NE0*(m_mpiInfo->rank%m_NX);      m_offset[0] = (n0+1)/d0*(m_mpiInfo->rank%d0);
142      m_offset1 = m_NE1*(m_mpiInfo->rank/m_NX);      if (m_offset[0] > 0)
143            m_offset[0]--;
144        m_offset[1] = (n1+1)/d1*(m_mpiInfo->rank/d0);
145        if (m_offset[1] > 0)
146            m_offset[1]--;
147    
148      populateSampleIds();      populateSampleIds();
149        createPattern();
150        assembler = new DefaultAssembler2D(this, m_dx, m_NX, m_NE, m_NN);
151        for (map<string, int>::const_iterator i = tagnamestonums.begin();
152                i != tagnamestonums.end(); i++) {
153            setTagMap(i->first, i->second);
154        }
155        addPoints(tags.size(), &points[0], &tags[0]);
156  }  }
157    
158  Rectangle::~Rectangle()  Rectangle::~Rectangle()
159  {  {
160        Paso_SystemMatrixPattern_free(m_pattern);
161        Paso_Connector_free(m_connector);
162        delete assembler;
163  }  }
164    
165  string Rectangle::getDescription() const  string Rectangle::getDescription() const
# Line 69  string Rectangle::getDescription() const Line 169  string Rectangle::getDescription() const
169    
170  bool Rectangle::operator==(const AbstractDomain& other) const  bool Rectangle::operator==(const AbstractDomain& other) const
171  {  {
172      if (dynamic_cast<const Rectangle*>(&other))      const Rectangle* o=dynamic_cast<const Rectangle*>(&other);
173          return this==&other;      if (o) {
174            return (RipleyDomain::operator==(other) &&
175                    m_gNE[0]==o->m_gNE[0] && m_gNE[1]==o->m_gNE[1]
176                    && m_origin[0]==o->m_origin[0] && m_origin[1]==o->m_origin[1]
177                    && m_length[0]==o->m_length[0] && m_length[1]==o->m_length[1]
178                    && m_NX[0]==o->m_NX[0] && m_NX[1]==o->m_NX[1]);
179        }
180    
181      return false;      return false;
182  }  }
183    
184    void Rectangle::readNcGrid(escript::Data& out, string filename, string varname,
185                const ReaderParameters& params) const
186    {
187    #ifdef USE_NETCDF
188        // check destination function space
189        int myN0, myN1;
190        if (out.getFunctionSpace().getTypeCode() == Nodes) {
191            myN0 = m_NN[0];
192            myN1 = m_NN[1];
193        } else if (out.getFunctionSpace().getTypeCode() == Elements ||
194                    out.getFunctionSpace().getTypeCode() == ReducedElements) {
195            myN0 = m_NE[0];
196            myN1 = m_NE[1];
197        } else
198            throw RipleyException("readNcGrid(): invalid function space for output data object");
199    
200        if (params.first.size() != 2)
201            throw RipleyException("readNcGrid(): argument 'first' must have 2 entries");
202    
203        if (params.numValues.size() != 2)
204            throw RipleyException("readNcGrid(): argument 'numValues' must have 2 entries");
205    
206        if (params.multiplier.size() != 2)
207            throw RipleyException("readNcGrid(): argument 'multiplier' must have 2 entries");
208        for (size_t i=0; i<params.multiplier.size(); i++)
209            if (params.multiplier[i]<1)
210                throw RipleyException("readNcGrid(): all multipliers must be positive");
211        if (params.reverse.size() != 2)
212            throw RipleyException("readNcGrid(): argument 'reverse' must have 2 entries");
213    
214        // check file existence and size
215        NcFile f(filename.c_str(), NcFile::ReadOnly);
216        if (!f.is_valid())
217            throw RipleyException("readNcGrid(): cannot open file");
218    
219        NcVar* var = f.get_var(varname.c_str());
220        if (!var)
221            throw RipleyException("readNcGrid(): invalid variable");
222    
223        // TODO: rank>0 data support
224        const int numComp = out.getDataPointSize();
225        if (numComp > 1)
226            throw RipleyException("readNcGrid(): only scalar data supported");
227    
228        const int dims = var->num_dims();
229        boost::scoped_array<long> edges(var->edges());
230    
231        // is this a slice of the data object (dims!=2)?
232        // note the expected ordering of edges (as in numpy: y,x)
233        if ( (dims==2 && (params.numValues[1] > edges[0] || params.numValues[0] > edges[1]))
234                || (dims==1 && params.numValues[1]>1) ) {
235            throw RipleyException("readNcGrid(): not enough data in file");
236        }
237    
238        // check if this rank contributes anything
239        if (params.first[0] >= m_offset[0]+myN0 ||
240                params.first[0]+params.numValues[0]*params.multiplier[0] <= m_offset[0] ||
241                params.first[1] >= m_offset[1]+myN1 ||
242                params.first[1]+params.numValues[1]*params.multiplier[1] <= m_offset[1])
243            return;
244    
245        // now determine how much this rank has to write
246    
247        // first coordinates in data object to write to
248        const int first0 = max(0, params.first[0]-m_offset[0]);
249        const int first1 = max(0, params.first[1]-m_offset[1]);
250        // indices to first value in file (not accounting for reverse yet)
251        int idx0 = max(0, m_offset[0]-params.first[0]);
252        int idx1 = max(0, m_offset[1]-params.first[1]);
253        // number of values to read
254        const int num0 = min(params.numValues[0]-idx0, myN0-first0);
255        const int num1 = min(params.numValues[1]-idx1, myN1-first1);
256    
257        // make sure we read the right block if going backwards through file
258        if (params.reverse[0])
259            idx0 = edges[dims-1]-num0-idx0;
260        if (dims>1 && params.reverse[1])
261            idx1 = edges[dims-2]-num1-idx1;
262    
263        vector<double> values(num0*num1);
264        if (dims==2) {
265            var->set_cur(idx1, idx0);
266            var->get(&values[0], num1, num0);
267        } else {
268            var->set_cur(idx0);
269            var->get(&values[0], num0);
270        }
271    
272        const int dpp = out.getNumDataPointsPerSample();
273        out.requireWrite();
274    
275        // helpers for reversing
276        const int x0 = (params.reverse[0] ? num0-1 : 0);
277        const int x_mult = (params.reverse[0] ? -1 : 1);
278        const int y0 = (params.reverse[1] ? num1-1 : 0);
279        const int y_mult = (params.reverse[1] ? -1 : 1);
280    
281        for (index_t y=0; y<num1; y++) {
282    #pragma omp parallel for
283            for (index_t x=0; x<num0; x++) {
284                const int baseIndex = first0+x*params.multiplier[0]
285                                      +(first1+y*params.multiplier[1])*myN0;
286                const int srcIndex = (y0+y_mult*y)*num0+(x0+x_mult*x);
287                if (!isnan(values[srcIndex])) {
288                    for (index_t m1=0; m1<params.multiplier[1]; m1++) {
289                        for (index_t m0=0; m0<params.multiplier[0]; m0++) {
290                            const int dataIndex = baseIndex+m0+m1*myN0;
291                            double* dest = out.getSampleDataRW(dataIndex);
292                            for (index_t q=0; q<dpp; q++) {
293                                *dest++ = values[srcIndex];
294                            }
295                        }
296                    }
297                }
298            }
299        }
300    #else
301        throw RipleyException("readNcGrid(): not compiled with netCDF support");
302    #endif
303    }
304    
305    void Rectangle::readBinaryGrid(escript::Data& out, string filename,
306                                   const ReaderParameters& params) const
307    {
308        // the mapping is not universally correct but should work on our
309        // supported platforms
310        switch (params.dataType) {
311            case DATATYPE_INT32:
312                readBinaryGridImpl<int>(out, filename, params);
313                break;
314            case DATATYPE_FLOAT32:
315                readBinaryGridImpl<float>(out, filename, params);
316                break;
317            case DATATYPE_FLOAT64:
318                readBinaryGridImpl<double>(out, filename, params);
319                break;
320            default:
321                throw RipleyException("readBinaryGrid(): invalid or unsupported datatype");
322        }
323    }
324    
325    template<typename ValueType>
326    void Rectangle::readBinaryGridImpl(escript::Data& out, const string& filename,
327                                       const ReaderParameters& params) const
328    {
329        // check destination function space
330        int myN0, myN1;
331        if (out.getFunctionSpace().getTypeCode() == Nodes) {
332            myN0 = m_NN[0];
333            myN1 = m_NN[1];
334        } else if (out.getFunctionSpace().getTypeCode() == Elements ||
335                    out.getFunctionSpace().getTypeCode() == ReducedElements) {
336            myN0 = m_NE[0];
337            myN1 = m_NE[1];
338        } else
339            throw RipleyException("readBinaryGrid(): invalid function space for output data object");
340    
341        // check file existence and size
342        ifstream f(filename.c_str(), ifstream::binary);
343        if (f.fail()) {
344            throw RipleyException("readBinaryGrid(): cannot open file");
345        }
346        f.seekg(0, ios::end);
347        const int numComp = out.getDataPointSize();
348        const int filesize = f.tellg();
349        const int reqsize = params.numValues[0]*params.numValues[1]*numComp*sizeof(ValueType);
350        if (filesize < reqsize) {
351            f.close();
352            throw RipleyException("readBinaryGrid(): not enough data in file");
353        }
354    
355        // check if this rank contributes anything
356        if (params.first[0] >= m_offset[0]+myN0 ||
357                params.first[0]+params.numValues[0] <= m_offset[0] ||
358                params.first[1] >= m_offset[1]+myN1 ||
359                params.first[1]+params.numValues[1] <= m_offset[1]) {
360            f.close();
361            return;
362        }
363    
364        // now determine how much this rank has to write
365    
366        // first coordinates in data object to write to
367        const int first0 = max(0, params.first[0]-m_offset[0]);
368        const int first1 = max(0, params.first[1]-m_offset[1]);
369        // indices to first value in file
370        const int idx0 = max(0, m_offset[0]-params.first[0]);
371        const int idx1 = max(0, m_offset[1]-params.first[1]);
372        // number of values to read
373        const int num0 = min(params.numValues[0]-idx0, myN0-first0);
374        const int num1 = min(params.numValues[1]-idx1, myN1-first1);
375    
376        out.requireWrite();
377        vector<ValueType> values(num0*numComp);
378        const int dpp = out.getNumDataPointsPerSample();
379    
380        for (int y=0; y<num1; y++) {
381            const int fileofs = numComp*(idx0+(idx1+y)*params.numValues[0]);
382            f.seekg(fileofs*sizeof(ValueType));
383            f.read((char*)&values[0], num0*numComp*sizeof(ValueType));
384            for (int x=0; x<num0; x++) {
385                const int baseIndex = first0+x*params.multiplier[0]
386                                        +(first1+y*params.multiplier[1])*myN0;
387                for (int m1=0; m1<params.multiplier[1]; m1++) {
388                    for (int m0=0; m0<params.multiplier[0]; m0++) {
389                        const int dataIndex = baseIndex+m0+m1*myN0;
390                        double* dest = out.getSampleDataRW(dataIndex);
391                        for (int c=0; c<numComp; c++) {
392                            ValueType val = values[x*numComp+c];
393    
394                            if (params.byteOrder != BYTEORDER_NATIVE) {
395                                char* cval = reinterpret_cast<char*>(&val);
396                                // this will alter val!!
397                                byte_swap32(cval);
398                            }
399                            if (!std::isnan(val)) {
400                                for (int q=0; q<dpp; q++) {
401                                    *dest++ = static_cast<double>(val);
402                                }
403                            }
404                        }
405                    }
406                }
407            }
408        }
409    
410        f.close();
411    }
412    
413    void Rectangle::writeBinaryGrid(const escript::Data& in, string filename,
414                                    int byteOrder, int dataType) const
415    {
416        // the mapping is not universally correct but should work on our
417        // supported platforms
418        switch (dataType) {
419            case DATATYPE_INT32:
420                writeBinaryGridImpl<int>(in, filename, byteOrder);
421                break;
422            case DATATYPE_FLOAT32:
423                writeBinaryGridImpl<float>(in, filename, byteOrder);
424                break;
425            case DATATYPE_FLOAT64:
426                writeBinaryGridImpl<double>(in, filename, byteOrder);
427                break;
428            default:
429                throw RipleyException("writeBinaryGrid(): invalid or unsupported datatype");
430        }
431    }
432    
433    template<typename ValueType>
434    void Rectangle::writeBinaryGridImpl(const escript::Data& in,
435                                        const string& filename, int byteOrder) const
436    {
437        // check function space and determine number of points
438        int myN0, myN1;
439        int totalN0, totalN1;
440        if (in.getFunctionSpace().getTypeCode() == Nodes) {
441            myN0 = m_NN[0];
442            myN1 = m_NN[1];
443            totalN0 = m_gNE[0]+1;
444            totalN1 = m_gNE[1]+1;
445        } else if (in.getFunctionSpace().getTypeCode() == Elements ||
446                    in.getFunctionSpace().getTypeCode() == ReducedElements) {
447            myN0 = m_NE[0];
448            myN1 = m_NE[1];
449            totalN0 = m_gNE[0];
450            totalN1 = m_gNE[1];
451        } else
452            throw RipleyException("writeBinaryGrid(): invalid function space of data object");
453    
454        const int numComp = in.getDataPointSize();
455        const int dpp = in.getNumDataPointsPerSample();
456    
457        if (numComp > 1 || dpp > 1)
458            throw RipleyException("writeBinaryGrid(): only scalar, single-value data supported");
459    
460        const int fileSize = sizeof(ValueType)*numComp*dpp*totalN0*totalN1;
461    
462        // from here on we know that each sample consists of one value
463        FileWriter fw;
464        fw.openFile(filename, fileSize);
465        MPIBarrier();
466    
467        for (index_t y=0; y<myN1; y++) {
468            const int fileofs = (m_offset[0]+(m_offset[1]+y)*totalN0)*sizeof(ValueType);
469            ostringstream oss;
470    
471            for (index_t x=0; x<myN0; x++) {
472                const double* sample = in.getSampleDataRO(y*myN0+x);
473                ValueType fvalue = static_cast<ValueType>(*sample);
474                if (byteOrder == BYTEORDER_NATIVE) {
475                    oss.write((char*)&fvalue, sizeof(fvalue));
476                } else {
477                    char* value = reinterpret_cast<char*>(&fvalue);
478                    oss.write(byte_swap32(value), sizeof(fvalue));
479                }
480            }
481            fw.writeAt(oss, fileofs);
482        }
483        fw.close();
484    }
485    
486  void Rectangle::dump(const string& fileName) const  void Rectangle::dump(const string& fileName) const
487  {  {
488  #if USE_SILO  #if USE_SILO
# Line 83  void Rectangle::dump(const string& fileN Line 491  void Rectangle::dump(const string& fileN
491          fn+=".silo";          fn+=".silo";
492      }      }
493    
     const int NUM_SILO_FILES = 1;  
     const char* blockDirFmt = "/block%04d";  
494      int driver=DB_HDF5;          int driver=DB_HDF5;    
     string siloPath;  
495      DBfile* dbfile = NULL;      DBfile* dbfile = NULL;
496        const char* blockDirFmt = "/block%04d";
497    
498  #ifdef ESYS_MPI  #ifdef ESYS_MPI
499      PMPIO_baton_t* baton = NULL;      PMPIO_baton_t* baton = NULL;
500        const int NUM_SILO_FILES = 1;
501  #endif  #endif
502    
503      if (m_mpiInfo->size > 1) {      if (m_mpiInfo->size > 1) {
# Line 106  void Rectangle::dump(const string& fileN Line 513  void Rectangle::dump(const string& fileN
513                          PMPIO_DefaultClose, (void*)&driver);                          PMPIO_DefaultClose, (void*)&driver);
514          }          }
515          if (baton) {          if (baton) {
516              char str[64];              char siloPath[64];
517              snprintf(str, 64, blockDirFmt, PMPIO_RankInGroup(baton, m_mpiInfo->rank));              snprintf(siloPath, 64, blockDirFmt, PMPIO_RankInGroup(baton, m_mpiInfo->rank));
518              siloPath = str;              dbfile = (DBfile*) PMPIO_WaitForBaton(baton, fn.c_str(), siloPath);
             dbfile = (DBfile*) PMPIO_WaitForBaton(baton, fn.c_str(), siloPath.c_str());  
519          }          }
520  #endif  #endif
521      } else {      } else {
# Line 121  void Rectangle::dump(const string& fileN Line 527  void Rectangle::dump(const string& fileN
527              dbfile = DBCreate(fn.c_str(), DB_CLOBBER, DB_LOCAL,              dbfile = DBCreate(fn.c_str(), DB_CLOBBER, DB_LOCAL,
528                      getDescription().c_str(), driver);                      getDescription().c_str(), driver);
529          }          }
530            char siloPath[64];
531            snprintf(siloPath, 64, blockDirFmt, 0);
532            DBMkDir(dbfile, siloPath);
533            DBSetDir(dbfile, siloPath);
534      }      }
535    
536      if (!dbfile)      if (!dbfile)
# Line 135  void Rectangle::dump(const string& fileN Line 545  void Rectangle::dump(const string& fileN
545      }      }
546      */      */
547    
548      boost::scoped_ptr<double> x(new double[m_N0]);      boost::scoped_ptr<double> x(new double[m_NN[0]]);
549      boost::scoped_ptr<double> y(new double[m_N1]);      boost::scoped_ptr<double> y(new double[m_NN[1]]);
550      double* coords[2] = { x.get(), y.get() };      double* coords[2] = { x.get(), y.get() };
551  #pragma omp parallel  #pragma omp parallel
552      {      {
553  #pragma omp for  #pragma omp for nowait
554          for (dim_t i0 = 0; i0 < m_N0; i0++) {          for (dim_t i0 = 0; i0 < m_NN[0]; i0++) {
555              coords[0][i0]=(m_l0*(i0+m_offset0))/m_gNE0;              coords[0][i0]=getLocalCoordinate(i0, 0);
556          }          }
557  #pragma omp for  #pragma omp for nowait
558          for (dim_t i1 = 0; i1 < m_N1; i1++) {          for (dim_t i1 = 0; i1 < m_NN[1]; i1++) {
559              coords[1][i1]=(m_l1*(i1+m_offset1))/m_gNE1;              coords[1][i1]=getLocalCoordinate(i1, 1);
560          }          }
561      }      }
562      int dims[] = { m_N0, m_N1 };      int* dims = const_cast<int*>(getNumNodesPerDim());
563    
564        // write mesh
565      DBPutQuadmesh(dbfile, "mesh", NULL, coords, dims, 2, DB_DOUBLE,      DBPutQuadmesh(dbfile, "mesh", NULL, coords, dims, 2, DB_DOUBLE,
566              DB_COLLINEAR, NULL);              DB_COLLINEAR, NULL);
567    
568        // write node ids
569        DBPutQuadvar1(dbfile, "nodeId", "mesh", (void*)&m_nodeId[0], dims, 2,
570                NULL, 0, DB_INT, DB_NODECENT, NULL);
571    
572        // write element ids
573        dims = const_cast<int*>(getNumElementsPerDim());
574        DBPutQuadvar1(dbfile, "elementId", "mesh", (void*)&m_elementId[0],
575                dims, 2, NULL, 0, DB_INT, DB_ZONECENT, NULL);
576    
577        // rank 0 writes multimesh and multivar
578      if (m_mpiInfo->rank == 0) {      if (m_mpiInfo->rank == 0) {
579          vector<string> tempstrings;          vector<string> tempstrings;
580          vector<char*> meshnames;          vector<char*> names;
581          for (dim_t i=0; i<m_mpiInfo->size; i++) {          for (dim_t i=0; i<m_mpiInfo->size; i++) {
582              stringstream path;              stringstream path;
583              path << "/block" << setw(4) << setfill('0') << right << i << "/mesh";              path << "/block" << setw(4) << setfill('0') << right << i << "/mesh";
584              tempstrings.push_back(path.str());              tempstrings.push_back(path.str());
585              meshnames.push_back((char*)tempstrings.back().c_str());              names.push_back((char*)tempstrings.back().c_str());
586          }          }
587          vector<int> meshtypes(m_mpiInfo->size, DB_QUAD_RECT);          vector<int> types(m_mpiInfo->size, DB_QUAD_RECT);
588          DBSetDir(dbfile, "/");          DBSetDir(dbfile, "/");
589          DBPutMultimesh(dbfile, "multimesh", m_mpiInfo->size, &meshnames[0],          DBPutMultimesh(dbfile, "multimesh", m_mpiInfo->size, &names[0],
590                 &meshtypes[0], NULL);                 &types[0], NULL);
591            tempstrings.clear();
592            names.clear();
593            for (dim_t i=0; i<m_mpiInfo->size; i++) {
594                stringstream path;
595                path << "/block" << setw(4) << setfill('0') << right << i << "/nodeId";
596                tempstrings.push_back(path.str());
597                names.push_back((char*)tempstrings.back().c_str());
598            }
599            types.assign(m_mpiInfo->size, DB_QUADVAR);
600            DBPutMultivar(dbfile, "nodeId", m_mpiInfo->size, &names[0],
601                   &types[0], NULL);
602            tempstrings.clear();
603            names.clear();
604            for (dim_t i=0; i<m_mpiInfo->size; i++) {
605                stringstream path;
606                path << "/block" << setw(4) << setfill('0') << right << i << "/elementId";
607                tempstrings.push_back(path.str());
608                names.push_back((char*)tempstrings.back().c_str());
609            }
610            DBPutMultivar(dbfile, "elementId", m_mpiInfo->size, &names[0],
611                   &types[0], NULL);
612      }      }
613    
614      if (m_mpiInfo->size > 1) {      if (m_mpiInfo->size > 1) {
# Line 178  void Rectangle::dump(const string& fileN Line 621  void Rectangle::dump(const string& fileN
621      }      }
622    
623  #else // USE_SILO  #else // USE_SILO
624      throw RipleyException("dump(): no Silo support");      throw RipleyException("dump: no Silo support");
625  #endif  #endif
626  }  }
627    
628  const int* Rectangle::borrowSampleReferenceIDs(int functionSpaceType) const  const int* Rectangle::borrowSampleReferenceIDs(int fsType) const
629  {  {
630      switch (functionSpaceType) {      switch (fsType) {
631          case Nodes:          case Nodes:
632            case ReducedNodes: // FIXME: reduced
633              return &m_nodeId[0];              return &m_nodeId[0];
634            case DegreesOfFreedom:
635            case ReducedDegreesOfFreedom: // FIXME: reduced
636                return &m_dofId[0];
637          case Elements:          case Elements:
638            case ReducedElements:
639              return &m_elementId[0];              return &m_elementId[0];
640          case FaceElements:          case FaceElements:
641            case ReducedFaceElements:
642              return &m_faceId[0];              return &m_faceId[0];
643            case Points:
644                return &m_diracPointNodeIDs[0];
645          default:          default:
646              break;              break;
647      }      }
648    
649      stringstream msg;      stringstream msg;
650      msg << "borrowSampleReferenceIDs() not implemented for function space type "      msg << "borrowSampleReferenceIDs: invalid function space type " << fsType;
         << functionSpaceType;  
651      throw RipleyException(msg.str());      throw RipleyException(msg.str());
652  }  }
653    
654  bool Rectangle::ownSample(int fsCode, index_t id) const  bool Rectangle::ownSample(int fsType, index_t id) const
655  {  {
656  #ifdef ESYS_MPI      if (getMPISize()==1)
657      if (fsCode == Nodes) {          return true;
658          const index_t myFirst=getNumNodes()*m_mpiInfo->rank;  
659          const index_t myLast=getNumNodes()*(m_mpiInfo->rank+1)-1;      switch (fsType) {
660          return (m_nodeId[id]>=myFirst && m_nodeId[id]<=myLast);          case Nodes:
661      } else          case ReducedNodes: // FIXME: reduced
662          throw RipleyException("ownSample() only implemented for Nodes");              return (m_dofMap[id] < getNumDOF());
663  #else          case DegreesOfFreedom:
664      return true;          case ReducedDegreesOfFreedom:
665  #endif              return true;
666            case Elements:
667            case ReducedElements:
668                // check ownership of element's bottom left node
669                return (m_dofMap[id%m_NE[0]+m_NN[0]*(id/m_NE[0])] < getNumDOF());
670            case FaceElements:
671            case ReducedFaceElements:
672                {
673                    // determine which face the sample belongs to before
674                    // checking ownership of corresponding element's first node
675                    dim_t n=0;
676                    for (size_t i=0; i<4; i++) {
677                        n+=m_faceCount[i];
678                        if (id<n) {
679                            index_t k;
680                            if (i==1)
681                                k=m_NN[0]-2;
682                            else if (i==3)
683                                k=m_NN[0]*(m_NN[1]-2);
684                            else
685                                k=0;
686                            // determine whether to move right or up
687                            const index_t delta=(i/2==0 ? m_NN[0] : 1);
688                            return (m_dofMap[k+(id-n+m_faceCount[i])*delta] < getNumDOF());
689                        }
690                    }
691                    return false;
692                }
693            default:
694                break;
695        }
696    
697        stringstream msg;
698        msg << "ownSample: invalid function space type " << fsType;
699        throw RipleyException(msg.str());
700  }  }
701    
702  void Rectangle::interpolateOnDomain(escript::Data& target,  void Rectangle::setToNormal(escript::Data& out) const
                                     const escript::Data& in) const  
703  {  {
704      const Rectangle& inDomain=dynamic_cast<const Rectangle&>(*(in.getFunctionSpace().getDomain()));      if (out.getFunctionSpace().getTypeCode() == FaceElements) {
705      const Rectangle& targetDomain=dynamic_cast<const Rectangle&>(*(target.getFunctionSpace().getDomain()));          out.requireWrite();
706      if (inDomain != *this)  #pragma omp parallel
707          throw RipleyException("Illegal domain of interpolant");          {
708      if (targetDomain != *this)              if (m_faceOffset[0] > -1) {
709          throw RipleyException("Illegal domain of interpolation target");  #pragma omp for nowait
710                    for (index_t k1 = 0; k1 < m_NE[1]; ++k1) {
711      throw RipleyException("interpolateOnDomain() not implemented");                      double* o = out.getSampleDataRW(m_faceOffset[0]+k1);
712  }                      // set vector at two quadrature points
713                        *o++ = -1.;
714  Paso_SystemMatrixPattern* Rectangle::getPattern(bool reducedRowOrder,                      *o++ = 0.;
715                                                  bool reducedColOrder) const                      *o++ = -1.;
716  {                      *o = 0.;
717      if (reducedRowOrder || reducedColOrder)                  }
718          throw RipleyException("getPattern() not implemented for reduced order");              }
719    
720  /*              if (m_faceOffset[1] > -1) {
721      // create distribution  #pragma omp for nowait
722      IndexVector dist;                  for (index_t k1 = 0; k1 < m_NE[1]; ++k1) {
723      for (index_t i=0; i<m_mpiInfo->size+1; i++)                      double* o = out.getSampleDataRW(m_faceOffset[1]+k1);
724          dist.push_back(i*getNumNodes());                      // set vector at two quadrature points
725      Paso_Distribution* distribution = Paso_Distribution_alloc(m_mpiInfo,                      *o++ = 1.;
726              &dist[0], 1, 0);                      *o++ = 0.;
727                        *o++ = 1.;
728                        *o = 0.;
729                    }
730                }
731    
732                if (m_faceOffset[2] > -1) {
733    #pragma omp for nowait
734                    for (index_t k0 = 0; k0 < m_NE[0]; ++k0) {
735                        double* o = out.getSampleDataRW(m_faceOffset[2]+k0);
736                        // set vector at two quadrature points
737                        *o++ = 0.;
738                        *o++ = -1.;
739                        *o++ = 0.;
740                        *o = -1.;
741                    }
742                }
743    
744                if (m_faceOffset[3] > -1) {
745    #pragma omp for nowait
746                    for (index_t k0 = 0; k0 < m_NE[0]; ++k0) {
747                        double* o = out.getSampleDataRW(m_faceOffset[3]+k0);
748                        // set vector at two quadrature points
749                        *o++ = 0.;
750                        *o++ = 1.;
751                        *o++ = 0.;
752                        *o = 1.;
753                    }
754                }
755            } // end of parallel section
756        } else if (out.getFunctionSpace().getTypeCode() == ReducedFaceElements) {
757            out.requireWrite();
758    #pragma omp parallel
759            {
760                if (m_faceOffset[0] > -1) {
761    #pragma omp for nowait
762                    for (index_t k1 = 0; k1 < m_NE[1]; ++k1) {
763                        double* o = out.getSampleDataRW(m_faceOffset[0]+k1);
764                        *o++ = -1.;
765                        *o = 0.;
766                    }
767                }
768    
769                if (m_faceOffset[1] > -1) {
770    #pragma omp for nowait
771                    for (index_t k1 = 0; k1 < m_NE[1]; ++k1) {
772                        double* o = out.getSampleDataRW(m_faceOffset[1]+k1);
773                        *o++ = 1.;
774                        *o = 0.;
775                    }
776                }
777    
778                if (m_faceOffset[2] > -1) {
779    #pragma omp for nowait
780                    for (index_t k0 = 0; k0 < m_NE[0]; ++k0) {
781                        double* o = out.getSampleDataRW(m_faceOffset[2]+k0);
782                        *o++ = 0.;
783                        *o = -1.;
784                    }
785                }
786    
787                if (m_faceOffset[3] > -1) {
788    #pragma omp for nowait
789                    for (index_t k0 = 0; k0 < m_NE[0]; ++k0) {
790                        double* o = out.getSampleDataRW(m_faceOffset[3]+k0);
791                        *o++ = 0.;
792                        *o = 1.;
793                    }
794                }
795            } // end of parallel section
796    
797      // connectors      } else {
798      dim_t numNeighbours = ...;          stringstream msg;
799      RankVector neighbour(numNeighbours);          msg << "setToNormal: invalid function space type "
800      IndexVector offsetInShared(numNeighbours+1);              << out.getFunctionSpace().getTypeCode();
801      IndexVector shared(offsetInShared[numNeighbours]);          throw RipleyException(msg.str());
802        }
803    }
804    
805      Paso_SharedComponents *snd_shcomp = Paso_SharedComponents_alloc(  void Rectangle::setToSize(escript::Data& out) const
806              getNumNodes(), numNeighbours, neighbour, shared, offsetInShared,  {
807              1, 0, m_mpiInfo);      if (out.getFunctionSpace().getTypeCode() == Elements
808      Paso_SharedComponents *rcv_shcomp = Paso_SharedComponents_alloc(              || out.getFunctionSpace().getTypeCode() == ReducedElements) {
809              getNumNodes(), numNeighbours, neighbour, shared, offsetInShared,          out.requireWrite();
810              1, 0, m_mpiInfo);          const dim_t numQuad=out.getNumDataPointsPerSample();
811      Paso_Connector* connector = Paso_Connector_alloc(snd_shcomp, rcv_shcomp);          const double size=sqrt(m_dx[0]*m_dx[0]+m_dx[1]*m_dx[1]);
812    #pragma omp parallel for
813      // create patterns          for (index_t k = 0; k < getNumElements(); ++k) {
814      Paso_Pattern *mainPattern = Paso_Pattern_alloc(MATRIX_FORMAT_DEFAULT,              double* o = out.getSampleDataRW(k);
815              M, N, ptr, index);              fill(o, o+numQuad, size);
816      Paso_Pattern *colCouplePattern = Paso_Pattern_alloc(MATRIX_FORMAT_DEFAULT,          }
817              M, N, ...);      } else if (out.getFunctionSpace().getTypeCode() == FaceElements
818      Paso_Pattern *rowCouplePattern = Paso_Pattern_alloc(MATRIX_FORMAT_DEFAULT,              || out.getFunctionSpace().getTypeCode() == ReducedFaceElements) {
819              ...);          out.requireWrite();
820            const dim_t numQuad=out.getNumDataPointsPerSample();
821      Paso_SystemMatrixPattern* pattern = Paso_SystemMatrixPattern_alloc(  #pragma omp parallel
822              MATRIX_FORMAT_DEFAULT, distribution, distribution,          {
823              mainPattern, colCouplePattern, rowCouplePattern,              if (m_faceOffset[0] > -1) {
824              connector, connector);  #pragma omp for nowait
825      Paso_Pattern_free(mainPattern);                  for (index_t k1 = 0; k1 < m_NE[1]; ++k1) {
826      Paso_Pattern_free(colCouplePattern);                      double* o = out.getSampleDataRW(m_faceOffset[0]+k1);
827      Paso_Pattern_free(rowCouplePattern);                      fill(o, o+numQuad, m_dx[1]);
828      Paso_Distribution_free(distribution);                  }
829      Paso_SharedComponents_free(snd_shcomp);              }
830      Paso_SharedComponents_free(rcv_shcomp);  
831  */              if (m_faceOffset[1] > -1) {
832      throw RipleyException("getPattern() not implemented");  #pragma omp for nowait
833                    for (index_t k1 = 0; k1 < m_NE[1]; ++k1) {
834                        double* o = out.getSampleDataRW(m_faceOffset[1]+k1);
835                        fill(o, o+numQuad, m_dx[1]);
836                    }
837                }
838    
839                if (m_faceOffset[2] > -1) {
840    #pragma omp for nowait
841                    for (index_t k0 = 0; k0 < m_NE[0]; ++k0) {
842                        double* o = out.getSampleDataRW(m_faceOffset[2]+k0);
843                        fill(o, o+numQuad, m_dx[0]);
844                    }
845                }
846    
847                if (m_faceOffset[3] > -1) {
848    #pragma omp for nowait
849                    for (index_t k0 = 0; k0 < m_NE[0]; ++k0) {
850                        double* o = out.getSampleDataRW(m_faceOffset[3]+k0);
851                        fill(o, o+numQuad, m_dx[0]);
852                    }
853                }
854            } // end of parallel section
855    
856        } else {
857            stringstream msg;
858            msg << "setToSize: invalid function space type "
859                << out.getFunctionSpace().getTypeCode();
860            throw RipleyException(msg.str());
861        }
862  }  }
863    
864  void Rectangle::Print_Mesh_Info(const bool full) const  void Rectangle::Print_Mesh_Info(const bool full) const
# Line 287  void Rectangle::Print_Mesh_Info(const bo Line 870  void Rectangle::Print_Mesh_Info(const bo
870          cout.setf(ios::scientific, ios::floatfield);          cout.setf(ios::scientific, ios::floatfield);
871          for (index_t i=0; i < getNumNodes(); i++) {          for (index_t i=0; i < getNumNodes(); i++) {
872              cout << "  " << setw(5) << m_nodeId[i]              cout << "  " << setw(5) << m_nodeId[i]
873                  << "  " << (m_l0*(i%m_N0+m_offset0))/m_gNE0                  << "  " << getLocalCoordinate(i%m_NN[0], 0)
874                  << "  " << (m_l1*(i/m_N0+m_offset1))/m_gNE1 << endl;                  << "  " << getLocalCoordinate(i/m_NN[0], 1) << endl;
875          }          }
876      }      }
877  }  }
878    
 //protected  
 dim_t Rectangle::getNumFaceElements() const  
 {  
     dim_t n=0;  
     //left  
     if (m_offset0==0)  
         n+=m_NE1;  
     //right  
     if (m_mpiInfo->rank%m_NX==m_NX-1)  
         n+=m_NE1;  
     //bottom  
     if (m_offset1==0)  
         n+=m_NE0;  
     //top  
     if (m_mpiInfo->rank/m_NX==m_NY-1)  
         n+=m_NE0;  
   
     return n;  
 }  
879    
880  //protected  //protected
881  void Rectangle::assembleCoordinates(escript::Data& arg) const  void Rectangle::assembleCoordinates(escript::Data& arg) const
# Line 325  void Rectangle::assembleCoordinates(escr Line 889  void Rectangle::assembleCoordinates(escr
889    
890      arg.requireWrite();      arg.requireWrite();
891  #pragma omp parallel for  #pragma omp parallel for
892      for (dim_t i1 = 0; i1 < m_N1; i1++) {      for (dim_t i1 = 0; i1 < m_NN[1]; i1++) {
893          for (dim_t i0 = 0; i0 < m_N0; i0++) {          for (dim_t i0 = 0; i0 < m_NN[0]; i0++) {
894              double* point = arg.getSampleDataRW(i0+m_N0*i1);              double* point = arg.getSampleDataRW(i0+m_NN[0]*i1);
895              point[0] = (m_l0*(i0+m_offset0))/m_gNE0;              point[0] = getLocalCoordinate(i0, 0);
896              point[1] = (m_l1*(i1+m_offset1))/m_gNE1;              point[1] = getLocalCoordinate(i1, 1);
897            }
898        }
899    }
900    
901    //protected
902    void Rectangle::assembleGradient(escript::Data& out, const escript::Data& in) const
903    {
904        const dim_t numComp = in.getDataPointSize();
905        const double cx0 = .21132486540518711775/m_dx[0];
906        const double cx1 = .78867513459481288225/m_dx[0];
907        const double cx2 = 1./m_dx[0];
908        const double cy0 = .21132486540518711775/m_dx[1];
909        const double cy1 = .78867513459481288225/m_dx[1];
910        const double cy2 = 1./m_dx[1];
911    
912        if (out.getFunctionSpace().getTypeCode() == Elements) {
913            out.requireWrite();
914    #pragma omp parallel
915            {
916                vector<double> f_00(numComp);
917                vector<double> f_01(numComp);
918                vector<double> f_10(numComp);
919                vector<double> f_11(numComp);
920    #pragma omp for
921                for (index_t k1=0; k1 < m_NE[1]; ++k1) {
922                    for (index_t k0=0; k0 < m_NE[0]; ++k0) {
923                        memcpy(&f_00[0], in.getSampleDataRO(INDEX2(k0,k1, m_NN[0])), numComp*sizeof(double));
924                        memcpy(&f_01[0], in.getSampleDataRO(INDEX2(k0,k1+1, m_NN[0])), numComp*sizeof(double));
925                        memcpy(&f_10[0], in.getSampleDataRO(INDEX2(k0+1,k1, m_NN[0])), numComp*sizeof(double));
926                        memcpy(&f_11[0], in.getSampleDataRO(INDEX2(k0+1,k1+1, m_NN[0])), numComp*sizeof(double));
927                        double* o = out.getSampleDataRW(INDEX2(k0,k1,m_NE[0]));
928                        for (index_t i=0; i < numComp; ++i) {
929                            o[INDEX3(i,0,0,numComp,2)] = (f_10[i]-f_00[i])*cx1 + (f_11[i]-f_01[i])*cx0;
930                            o[INDEX3(i,1,0,numComp,2)] = (f_01[i]-f_00[i])*cy1 + (f_11[i]-f_10[i])*cy0;
931                            o[INDEX3(i,0,1,numComp,2)] = (f_10[i]-f_00[i])*cx1 + (f_11[i]-f_01[i])*cx0;
932                            o[INDEX3(i,1,1,numComp,2)] = (f_01[i]-f_00[i])*cy0 + (f_11[i]-f_10[i])*cy1;
933                            o[INDEX3(i,0,2,numComp,2)] = (f_10[i]-f_00[i])*cx0 + (f_11[i]-f_01[i])*cx1;
934                            o[INDEX3(i,1,2,numComp,2)] = (f_01[i]-f_00[i])*cy1 + (f_11[i]-f_10[i])*cy0;
935                            o[INDEX3(i,0,3,numComp,2)] = (f_10[i]-f_00[i])*cx0 + (f_11[i]-f_01[i])*cx1;
936                            o[INDEX3(i,1,3,numComp,2)] = (f_01[i]-f_00[i])*cy0 + (f_11[i]-f_10[i])*cy1;
937                        } // end of component loop i
938                    } // end of k0 loop
939                } // end of k1 loop
940            } // end of parallel section
941        } else if (out.getFunctionSpace().getTypeCode() == ReducedElements) {
942            out.requireWrite();
943    #pragma omp parallel
944            {
945                vector<double> f_00(numComp);
946                vector<double> f_01(numComp);
947                vector<double> f_10(numComp);
948                vector<double> f_11(numComp);
949    #pragma omp for
950                for (index_t k1=0; k1 < m_NE[1]; ++k1) {
951                    for (index_t k0=0; k0 < m_NE[0]; ++k0) {
952                        memcpy(&f_00[0], in.getSampleDataRO(INDEX2(k0,k1, m_NN[0])), numComp*sizeof(double));
953                        memcpy(&f_01[0], in.getSampleDataRO(INDEX2(k0,k1+1, m_NN[0])), numComp*sizeof(double));
954                        memcpy(&f_10[0], in.getSampleDataRO(INDEX2(k0+1,k1, m_NN[0])), numComp*sizeof(double));
955                        memcpy(&f_11[0], in.getSampleDataRO(INDEX2(k0+1,k1+1, m_NN[0])), numComp*sizeof(double));
956                        double* o = out.getSampleDataRW(INDEX2(k0,k1,m_NE[0]));
957                        for (index_t i=0; i < numComp; ++i) {
958                            o[INDEX3(i,0,0,numComp,2)] = (f_10[i] + f_11[i] - f_00[i] - f_01[i])*cx2/2;
959                            o[INDEX3(i,1,0,numComp,2)] = (f_01[i] + f_11[i] - f_00[i] - f_10[i])*cy2/2;
960                        } // end of component loop i
961                    } // end of k0 loop
962                } // end of k1 loop
963            } // end of parallel section
964        } else if (out.getFunctionSpace().getTypeCode() == FaceElements) {
965            out.requireWrite();
966    #pragma omp parallel
967            {
968                vector<double> f_00(numComp);
969                vector<double> f_01(numComp);
970                vector<double> f_10(numComp);
971                vector<double> f_11(numComp);
972                if (m_faceOffset[0] > -1) {
973    #pragma omp for nowait
974                    for (index_t k1=0; k1 < m_NE[1]; ++k1) {
975                        memcpy(&f_00[0], in.getSampleDataRO(INDEX2(0,k1, m_NN[0])), numComp*sizeof(double));
976                        memcpy(&f_01[0], in.getSampleDataRO(INDEX2(0,k1+1, m_NN[0])), numComp*sizeof(double));
977                        memcpy(&f_10[0], in.getSampleDataRO(INDEX2(1,k1, m_NN[0])), numComp*sizeof(double));
978                        memcpy(&f_11[0], in.getSampleDataRO(INDEX2(1,k1+1, m_NN[0])), numComp*sizeof(double));
979                        double* o = out.getSampleDataRW(m_faceOffset[0]+k1);
980                        for (index_t i=0; i < numComp; ++i) {
981                            o[INDEX3(i,0,0,numComp,2)] = (f_10[i]-f_00[i])*cx1 + (f_11[i]-f_01[i])*cx0;
982                            o[INDEX3(i,1,0,numComp,2)] = (f_01[i]-f_00[i])*cy2;
983                            o[INDEX3(i,0,1,numComp,2)] = (f_10[i]-f_00[i])*cx0 + (f_11[i]-f_01[i])*cx1;
984                            o[INDEX3(i,1,1,numComp,2)] = (f_01[i]-f_00[i])*cy2;
985                        } // end of component loop i
986                    } // end of k1 loop
987                } // end of face 0
988                if (m_faceOffset[1] > -1) {
989    #pragma omp for nowait
990                    for (index_t k1=0; k1 < m_NE[1]; ++k1) {
991                        memcpy(&f_00[0], in.getSampleDataRO(INDEX2(m_NN[0]-2,k1, m_NN[0])), numComp*sizeof(double));
992                        memcpy(&f_01[0], in.getSampleDataRO(INDEX2(m_NN[0]-2,k1+1, m_NN[0])), numComp*sizeof(double));
993                        memcpy(&f_10[0], in.getSampleDataRO(INDEX2(m_NN[0]-1,k1, m_NN[0])), numComp*sizeof(double));
994                        memcpy(&f_11[0], in.getSampleDataRO(INDEX2(m_NN[0]-1,k1+1, m_NN[0])), numComp*sizeof(double));
995                        double* o = out.getSampleDataRW(m_faceOffset[1]+k1);
996                        for (index_t i=0; i < numComp; ++i) {
997                            o[INDEX3(i,0,0,numComp,2)] = (f_10[i]-f_00[i])*cx1 + (f_11[i]-f_01[i])*cx0;
998                            o[INDEX3(i,1,0,numComp,2)] = (f_11[i]-f_10[i])*cy2;
999                            o[INDEX3(i,0,1,numComp,2)] = (f_10[i]-f_00[i])*cx0 + (f_11[i]-f_01[i])*cx1;
1000                            o[INDEX3(i,1,1,numComp,2)] = (f_11[i]-f_10[i])*cy2;
1001                        } // end of component loop i
1002                    } // end of k1 loop
1003                } // end of face 1
1004                if (m_faceOffset[2] > -1) {
1005    #pragma omp for nowait
1006                    for (index_t k0=0; k0 < m_NE[0]; ++k0) {
1007                        memcpy(&f_00[0], in.getSampleDataRO(INDEX2(k0,0, m_NN[0])), numComp*sizeof(double));
1008                        memcpy(&f_01[0], in.getSampleDataRO(INDEX2(k0,1, m_NN[0])), numComp*sizeof(double));
1009                        memcpy(&f_10[0], in.getSampleDataRO(INDEX2(k0+1,0, m_NN[0])), numComp*sizeof(double));
1010                        memcpy(&f_11[0], in.getSampleDataRO(INDEX2(k0+1,1, m_NN[0])), numComp*sizeof(double));
1011                        double* o = out.getSampleDataRW(m_faceOffset[2]+k0);
1012                        for (index_t i=0; i < numComp; ++i) {
1013                            o[INDEX3(i,0,0,numComp,2)] = (f_10[i]-f_00[i])*cx2;
1014                            o[INDEX3(i,1,0,numComp,2)] = (f_01[i]-f_00[i])*cy1 + (f_11[i]-f_10[i])*cy0;
1015                            o[INDEX3(i,0,1,numComp,2)] = (f_10[i]-f_00[i])*cx2;
1016                            o[INDEX3(i,1,1,numComp,2)] = (f_01[i]-f_00[i])*cy0 + (f_11[i]-f_10[i])*cy1;
1017                        } // end of component loop i
1018                    } // end of k0 loop
1019                } // end of face 2
1020                if (m_faceOffset[3] > -1) {
1021    #pragma omp for nowait
1022                    for (index_t k0=0; k0 < m_NE[0]; ++k0) {
1023                        memcpy(&f_00[0], in.getSampleDataRO(INDEX2(k0,m_NN[1]-2, m_NN[0])), numComp*sizeof(double));
1024                        memcpy(&f_01[0], in.getSampleDataRO(INDEX2(k0,m_NN[1]-1, m_NN[0])), numComp*sizeof(double));
1025                        memcpy(&f_10[0], in.getSampleDataRO(INDEX2(k0+1,m_NN[1]-2, m_NN[0])), numComp*sizeof(double));
1026                        memcpy(&f_11[0], in.getSampleDataRO(INDEX2(k0+1,m_NN[1]-1, m_NN[0])), numComp*sizeof(double));
1027                        double* o = out.getSampleDataRW(m_faceOffset[3]+k0);
1028                        for (index_t i=0; i < numComp; ++i) {
1029                            o[INDEX3(i,0,0,numComp,2)] = (f_11[i]-f_01[i])*cx2;
1030                            o[INDEX3(i,1,0,numComp,2)] = (f_01[i]-f_00[i])*cy1 + (f_11[i]-f_10[i])*cy0;
1031                            o[INDEX3(i,0,1,numComp,2)] = (f_11[i]-f_01[i])*cx2;
1032                            o[INDEX3(i,1,1,numComp,2)] = (f_01[i]-f_00[i])*cy0 + (f_11[i]-f_10[i])*cy1;
1033                        } // end of component loop i
1034                    } // end of k0 loop
1035                } // end of face 3
1036            } // end of parallel section
1037    
1038        } else if (out.getFunctionSpace().getTypeCode() == ReducedFaceElements) {
1039            out.requireWrite();
1040    #pragma omp parallel
1041            {
1042                vector<double> f_00(numComp);
1043                vector<double> f_01(numComp);
1044                vector<double> f_10(numComp);
1045                vector<double> f_11(numComp);
1046                if (m_faceOffset[0] > -1) {
1047    #pragma omp for nowait
1048                    for (index_t k1=0; k1 < m_NE[1]; ++k1) {
1049                        memcpy(&f_00[0], in.getSampleDataRO(INDEX2(0,k1, m_NN[0])), numComp*sizeof(double));
1050                        memcpy(&f_01[0], in.getSampleDataRO(INDEX2(0,k1+1, m_NN[0])), numComp*sizeof(double));
1051                        memcpy(&f_10[0], in.getSampleDataRO(INDEX2(1,k1, m_NN[0])), numComp*sizeof(double));
1052                        memcpy(&f_11[0], in.getSampleDataRO(INDEX2(1,k1+1, m_NN[0])), numComp*sizeof(double));
1053                        double* o = out.getSampleDataRW(m_faceOffset[0]+k1);
1054                        for (index_t i=0; i < numComp; ++i) {
1055                            o[INDEX3(i,0,0,numComp,2)] = (f_10[i] + f_11[i] - f_00[i] - f_01[i])*cx2/2;
1056                            o[INDEX3(i,1,0,numComp,2)] = (f_01[i]-f_00[i])*cy2;
1057                        } // end of component loop i
1058                    } // end of k1 loop
1059                } // end of face 0
1060                if (m_faceOffset[1] > -1) {
1061    #pragma omp for nowait
1062                    for (index_t k1=0; k1 < m_NE[1]; ++k1) {
1063                        memcpy(&f_00[0], in.getSampleDataRO(INDEX2(m_NN[0]-2,k1, m_NN[0])), numComp*sizeof(double));
1064                        memcpy(&f_01[0], in.getSampleDataRO(INDEX2(m_NN[0]-2,k1+1, m_NN[0])), numComp*sizeof(double));
1065                        memcpy(&f_10[0], in.getSampleDataRO(INDEX2(m_NN[0]-1,k1, m_NN[0])), numComp*sizeof(double));
1066                        memcpy(&f_11[0], in.getSampleDataRO(INDEX2(m_NN[0]-1,k1+1, m_NN[0])), numComp*sizeof(double));
1067                        double* o = out.getSampleDataRW(m_faceOffset[1]+k1);
1068                        for (index_t i=0; i < numComp; ++i) {
1069                            o[INDEX3(i,0,0,numComp,2)] = (f_10[i] + f_11[i] - f_00[i] - f_01[i])*cx2/2;
1070                            o[INDEX3(i,1,0,numComp,2)] = (f_11[i]-f_10[i])*cy2;
1071                        } // end of component loop i
1072                    } // end of k1 loop
1073                } // end of face 1
1074                if (m_faceOffset[2] > -1) {
1075    #pragma omp for nowait
1076                    for (index_t k0=0; k0 < m_NE[0]; ++k0) {
1077                        memcpy(&f_00[0], in.getSampleDataRO(INDEX2(k0,0, m_NN[0])), numComp*sizeof(double));
1078                        memcpy(&f_01[0], in.getSampleDataRO(INDEX2(k0,1, m_NN[0])), numComp*sizeof(double));
1079                        memcpy(&f_10[0], in.getSampleDataRO(INDEX2(k0+1,0, m_NN[0])), numComp*sizeof(double));
1080                        memcpy(&f_11[0], in.getSampleDataRO(INDEX2(k0+1,1, m_NN[0])), numComp*sizeof(double));
1081                        double* o = out.getSampleDataRW(m_faceOffset[2]+k0);
1082                        for (index_t i=0; i < numComp; ++i) {
1083                            o[INDEX3(i,0,0,numComp,2)] = (f_10[i]-f_00[i])*cx2;
1084                            o[INDEX3(i,1,0,numComp,2)] = (f_01[i] + f_11[i] - f_00[i] - f_10[i])*cy2/2;
1085                        } // end of component loop i
1086                    } // end of k0 loop
1087                } // end of face 2
1088                if (m_faceOffset[3] > -1) {
1089    #pragma omp for nowait
1090                    for (index_t k0=0; k0 < m_NE[0]; ++k0) {
1091                        memcpy(&f_00[0], in.getSampleDataRO(INDEX2(k0,m_NN[1]-2, m_NN[0])), numComp*sizeof(double));
1092                        memcpy(&f_01[0], in.getSampleDataRO(INDEX2(k0,m_NN[1]-1, m_NN[0])), numComp*sizeof(double));
1093                        memcpy(&f_10[0], in.getSampleDataRO(INDEX2(k0+1,m_NN[1]-2, m_NN[0])), numComp*sizeof(double));
1094                        memcpy(&f_11[0], in.getSampleDataRO(INDEX2(k0+1,m_NN[1]-1, m_NN[0])), numComp*sizeof(double));
1095                        double* o = out.getSampleDataRW(m_faceOffset[3]+k0);
1096                        for (index_t i=0; i < numComp; ++i) {
1097                            o[INDEX3(i,0,0,numComp,2)] = (f_11[i]-f_01[i])*cx2;
1098                            o[INDEX3(i,1,0,numComp,2)] = (f_01[i] + f_11[i] - f_00[i] - f_10[i])*cy2/2;
1099                        } // end of component loop i
1100                    } // end of k0 loop
1101                } // end of face 3
1102            } // end of parallel section
1103        }
1104    }
1105    
1106    //protected
1107    void Rectangle::assembleIntegrate(vector<double>& integrals,
1108                                      const escript::Data& arg) const
1109    {
1110        const dim_t numComp = arg.getDataPointSize();
1111        const index_t left = (m_offset[0]==0 ? 0 : 1);
1112        const index_t bottom = (m_offset[1]==0 ? 0 : 1);
1113        const int fs=arg.getFunctionSpace().getTypeCode();
1114        if (fs == Elements && arg.actsExpanded()) {
1115    #pragma omp parallel
1116            {
1117                vector<double> int_local(numComp, 0);
1118                const double w = m_dx[0]*m_dx[1]/4.;
1119    #pragma omp for nowait
1120                for (index_t k1 = bottom; k1 < bottom+m_ownNE[1]; ++k1) {
1121                    for (index_t k0 = left; k0 < left+m_ownNE[0]; ++k0) {
1122                        const double* f = arg.getSampleDataRO(INDEX2(k0, k1, m_NE[0]));
1123                        for (index_t i=0; i < numComp; ++i) {
1124                            const double f0 = f[INDEX2(i,0,numComp)];
1125                            const double f1 = f[INDEX2(i,1,numComp)];
1126                            const double f2 = f[INDEX2(i,2,numComp)];
1127                            const double f3 = f[INDEX2(i,3,numComp)];
1128                            int_local[i]+=(f0+f1+f2+f3)*w;
1129                        }  // end of component loop i
1130                    } // end of k0 loop
1131                } // end of k1 loop
1132    #pragma omp critical
1133                for (index_t i=0; i<numComp; i++)
1134                    integrals[i]+=int_local[i];
1135            } // end of parallel section
1136    
1137        } else if (fs==ReducedElements || (fs==Elements && !arg.actsExpanded())) {
1138            const double w = m_dx[0]*m_dx[1];
1139    #pragma omp parallel
1140            {
1141                vector<double> int_local(numComp, 0);
1142    #pragma omp for nowait
1143                for (index_t k1 = bottom; k1 < bottom+m_ownNE[1]; ++k1) {
1144                    for (index_t k0 = left; k0 < left+m_ownNE[0]; ++k0) {
1145                        const double* f = arg.getSampleDataRO(INDEX2(k0, k1, m_NE[0]));
1146                        for (index_t i=0; i < numComp; ++i) {
1147                            int_local[i]+=f[i]*w;
1148                        }
1149                    }
1150                }
1151    #pragma omp critical
1152                for (index_t i=0; i<numComp; i++)
1153                    integrals[i]+=int_local[i];
1154            } // end of parallel section
1155    
1156        } else if (fs == FaceElements && arg.actsExpanded()) {
1157    #pragma omp parallel
1158            {
1159                vector<double> int_local(numComp, 0);
1160                const double w0 = m_dx[0]/2.;
1161                const double w1 = m_dx[1]/2.;
1162                if (m_faceOffset[0] > -1) {
1163    #pragma omp for nowait
1164                    for (index_t k1 = bottom; k1 < bottom+m_ownNE[1]; ++k1) {
1165                        const double* f = arg.getSampleDataRO(m_faceOffset[0]+k1);
1166                        for (index_t i=0; i < numComp; ++i) {
1167                            const double f0 = f[INDEX2(i,0,numComp)];
1168                            const double f1 = f[INDEX2(i,1,numComp)];
1169                            int_local[i]+=(f0+f1)*w1;
1170                        }  // end of component loop i
1171                    } // end of k1 loop
1172                }
1173    
1174                if (m_faceOffset[1] > -1) {
1175    #pragma omp for nowait
1176                    for (index_t k1 = bottom; k1 < bottom+m_ownNE[1]; ++k1) {
1177                        const double* f = arg.getSampleDataRO(m_faceOffset[1]+k1);
1178                        for (index_t i=0; i < numComp; ++i) {
1179                            const double f0 = f[INDEX2(i,0,numComp)];
1180                            const double f1 = f[INDEX2(i,1,numComp)];
1181                            int_local[i]+=(f0+f1)*w1;
1182                        }  // end of component loop i
1183                    } // end of k1 loop
1184                }
1185    
1186                if (m_faceOffset[2] > -1) {
1187    #pragma omp for nowait
1188                    for (index_t k0 = left; k0 < left+m_ownNE[0]; ++k0) {
1189                        const double* f = arg.getSampleDataRO(m_faceOffset[2]+k0);
1190                        for (index_t i=0; i < numComp; ++i) {
1191                            const double f0 = f[INDEX2(i,0,numComp)];
1192                            const double f1 = f[INDEX2(i,1,numComp)];
1193                            int_local[i]+=(f0+f1)*w0;
1194                        }  // end of component loop i
1195                    } // end of k0 loop
1196                }
1197    
1198                if (m_faceOffset[3] > -1) {
1199    #pragma omp for nowait
1200                    for (index_t k0 = left; k0 < left+m_ownNE[0]; ++k0) {
1201                        const double* f = arg.getSampleDataRO(m_faceOffset[3]+k0);
1202                        for (index_t i=0; i < numComp; ++i) {
1203                            const double f0 = f[INDEX2(i,0,numComp)];
1204                            const double f1 = f[INDEX2(i,1,numComp)];
1205                            int_local[i]+=(f0+f1)*w0;
1206                        }  // end of component loop i
1207                    } // end of k0 loop
1208                }
1209    #pragma omp critical
1210                for (index_t i=0; i<numComp; i++)
1211                    integrals[i]+=int_local[i];
1212            } // end of parallel section
1213    
1214        } else if (fs==ReducedFaceElements || (fs==FaceElements && !arg.actsExpanded())) {
1215    #pragma omp parallel
1216            {
1217                vector<double> int_local(numComp, 0);
1218                if (m_faceOffset[0] > -1) {
1219    #pragma omp for nowait
1220                    for (index_t k1 = bottom; k1 < bottom+m_ownNE[1]; ++k1) {
1221                        const double* f = arg.getSampleDataRO(m_faceOffset[0]+k1);
1222                        for (index_t i=0; i < numComp; ++i) {
1223                            int_local[i]+=f[i]*m_dx[1];
1224                        }
1225                    }
1226                }
1227    
1228                if (m_faceOffset[1] > -1) {
1229    #pragma omp for nowait
1230                    for (index_t k1 = bottom; k1 < bottom+m_ownNE[1]; ++k1) {
1231                        const double* f = arg.getSampleDataRO(m_faceOffset[1]+k1);
1232                        for (index_t i=0; i < numComp; ++i) {
1233                            int_local[i]+=f[i]*m_dx[1];
1234                        }
1235                    }
1236                }
1237    
1238                if (m_faceOffset[2] > -1) {
1239    #pragma omp for nowait
1240                    for (index_t k0 = left; k0 < left+m_ownNE[0]; ++k0) {
1241                        const double* f = arg.getSampleDataRO(m_faceOffset[2]+k0);
1242                        for (index_t i=0; i < numComp; ++i) {
1243                            int_local[i]+=f[i]*m_dx[0];
1244                        }
1245                    }
1246                }
1247    
1248                if (m_faceOffset[3] > -1) {
1249    #pragma omp for nowait
1250                    for (index_t k0 = left; k0 < left+m_ownNE[0]; ++k0) {
1251                        const double* f = arg.getSampleDataRO(m_faceOffset[3]+k0);
1252                        for (index_t i=0; i < numComp; ++i) {
1253                            int_local[i]+=f[i]*m_dx[0];
1254                        }
1255                    }
1256                }
1257    
1258    #pragma omp critical
1259                for (index_t i=0; i<numComp; i++)
1260                    integrals[i]+=int_local[i];
1261            } // end of parallel section
1262        } // function space selector
1263    }
1264    
1265    //protected
1266    dim_t Rectangle::insertNeighbourNodes(IndexVector& index, index_t node) const
1267    {
1268        const dim_t nDOF0 = (m_gNE[0]+1)/m_NX[0];
1269        const dim_t nDOF1 = (m_gNE[1]+1)/m_NX[1];
1270        const int x=node%nDOF0;
1271        const int y=node/nDOF0;
1272        dim_t num=0;
1273        // loop through potential neighbours and add to index if positions are
1274        // within bounds
1275        for (int i1=-1; i1<2; i1++) {
1276            for (int i0=-1; i0<2; i0++) {
1277                // skip node itself
1278                if (i0==0 && i1==0)
1279                    continue;
1280                // location of neighbour node
1281                const int nx=x+i0;
1282                const int ny=y+i1;
1283                if (nx>=0 && ny>=0 && nx<nDOF0 && ny<nDOF1) {
1284                    index.push_back(ny*nDOF0+nx);
1285                    num++;
1286                }
1287            }
1288        }
1289    
1290        return num;
1291    }
1292    
1293    //protected
1294    void Rectangle::nodesToDOF(escript::Data& out, const escript::Data& in) const
1295    {
1296        const dim_t numComp = in.getDataPointSize();
1297        out.requireWrite();
1298    
1299        const index_t left = (m_offset[0]==0 ? 0 : 1);
1300        const index_t bottom = (m_offset[1]==0 ? 0 : 1);
1301        const dim_t nDOF0 = (m_gNE[0]+1)/m_NX[0];
1302        const dim_t nDOF1 = (m_gNE[1]+1)/m_NX[1];
1303    #pragma omp parallel for
1304        for (index_t i=0; i<nDOF1; i++) {
1305            for (index_t j=0; j<nDOF0; j++) {
1306                const index_t n=j+left+(i+bottom)*m_NN[0];
1307                const double* src=in.getSampleDataRO(n);
1308                copy(src, src+numComp, out.getSampleDataRW(j+i*nDOF0));
1309          }          }
1310      }      }
1311  }  }
1312    
1313    //protected
1314    void Rectangle::dofToNodes(escript::Data& out, const escript::Data& in) const
1315    {
1316        const dim_t numComp = in.getDataPointSize();
1317        Paso_Coupler* coupler = Paso_Coupler_alloc(m_connector, numComp);
1318        // expand data object if necessary to be able to grab the whole data
1319        const_cast<escript::Data*>(&in)->expand();
1320        Paso_Coupler_startCollect(coupler, in.getSampleDataRO(0));
1321    
1322        const dim_t numDOF = getNumDOF();
1323        out.requireWrite();
1324        const double* buffer = Paso_Coupler_finishCollect(coupler);
1325    
1326    #pragma omp parallel for
1327        for (index_t i=0; i<getNumNodes(); i++) {
1328            const double* src=(m_dofMap[i]<numDOF ?
1329                    in.getSampleDataRO(m_dofMap[i])
1330                    : &buffer[(m_dofMap[i]-numDOF)*numComp]);
1331            copy(src, src+numComp, out.getSampleDataRW(i));
1332        }
1333        Paso_Coupler_free(coupler);
1334    }
1335    
1336  //private  //private
1337  void Rectangle::populateSampleIds()  void Rectangle::populateSampleIds()
1338  {  {
1339      const index_t firstId = getNumNodes()*m_mpiInfo->rank;      // degrees of freedom are numbered from left to right, bottom to top in
1340      const index_t diff0 = m_N0*(m_N1-1)+1;      // each rank, continuing on the next rank (ranks also go left-right,
1341      const index_t diff1 = m_N0*((m_NX-1)*m_N1+1);      // bottom-top).
1342        // This means rank 0 has id 0...n0-1, rank 1 has id n0...n1-1 etc. which
1343        // helps when writing out data rank after rank.
1344    
1345        // build node distribution vector first.
1346        // rank i owns m_nodeDistribution[i+1]-nodeDistribution[i] nodes which is
1347        // constant for all ranks in this implementation
1348        m_nodeDistribution.assign(m_mpiInfo->size+1, 0);
1349        const dim_t numDOF=getNumDOF();
1350        for (dim_t k=1; k<m_mpiInfo->size; k++) {
1351            m_nodeDistribution[k]=k*numDOF;
1352        }
1353        m_nodeDistribution[m_mpiInfo->size]=getNumDataPointsGlobal();
1354      m_nodeId.resize(getNumNodes());      m_nodeId.resize(getNumNodes());
1355        m_dofId.resize(numDOF);
1356        m_elementId.resize(getNumElements());
1357    
1358        // populate face element counts
1359        //left
1360        if (m_offset[0]==0)
1361            m_faceCount[0]=m_NE[1];
1362        else
1363            m_faceCount[0]=0;
1364        //right
1365        if (m_mpiInfo->rank%m_NX[0]==m_NX[0]-1)
1366            m_faceCount[1]=m_NE[1];
1367        else
1368            m_faceCount[1]=0;
1369        //bottom
1370        if (m_offset[1]==0)
1371            m_faceCount[2]=m_NE[0];
1372        else
1373            m_faceCount[2]=0;
1374        //top
1375        if (m_mpiInfo->rank/m_NX[0]==m_NX[1]-1)
1376            m_faceCount[3]=m_NE[0];
1377        else
1378            m_faceCount[3]=0;
1379    
1380        m_faceId.resize(getNumFaceElements());
1381    
1382        const index_t left = (m_offset[0]==0 ? 0 : 1);
1383        const index_t bottom = (m_offset[1]==0 ? 0 : 1);
1384        const dim_t nDOF0 = (m_gNE[0]+1)/m_NX[0];
1385        const dim_t nDOF1 = (m_gNE[1]+1)/m_NX[1];
1386    
1387    #define globalNodeId(x,y) \
1388        ((m_offset[0]+x)/nDOF0)*nDOF0*nDOF1+(m_offset[0]+x)%nDOF0 \
1389        + ((m_offset[1]+y)/nDOF1)*nDOF0*nDOF1*m_NX[0]+((m_offset[1]+y)%nDOF1)*nDOF0
1390    
1391        // set corner id's outside the parallel region
1392        m_nodeId[0] = globalNodeId(0, 0);
1393        m_nodeId[m_NN[0]-1] = globalNodeId(m_NN[0]-1, 0);
1394        m_nodeId[m_NN[0]*(m_NN[1]-1)] = globalNodeId(0, m_NN[1]-1);
1395        m_nodeId[m_NN[0]*m_NN[1]-1] = globalNodeId(m_NN[0]-1,m_NN[1]-1);
1396    #undef globalNodeId
1397    
1398    #pragma omp parallel
1399        {
1400            // populate degrees of freedom and own nodes (identical id)
1401    #pragma omp for nowait
1402            for (dim_t i=0; i<nDOF1; i++) {
1403                for (dim_t j=0; j<nDOF0; j++) {
1404                    const index_t nodeIdx=j+left+(i+bottom)*m_NN[0];
1405                    const index_t dofIdx=j+i*nDOF0;
1406                    m_dofId[dofIdx] = m_nodeId[nodeIdx]
1407                        = m_nodeDistribution[m_mpiInfo->rank]+dofIdx;
1408                }
1409            }
1410    
1411            // populate the rest of the nodes (shared with other ranks)
1412            if (m_faceCount[0]==0) { // left column
1413    #pragma omp for nowait
1414                for (dim_t i=0; i<nDOF1; i++) {
1415                    const index_t nodeIdx=(i+bottom)*m_NN[0];
1416                    const index_t dofId=(i+1)*nDOF0-1;
1417                    m_nodeId[nodeIdx]
1418                        = m_nodeDistribution[m_mpiInfo->rank-1]+dofId;
1419                }
1420            }
1421            if (m_faceCount[1]==0) { // right column
1422    #pragma omp for nowait
1423                for (dim_t i=0; i<nDOF1; i++) {
1424                    const index_t nodeIdx=(i+bottom+1)*m_NN[0]-1;
1425                    const index_t dofId=i*nDOF0;
1426                    m_nodeId[nodeIdx]
1427                        = m_nodeDistribution[m_mpiInfo->rank+1]+dofId;
1428                }
1429            }
1430            if (m_faceCount[2]==0) { // bottom row
1431    #pragma omp for nowait
1432                for (dim_t i=0; i<nDOF0; i++) {
1433                    const index_t nodeIdx=i+left;
1434                    const index_t dofId=nDOF0*(nDOF1-1)+i;
1435                    m_nodeId[nodeIdx]
1436                        = m_nodeDistribution[m_mpiInfo->rank-m_NX[0]]+dofId;
1437                }
1438            }
1439            if (m_faceCount[3]==0) { // top row
1440    #pragma omp for nowait
1441                for (dim_t i=0; i<nDOF0; i++) {
1442                    const index_t nodeIdx=m_NN[0]*(m_NN[1]-1)+i+left;
1443                    const index_t dofId=i;
1444                    m_nodeId[nodeIdx]
1445                        = m_nodeDistribution[m_mpiInfo->rank+m_NX[0]]+dofId;
1446                }
1447            }
1448    
1449            // populate element id's
1450    #pragma omp for nowait
1451            for (dim_t i1=0; i1<m_NE[1]; i1++) {
1452                for (dim_t i0=0; i0<m_NE[0]; i0++) {
1453                    m_elementId[i0+i1*m_NE[0]]=(m_offset[1]+i1)*m_gNE[0]+m_offset[0]+i0;
1454                }
1455            }
1456    
1457            // face elements
1458    #pragma omp for
1459            for (dim_t k=0; k<getNumFaceElements(); k++)
1460                m_faceId[k]=k;
1461        } // end parallel section
1462    
1463        m_nodeTags.assign(getNumNodes(), 0);
1464        updateTagsInUse(Nodes);
1465    
1466        m_elementTags.assign(getNumElements(), 0);
1467        updateTagsInUse(Elements);
1468    
1469        // generate face offset vector and set face tags
1470        const index_t LEFT=1, RIGHT=2, BOTTOM=10, TOP=20;
1471        const index_t faceTag[] = { LEFT, RIGHT, BOTTOM, TOP };
1472        m_faceOffset.assign(4, -1);
1473        m_faceTags.clear();
1474        index_t offset=0;
1475        for (size_t i=0; i<4; i++) {
1476            if (m_faceCount[i]>0) {
1477                m_faceOffset[i]=offset;
1478                offset+=m_faceCount[i];
1479                m_faceTags.insert(m_faceTags.end(), m_faceCount[i], faceTag[i]);
1480            }
1481        }
1482        setTagMap("left", LEFT);
1483        setTagMap("right", RIGHT);
1484        setTagMap("bottom", BOTTOM);
1485        setTagMap("top", TOP);
1486        updateTagsInUse(FaceElements);
1487    }
1488    
1489    //private
1490    void Rectangle::createPattern()
1491    {
1492        const dim_t nDOF0 = (m_gNE[0]+1)/m_NX[0];
1493        const dim_t nDOF1 = (m_gNE[1]+1)/m_NX[1];
1494        const index_t left = (m_offset[0]==0 ? 0 : 1);
1495        const index_t bottom = (m_offset[1]==0 ? 0 : 1);
1496    
1497        // populate node->DOF mapping with own degrees of freedom.
1498        // The rest is assigned in the loop further down
1499        m_dofMap.assign(getNumNodes(), 0);
1500  #pragma omp parallel for  #pragma omp parallel for
1501      for (dim_t k=0; k<getNumNodes(); k++) {      for (index_t i=bottom; i<bottom+nDOF1; i++) {
1502          index_t id = firstId+k;          for (index_t j=left; j<left+nDOF0; j++) {
1503          if (m_offset0 > 0 && k%m_N0==0)              m_dofMap[i*m_NN[0]+j]=(i-bottom)*nDOF0+j-left;
1504              id -= diff0;          }
1505          if (m_offset1 > 0 && k/m_N0==0)      }
1506              id -= diff1;  
1507          m_nodeId[k]=id;      // build list of shared components and neighbours by looping through
1508        // all potential neighbouring ranks and checking if positions are
1509        // within bounds
1510        const dim_t numDOF=nDOF0*nDOF1;
1511        vector<IndexVector> colIndices(numDOF); // for the couple blocks
1512        RankVector neighbour;
1513        IndexVector offsetInShared(1,0);
1514        IndexVector sendShared, recvShared;
1515        int numShared=0;
1516        const int x=m_mpiInfo->rank%m_NX[0];
1517        const int y=m_mpiInfo->rank/m_NX[0];
1518        for (int i1=-1; i1<2; i1++) {
1519            for (int i0=-1; i0<2; i0++) {
1520                // skip this rank
1521                if (i0==0 && i1==0)
1522                    continue;
1523                // location of neighbour rank
1524                const int nx=x+i0;
1525                const int ny=y+i1;
1526                if (nx>=0 && ny>=0 && nx<m_NX[0] && ny<m_NX[1]) {
1527                    neighbour.push_back(ny*m_NX[0]+nx);
1528                    if (i0==0) {
1529                        // sharing top or bottom edge
1530                        const int firstDOF=(i1==-1 ? 0 : numDOF-nDOF0);
1531                        const int firstNode=(i1==-1 ? left : m_NN[0]*(m_NN[1]-1)+left);
1532                        offsetInShared.push_back(offsetInShared.back()+nDOF0);
1533                        for (dim_t i=0; i<nDOF0; i++, numShared++) {
1534                            sendShared.push_back(firstDOF+i);
1535                            recvShared.push_back(numDOF+numShared);
1536                            if (i>0)
1537                                colIndices[firstDOF+i-1].push_back(numShared);
1538                            colIndices[firstDOF+i].push_back(numShared);
1539                            if (i<nDOF0-1)
1540                                colIndices[firstDOF+i+1].push_back(numShared);
1541                            m_dofMap[firstNode+i]=numDOF+numShared;
1542                        }
1543                    } else if (i1==0) {
1544                        // sharing left or right edge
1545                        const int firstDOF=(i0==-1 ? 0 : nDOF0-1);
1546                        const int firstNode=(i0==-1 ? bottom*m_NN[0] : (bottom+1)*m_NN[0]-1);
1547                        offsetInShared.push_back(offsetInShared.back()+nDOF1);
1548                        for (dim_t i=0; i<nDOF1; i++, numShared++) {
1549                            sendShared.push_back(firstDOF+i*nDOF0);
1550                            recvShared.push_back(numDOF+numShared);
1551                            if (i>0)
1552                                colIndices[firstDOF+(i-1)*nDOF0].push_back(numShared);
1553                            colIndices[firstDOF+i*nDOF0].push_back(numShared);
1554                            if (i<nDOF1-1)
1555                                colIndices[firstDOF+(i+1)*nDOF0].push_back(numShared);
1556                            m_dofMap[firstNode+i*m_NN[0]]=numDOF+numShared;
1557                        }
1558                    } else {
1559                        // sharing a node
1560                        const int dof=(i0+1)/2*(nDOF0-1)+(i1+1)/2*(numDOF-nDOF0);
1561                        const int node=(i0+1)/2*(m_NN[0]-1)+(i1+1)/2*m_NN[0]*(m_NN[1]-1);
1562                        offsetInShared.push_back(offsetInShared.back()+1);
1563                        sendShared.push_back(dof);
1564                        recvShared.push_back(numDOF+numShared);
1565                        colIndices[dof].push_back(numShared);
1566                        m_dofMap[node]=numDOF+numShared;
1567                        ++numShared;
1568                    }
1569                }
1570            }
1571        }
1572    
1573        // create connector
1574        Paso_SharedComponents *snd_shcomp = Paso_SharedComponents_alloc(
1575                numDOF, neighbour.size(), &neighbour[0], &sendShared[0],
1576                &offsetInShared[0], 1, 0, m_mpiInfo);
1577        Paso_SharedComponents *rcv_shcomp = Paso_SharedComponents_alloc(
1578                numDOF, neighbour.size(), &neighbour[0], &recvShared[0],
1579                &offsetInShared[0], 1, 0, m_mpiInfo);
1580        m_connector = Paso_Connector_alloc(snd_shcomp, rcv_shcomp);
1581        Paso_SharedComponents_free(snd_shcomp);
1582        Paso_SharedComponents_free(rcv_shcomp);
1583    
1584        // create main and couple blocks
1585        Paso_Pattern *mainPattern = createMainPattern();
1586        Paso_Pattern *colPattern, *rowPattern;
1587        createCouplePatterns(colIndices, numShared, &colPattern, &rowPattern);
1588    
1589        // allocate paso distribution
1590        Paso_Distribution* distribution = Paso_Distribution_alloc(m_mpiInfo,
1591                const_cast<index_t*>(&m_nodeDistribution[0]), 1, 0);
1592    
1593        // finally create the system matrix
1594        m_pattern = Paso_SystemMatrixPattern_alloc(MATRIX_FORMAT_DEFAULT,
1595                distribution, distribution, mainPattern, colPattern, rowPattern,
1596                m_connector, m_connector);
1597    
1598        Paso_Distribution_free(distribution);
1599    
1600        // useful debug output
1601        /*
1602        cout << "--- rcv_shcomp ---" << endl;
1603        cout << "numDOF=" << numDOF << ", numNeighbors=" << neighbour.size() << endl;
1604        for (size_t i=0; i<neighbour.size(); i++) {
1605            cout << "neighbor[" << i << "]=" << neighbour[i]
1606                << " offsetInShared[" << i+1 << "]=" << offsetInShared[i+1] << endl;
1607        }
1608        for (size_t i=0; i<recvShared.size(); i++) {
1609            cout << "shared[" << i << "]=" << recvShared[i] << endl;
1610        }
1611        cout << "--- snd_shcomp ---" << endl;
1612        for (size_t i=0; i<sendShared.size(); i++) {
1613            cout << "shared[" << i << "]=" << sendShared[i] << endl;
1614        }
1615        cout << "--- dofMap ---" << endl;
1616        for (size_t i=0; i<m_dofMap.size(); i++) {
1617            cout << "m_dofMap[" << i << "]=" << m_dofMap[i] << endl;
1618        }
1619        cout << "--- colIndices ---" << endl;
1620        for (size_t i=0; i<colIndices.size(); i++) {
1621            cout << "colIndices[" << i << "].size()=" << colIndices[i].size() << endl;
1622        }
1623        */
1624    
1625        /*
1626        cout << "--- main_pattern ---" << endl;
1627        cout << "M=" << mainPattern->numOutput << ", N=" << mainPattern->numInput << endl;
1628        for (size_t i=0; i<mainPattern->numOutput+1; i++) {
1629            cout << "ptr[" << i << "]=" << mainPattern->ptr[i] << endl;
1630        }
1631        for (size_t i=0; i<mainPattern->ptr[mainPattern->numOutput]; i++) {
1632            cout << "index[" << i << "]=" << mainPattern->index[i] << endl;
1633        }
1634        */
1635    
1636        /*
1637        cout << "--- colCouple_pattern ---" << endl;
1638        cout << "M=" << colPattern->numOutput << ", N=" << colPattern->numInput << endl;
1639        for (size_t i=0; i<colPattern->numOutput+1; i++) {
1640            cout << "ptr[" << i << "]=" << colPattern->ptr[i] << endl;
1641        }
1642        for (size_t i=0; i<colPattern->ptr[colPattern->numOutput]; i++) {
1643            cout << "index[" << i << "]=" << colPattern->index[i] << endl;
1644        }
1645        */
1646    
1647        /*
1648        cout << "--- rowCouple_pattern ---" << endl;
1649        cout << "M=" << rowPattern->numOutput << ", N=" << rowPattern->numInput << endl;
1650        for (size_t i=0; i<rowPattern->numOutput+1; i++) {
1651            cout << "ptr[" << i << "]=" << rowPattern->ptr[i] << endl;
1652        }
1653        for (size_t i=0; i<rowPattern->ptr[rowPattern->numOutput]; i++) {
1654            cout << "index[" << i << "]=" << rowPattern->index[i] << endl;
1655        }
1656        */
1657    
1658        Paso_Pattern_free(mainPattern);
1659        Paso_Pattern_free(colPattern);
1660        Paso_Pattern_free(rowPattern);
1661    }
1662    
1663    //private
1664    void Rectangle::addToMatrixAndRHS(Paso_SystemMatrix* S, escript::Data& F,
1665             const vector<double>& EM_S, const vector<double>& EM_F, bool addS,
1666             bool addF, index_t firstNode, dim_t nEq, dim_t nComp) const
1667    {
1668        IndexVector rowIndex;
1669        rowIndex.push_back(m_dofMap[firstNode]);
1670        rowIndex.push_back(m_dofMap[firstNode+1]);
1671        rowIndex.push_back(m_dofMap[firstNode+m_NN[0]]);
1672        rowIndex.push_back(m_dofMap[firstNode+m_NN[0]+1]);
1673        if (addF) {
1674            double *F_p=F.getSampleDataRW(0);
1675            for (index_t i=0; i<rowIndex.size(); i++) {
1676                if (rowIndex[i]<getNumDOF()) {
1677                    for (index_t eq=0; eq<nEq; eq++) {
1678                        F_p[INDEX2(eq, rowIndex[i], nEq)]+=EM_F[INDEX2(eq,i,nEq)];
1679                    }
1680                }
1681            }
1682        }
1683        if (addS) {
1684            addToSystemMatrix(S, rowIndex, nEq, rowIndex, nComp, EM_S);
1685        }
1686    }
1687    
1688    //protected
1689    void Rectangle::interpolateNodesOnElements(escript::Data& out,
1690                                               const escript::Data& in,
1691                                               bool reduced) const
1692    {
1693        const dim_t numComp = in.getDataPointSize();
1694        if (reduced) {
1695            out.requireWrite();
1696            const double c0 = 0.25;
1697    #pragma omp parallel
1698            {
1699                vector<double> f_00(numComp);
1700                vector<double> f_01(numComp);
1701                vector<double> f_10(numComp);
1702                vector<double> f_11(numComp);
1703    #pragma omp for
1704                for (index_t k1=0; k1 < m_NE[1]; ++k1) {
1705                    for (index_t k0=0; k0 < m_NE[0]; ++k0) {
1706                        memcpy(&f_00[0], in.getSampleDataRO(INDEX2(k0,k1, m_NN[0])), numComp*sizeof(double));
1707                        memcpy(&f_01[0], in.getSampleDataRO(INDEX2(k0,k1+1, m_NN[0])), numComp*sizeof(double));
1708                        memcpy(&f_10[0], in.getSampleDataRO(INDEX2(k0+1,k1, m_NN[0])), numComp*sizeof(double));
1709                        memcpy(&f_11[0], in.getSampleDataRO(INDEX2(k0+1,k1+1, m_NN[0])), numComp*sizeof(double));
1710                        double* o = out.getSampleDataRW(INDEX2(k0,k1,m_NE[0]));
1711                        for (index_t i=0; i < numComp; ++i) {
1712                            o[INDEX2(i,numComp,0)] = c0*(f_00[i] + f_01[i] + f_10[i] + f_11[i]);
1713                        } /* end of component loop i */
1714                    } /* end of k0 loop */
1715                } /* end of k1 loop */
1716            } /* end of parallel section */
1717        } else {
1718            out.requireWrite();
1719            const double c0 = 0.16666666666666666667;
1720            const double c1 = 0.044658198738520451079;
1721            const double c2 = 0.62200846792814621559;
1722    #pragma omp parallel
1723            {
1724                vector<double> f_00(numComp);
1725                vector<double> f_01(numComp);
1726                vector<double> f_10(numComp);
1727                vector<double> f_11(numComp);
1728    #pragma omp for
1729                for (index_t k1=0; k1 < m_NE[1]; ++k1) {
1730                    for (index_t k0=0; k0 < m_NE[0]; ++k0) {
1731                        memcpy(&f_00[0], in.getSampleDataRO(INDEX2(k0,k1, m_NN[0])), numComp*sizeof(double));
1732                        memcpy(&f_01[0], in.getSampleDataRO(INDEX2(k0,k1+1, m_NN[0])), numComp*sizeof(double));
1733                        memcpy(&f_10[0], in.getSampleDataRO(INDEX2(k0+1,k1, m_NN[0])), numComp*sizeof(double));
1734                        memcpy(&f_11[0], in.getSampleDataRO(INDEX2(k0+1,k1+1, m_NN[0])), numComp*sizeof(double));
1735                        double* o = out.getSampleDataRW(INDEX2(k0,k1,m_NE[0]));
1736                        for (index_t i=0; i < numComp; ++i) {
1737                            o[INDEX2(i,numComp,0)] = c0*(f_01[i] + f_10[i]) + c1*f_11[i] + c2*f_00[i];
1738                            o[INDEX2(i,numComp,1)] = c0*(f_00[i] + f_11[i]) + c1*f_01[i] + c2*f_10[i];
1739                            o[INDEX2(i,numComp,2)] = c0*(f_00[i] + f_11[i]) + c1*f_10[i] + c2*f_01[i];
1740                            o[INDEX2(i,numComp,3)] = c0*(f_01[i] + f_10[i]) + c1*f_00[i] + c2*f_11[i];
1741                        } /* end of component loop i */
1742                    } /* end of k0 loop */
1743                } /* end of k1 loop */
1744            } /* end of parallel section */
1745        }
1746    }
1747    
1748    //protected
1749    void Rectangle::interpolateNodesOnFaces(escript::Data& out,
1750                                            const escript::Data& in,
1751                                            bool reduced) const
1752    {
1753        const dim_t numComp = in.getDataPointSize();
1754        if (reduced) {
1755            out.requireWrite();
1756    #pragma omp parallel
1757            {
1758                vector<double> f_00(numComp);
1759                vector<double> f_01(numComp);
1760                vector<double> f_10(numComp);
1761                vector<double> f_11(numComp);
1762                if (m_faceOffset[0] > -1) {
1763    #pragma omp for nowait
1764                    for (index_t k1=0; k1 < m_NE[1]; ++k1) {
1765                        memcpy(&f_00[0], in.getSampleDataRO(INDEX2(0,k1, m_NN[0])), numComp*sizeof(double));
1766                        memcpy(&f_01[0], in.getSampleDataRO(INDEX2(0,k1+1, m_NN[0])), numComp*sizeof(double));
1767                        double* o = out.getSampleDataRW(m_faceOffset[0]+k1);
1768                        for (index_t i=0; i < numComp; ++i) {
1769                            o[INDEX2(i,numComp,0)] = (f_00[i] + f_01[i])/2;
1770                        } /* end of component loop i */
1771                    } /* end of k1 loop */
1772                } /* end of face 0 */
1773                if (m_faceOffset[1] > -1) {
1774    #pragma omp for nowait
1775                    for (index_t k1=0; k1 < m_NE[1]; ++k1) {
1776                        memcpy(&f_10[0], in.getSampleDataRO(INDEX2(m_NN[0]-1,k1, m_NN[0])), numComp*sizeof(double));
1777                        memcpy(&f_11[0], in.getSampleDataRO(INDEX2(m_NN[0]-1,k1+1, m_NN[0])), numComp*sizeof(double));
1778                        double* o = out.getSampleDataRW(m_faceOffset[1]+k1);
1779                        for (index_t i=0; i < numComp; ++i) {
1780                            o[INDEX2(i,numComp,0)] = (f_10[i] + f_11[i])/2;
1781                        } /* end of component loop i */
1782                    } /* end of k1 loop */
1783                } /* end of face 1 */
1784                if (m_faceOffset[2] > -1) {
1785    #pragma omp for nowait
1786                    for (index_t k0=0; k0 < m_NE[0]; ++k0) {
1787                        memcpy(&f_00[0], in.getSampleDataRO(INDEX2(k0,0, m_NN[0])), numComp*sizeof(double));
1788                        memcpy(&f_10[0], in.getSampleDataRO(INDEX2(k0+1,0, m_NN[0])), numComp*sizeof(double));
1789                        double* o = out.getSampleDataRW(m_faceOffset[2]+k0);
1790                        for (index_t i=0; i < numComp; ++i) {
1791                            o[INDEX2(i,numComp,0)] = (f_00[i] + f_10[i])/2;
1792                        } /* end of component loop i */
1793                    } /* end of k0 loop */
1794                } /* end of face 2 */
1795                if (m_faceOffset[3] > -1) {
1796    #pragma omp for nowait
1797                    for (index_t k0=0; k0 < m_NE[0]; ++k0) {
1798                        memcpy(&f_01[0], in.getSampleDataRO(INDEX2(k0,m_NN[1]-1, m_NN[0])), numComp*sizeof(double));
1799                        memcpy(&f_11[0], in.getSampleDataRO(INDEX2(k0+1,m_NN[1]-1, m_NN[0])), numComp*sizeof(double));
1800                        double* o = out.getSampleDataRW(m_faceOffset[3]+k0);
1801                        for (index_t i=0; i < numComp; ++i) {
1802                            o[INDEX2(i,numComp,0)] = (f_01[i] + f_11[i])/2;
1803                        } /* end of component loop i */
1804                    } /* end of k0 loop */
1805                } /* end of face 3 */
1806            } /* end of parallel section */
1807        } else {
1808            out.requireWrite();
1809            const double c0 = 0.21132486540518711775;
1810            const double c1 = 0.78867513459481288225;
1811    #pragma omp parallel
1812            {
1813                vector<double> f_00(numComp);
1814                vector<double> f_01(numComp);
1815                vector<double> f_10(numComp);
1816                vector<double> f_11(numComp);
1817                if (m_faceOffset[0] > -1) {
1818    #pragma omp for nowait
1819                    for (index_t k1=0; k1 < m_NE[1]; ++k1) {
1820                        memcpy(&f_00[0], in.getSampleDataRO(INDEX2(0,k1, m_NN[0])), numComp*sizeof(double));
1821                        memcpy(&f_01[0], in.getSampleDataRO(INDEX2(0,k1+1, m_NN[0])), numComp*sizeof(double));
1822                        double* o = out.getSampleDataRW(m_faceOffset[0]+k1);
1823                        for (index_t i=0; i < numComp; ++i) {
1824                            o[INDEX2(i,numComp,0)] = c0*f_01[i] + c1*f_00[i];
1825                            o[INDEX2(i,numComp,1)] = c0*f_00[i] + c1*f_01[i];
1826                        } /* end of component loop i */
1827                    } /* end of k1 loop */
1828                } /* end of face 0 */
1829                if (m_faceOffset[1] > -1) {
1830    #pragma omp for nowait
1831                    for (index_t k1=0; k1 < m_NE[1]; ++k1) {
1832                        memcpy(&f_10[0], in.getSampleDataRO(INDEX2(m_NN[0]-1,k1, m_NN[0])), numComp*sizeof(double));
1833                        memcpy(&f_11[0], in.getSampleDataRO(INDEX2(m_NN[0]-1,k1+1, m_NN[0])), numComp*sizeof(double));
1834                        double* o = out.getSampleDataRW(m_faceOffset[1]+k1);
1835                        for (index_t i=0; i < numComp; ++i) {
1836                            o[INDEX2(i,numComp,0)] = c1*f_10[i] + c0*f_11[i];
1837                            o[INDEX2(i,numComp,1)] = c1*f_11[i] + c0*f_10[i];
1838                        } /* end of component loop i */
1839                    } /* end of k1 loop */
1840                } /* end of face 1 */
1841                if (m_faceOffset[2] > -1) {
1842    #pragma omp for nowait
1843                    for (index_t k0=0; k0 < m_NE[0]; ++k0) {
1844                        memcpy(&f_00[0], in.getSampleDataRO(INDEX2(k0,0, m_NN[0])), numComp*sizeof(double));
1845                        memcpy(&f_10[0], in.getSampleDataRO(INDEX2(k0+1,0, m_NN[0])), numComp*sizeof(double));
1846                        double* o = out.getSampleDataRW(m_faceOffset[2]+k0);
1847                        for (index_t i=0; i < numComp; ++i) {
1848                            o[INDEX2(i,numComp,0)] = c0*f_10[i] + c1*f_00[i];
1849                            o[INDEX2(i,numComp,1)] = c0*f_00[i] + c1*f_10[i];
1850                        } /* end of component loop i */
1851                    } /* end of k0 loop */
1852                } /* end of face 2 */
1853                if (m_faceOffset[3] > -1) {
1854    #pragma omp for nowait
1855                    for (index_t k0=0; k0 < m_NE[0]; ++k0) {
1856                        memcpy(&f_01[0], in.getSampleDataRO(INDEX2(k0,m_NN[1]-1, m_NN[0])), numComp*sizeof(double));
1857                        memcpy(&f_11[0], in.getSampleDataRO(INDEX2(k0+1,m_NN[1]-1, m_NN[0])), numComp*sizeof(double));
1858                        double* o = out.getSampleDataRW(m_faceOffset[3]+k0);
1859                        for (index_t i=0; i < numComp; ++i) {
1860                            o[INDEX2(i,numComp,0)] = c0*f_11[i] + c1*f_01[i];
1861                            o[INDEX2(i,numComp,1)] = c0*f_01[i] + c1*f_11[i];
1862                        } /* end of component loop i */
1863                    } /* end of k0 loop */
1864                } /* end of face 3 */
1865            } /* end of parallel section */
1866        }
1867    }
1868    
1869    
1870    
1871    namespace
1872    {
1873        // Calculates a guassian blur colvolution matrix for 2D
1874        // See wiki article on the subject    
1875        double* get2DGauss(unsigned radius, double sigma)
1876        {
1877            double* arr=new double[(radius*2+1)*(radius*2+1)];
1878            double common=M_1_PI*0.5*1/(sigma*sigma);
1879            double total=0;
1880            int r=static_cast<int>(radius);
1881            for (int y=-r;y<=r;++y)
1882            {
1883                for (int x=-r;x<=r;++x)
1884                {        
1885                    arr[(x+r)+(y+r)*(r*2+1)]=common*exp(-(x*x+y*y)/(2*sigma*sigma));
1886                    total+=arr[(x+r)+(y+r)*(r*2+1)];
1887                }
1888            }
1889            double invtotal=1/total;
1890            for (size_t p=0;p<(radius*2+1)*(radius*2+1);++p)
1891            {
1892                arr[p]*=invtotal;
1893            }
1894            return arr;
1895        }
1896        
1897        // applies conv to source to get a point.
1898        // (xp, yp) are the coords in the source matrix not the destination matrix
1899        double Convolve2D(double* conv, double* source, size_t xp, size_t yp, unsigned radius, size_t width)
1900        {
1901            size_t bx=xp-radius, by=yp-radius;
1902            size_t sbase=bx+by*width;
1903            double result=0;
1904            for (int y=0;y<2*radius+1;++y)
1905            {        
1906                for (int x=0;x<2*radius+1;++x)
1907                {
1908                    result+=conv[x+y*(2*radius+1)] * source[sbase + x+y*width];
1909                }
1910            }
1911            return result;      
1912        }
1913    }
1914    
1915    
1916    /* This is a wrapper for filtered (and non-filtered) randoms
1917     * For detailed doco see randomFillWorker
1918    */
1919    escript::Data Rectangle::randomFill(const escript::DataTypes::ShapeType& shape,
1920           const escript::FunctionSpace& what,
1921           long seed, const boost::python::tuple& filter) const
1922    {
1923        int numvals=escript::DataTypes::noValues(shape);
1924        if (len(filter)>0 && (numvals!=1))
1925        {
1926            throw RipleyException("Ripley only supports filters for scalar data.");
1927        }
1928        escript::Data res=randomFillWorker(shape, seed, filter);
1929        if (res.getFunctionSpace()!=what)
1930        {
1931            escript::Data r=escript::Data(res, what);
1932            return r;
1933        }
1934        return res;
1935    }
1936    
1937    
1938    /* This routine produces a Data object filled with smoothed random data.
1939    The dimensions of the rectangle being filled are internal[0] x internal[1] points.
1940    A parameter radius  gives the size of the stencil used for the smoothing.
1941    A point on the left hand edge for example, will still require `radius` extra points to the left
1942    in order to complete the stencil.
1943    
1944    All local calculation is done on an array called `src`, with
1945    dimensions = ext[0] * ext[1].
1946    Where ext[i]= internal[i]+2*radius.
1947    
1948    Now for MPI there is overlap to deal with. We need to share both the overlapping
1949    values themselves but also the external region.
1950    
1951    In a hypothetical 1-D case:
1952    
1953    
1954    1234567
1955    would be split into two ranks thus:
1956    123(4)  (4)567     [4 being a shared element]
1957    
1958    If the radius is 2.   There will be padding elements on the outside:
1959    
1960    pp123(4)  (4)567pp
1961    
1962    To ensure that 4 can be correctly computed on both ranks, values from the other rank
1963    need to be known.
1964    
1965    pp123(4)56   23(4)567pp
1966    
1967    Now in our case, we wout set all the values 23456 on the left rank and send them to the
1968    right hand rank.
1969    
1970    So the edges _may_ need to be shared at a distance `inset` from all boundaries.
1971    inset=2*radius+1    
1972    This is to ensure that values at distance `radius` from the shared/overlapped element
1973    that ripley has.
1974    
1975    
1976    */
1977    escript::Data Rectangle::randomFillWorker(const escript::DataTypes::ShapeType& shape,
1978           long seed, const boost::python::tuple& filter) const
1979    {
1980        if (m_numDim!=2)
1981        {
1982            throw RipleyException("Only 2D supported at this time.");
1983        }
1984    
1985        unsigned int radius=0;      // these are only used by gaussian
1986        double sigma=0.5;
1987        
1988        unsigned int numvals=escript::DataTypes::noValues(shape);
1989            
1990        
1991        if (len(filter)==0)
1992        {
1993            // nothing special required here yet
1994        }    
1995        else if (len(filter)==3)
1996        {
1997            boost::python::extract<string> ex(filter[0]);
1998            if (!ex.check() || (ex()!="gaussian"))
1999            {
2000                throw RipleyException("Unsupported random filter");
2001            }
2002            boost::python::extract<unsigned int> ex1(filter[1]);
2003            if (!ex1.check())
2004            {
2005                throw RipleyException("Radius of gaussian filter must be a positive integer.");
2006            }
2007            radius=ex1();
2008            sigma=0.5;
2009            boost::python::extract<double> ex2(filter[2]);
2010            if (!ex2.check() || (sigma=ex2())<=0)
2011            {
2012                throw RipleyException("Sigma must be a postive floating point number.");
2013            }        
2014        }
2015        else
2016        {
2017            throw RipleyException("Unsupported random filter for Rectangle.");
2018        }
2019          
2020      
2021        
2022        size_t internal[2];
2023        internal[0]=m_NE[0]+1;      // number of points in the internal region
2024        internal[1]=m_NE[1]+1;      // that is, the ones we need smoothed versions of
2025        size_t ext[2];
2026        ext[0]=internal[0]+2*radius;        // includes points we need as input
2027        ext[1]=internal[1]+2*radius;        // for smoothing
2028        
2029        // now we check to see if the radius is acceptable
2030        // That is, would not cross multiple ranks in MPI
2031    
2032        if (2*radius>=internal[0]-4)
2033        {
2034            throw RipleyException("Radius of gaussian filter is too large for X dimension of a rank");
2035        }
2036        if (2*radius>=internal[1]-4)
2037        {
2038            throw RipleyException("Radius of gaussian filter is too large for Y dimension of a rank");
2039        }
2040    
2041        double* src=new double[ext[0]*ext[1]*numvals];
2042        esysUtils::randomFillArray(seed, src, ext[0]*ext[1]*numvals);  
2043        
2044    
2045    
2046    #ifdef ESYS_MPI
2047        if ((internal[0]<5) || (internal[1]<5))
2048        {
2049            // since the dimensions are equal for all ranks, this exception
2050            // will be thrown on all ranks
2051            throw RipleyException("Random Data in Ripley requries at least five elements per side per rank.");
2052        }
2053        dim_t X=m_mpiInfo->rank%m_NX[0];
2054        dim_t Y=m_mpiInfo->rank%(m_NX[0]*m_NX[1])/m_NX[0];
2055    #endif      
2056    
2057    /*    
2058        // if we wanted to test a repeating pattern
2059        size_t basex=0;
2060        size_t basey=0;
2061    #ifdef ESYS_MPI    
2062        basex=X*m_gNE[0]/m_NX[0];
2063        basey=Y*m_gNE[1]/m_NX[1];
2064    #endif
2065            
2066        esysUtils::patternFillArray2D(ext[0], ext[1], src, 4, basex, basey, numvals);
2067    */
2068    
2069        
2070    #ifdef ESYS_MPI  
2071        
2072        BlockGrid2 grid(m_NX[0]-1, m_NX[1]-1);
2073        size_t inset=2*radius+2;    // Its +2 not +1 because a whole element is shared (and hence
2074                    // there is an overlap of two points both of which need to have "radius" points on either side.
2075        
2076        size_t xmidlen=ext[0]-2*inset;      // how wide is the x-dimension between the two insets
2077        size_t ymidlen=ext[1]-2*inset;      
2078        
2079        Block2 block(ext[0], ext[1], inset, xmidlen, ymidlen, numvals);
2080    
2081        MPI_Request reqs[40];               // a non-tight upper bound on how many we need
2082        MPI_Status stats[40];
2083        short rused=0;
2084        
2085        messvec incoms;
2086        messvec outcoms;  
2087        
2088        grid.generateInNeighbours(X, Y, incoms);
2089        grid.generateOutNeighbours(X, Y, outcoms);
2090        
2091        block.copyAllToBuffer(src);  
2092        
2093        int comserr=0;    
2094        for (size_t i=0;i<incoms.size();++i)
2095        {
2096            message& m=incoms[i];
2097            comserr|=MPI_Irecv(block.getInBuffer(m.destbuffid), block.getBuffSize(m.destbuffid) , MPI_DOUBLE, m.sourceID, m.tag, m_mpiInfo->comm, reqs+(rused++));
2098            block.setUsed(m.destbuffid);
2099        
2100        }
2101    
2102        for (size_t i=0;i<outcoms.size();++i)
2103        {
2104            message& m=outcoms[i];
2105            comserr|=MPI_Isend(block.getOutBuffer(m.srcbuffid), block.getBuffSize(m.srcbuffid) , MPI_DOUBLE, m.destID, m.tag, m_mpiInfo->comm, reqs+(rused++));
2106        }    
2107        
2108        if (!comserr)
2109        {    
2110            comserr=MPI_Waitall(rused, reqs, stats);    
2111        }
2112    
2113        if (comserr)
2114        {
2115            // Yes this is throwing an exception as a result of an MPI error.
2116            // and no we don't inform the other ranks that we are doing this.
2117            // however, we have no reason to believe coms work at this point anyway
2118            throw RipleyException("Error in coms for randomFill");      
2119        }
2120        
2121        block.copyUsedFromBuffer(src);    
2122        
2123    #endif    
2124        
2125        if (radius==0 || numvals>1) // the truth of either should imply the truth of the other but let's be safe
2126        {
2127          
2128            escript::FunctionSpace fs(getPtr(), getContinuousFunctionCode());
2129            escript::Data resdat(0, shape, fs , true);
2130            // don't need to check for exwrite because we just made it
2131            escript::DataVector& dv=resdat.getExpandedVectorReference();
2132            
2133            
2134            // now we need to copy values over
2135            for (size_t y=0;y<(internal[1]);++y)    
2136            {
2137                for (size_t x=0;x<(internal[0]);++x)
2138                {
2139                    for (unsigned int i=0;i<numvals;++i)
2140                    {
2141                        dv[i+(x+y*(internal[0]))*numvals]=src[i+(x+y*ext[0])*numvals];
2142                    }
2143                }
2144            }
2145            delete[] src;
2146            return resdat;      
2147        }
2148        else                // filter enabled      
2149        {    
2150            escript::FunctionSpace fs(getPtr(), getContinuousFunctionCode());
2151            escript::Data resdat(0, escript::DataTypes::scalarShape, fs , true);
2152            // don't need to check for exwrite because we just made it
2153            escript::DataVector& dv=resdat.getExpandedVectorReference();
2154            double* convolution=get2DGauss(radius, sigma);
2155            for (size_t y=0;y<(internal[1]);++y)    
2156            {
2157                for (size_t x=0;x<(internal[0]);++x)
2158                {    
2159                    dv[x+y*(internal[0])]=Convolve2D(convolution, src, x+radius, y+radius, radius, ext[0]);
2160                    
2161                }
2162            }
2163            delete[] convolution;
2164            delete[] src;
2165            return resdat;
2166        }
2167    }
2168    
2169    int Rectangle::findNode(const double *coords) const {
2170        const int NOT_MINE = -1;
2171        //is the found element even owned by this rank
2172        // (inside owned or shared elements but will map to an owned element)
2173        for (int dim = 0; dim < m_numDim; dim++) {
2174            double min = m_origin[dim] + m_offset[dim]* m_dx[dim]
2175                    - m_dx[dim]/2.; //allows for point outside mapping onto node
2176            double max = m_origin[dim] + (m_offset[dim] + m_NE[dim])*m_dx[dim]
2177                    + m_dx[dim]/2.;
2178            if (min > coords[dim] || max < coords[dim]) {
2179                return NOT_MINE;
2180            }
2181        }
2182        // get distance from origin
2183        double x = coords[0] - m_origin[0];
2184        double y = coords[1] - m_origin[1];
2185        // distance in elements
2186        int ex = (int) floor(x / m_dx[0]);
2187        int ey = (int) floor(y / m_dx[1]);
2188        // set the min distance high enough to be outside the element plus a bit
2189        int closest = NOT_MINE;
2190        double minDist = 1;
2191        for (int dim = 0; dim < m_numDim; dim++) {
2192            minDist += m_dx[dim]*m_dx[dim];
2193        }
2194        //find the closest node
2195        for (int dx = 0; dx < 1; dx++) {
2196            double xdist = (x - (ex + dx)*m_dx[0]);
2197            for (int dy = 0; dy < 1; dy++) {
2198                double ydist = (y - (ey + dy)*m_dx[1]);
2199                double total = xdist*xdist + ydist*ydist;
2200                if (total < minDist) {
2201                    closest = INDEX2(ex+dx-m_offset[0], ey+dy-m_offset[1], m_NE[0] + 1);
2202                    minDist = total;
2203                }
2204            }
2205        }
2206        //if this happens, we've let a dirac point slip through, which is awful
2207        if (closest == NOT_MINE) {
2208            throw RipleyException("Unable to map appropriate dirac point to a node,"
2209                    " implementation problem in Rectangle::findNode()");
2210        }
2211        return closest;
2212    }
2213    
2214    void Rectangle::setAssembler(std::string type, std::map<std::string,
2215            escript::Data> constants) {
2216        if (type.compare("WaveAssembler") == 0) {
2217            delete assembler;
2218            assembler = new WaveAssembler2D(this, m_dx, m_NX, m_NE, m_NN, constants);
2219        } else { //else ifs would go before this for other types
2220            throw RipleyException("Ripley::Rectangle does not support the"
2221                                    " requested assembler");
2222      }      }
2223  }  }
2224    

Legend:
Removed from v.3691  
changed lines
  Added in v.4705

  ViewVC Help
Powered by ViewVC 1.1.26