$extrastylesheet
parallel_sort.C
Go to the documentation of this file.
00001 // The libMesh Finite Element Library.
00002 // Copyright (C) 2002-2014 Benjamin S. Kirk, John W. Peterson, Roy H. Stogner
00003 
00004 // This library is free software; you can redistribute it and/or
00005 // modify it under the terms of the GNU Lesser General Public
00006 // License as published by the Free Software Foundation; either
00007 // version 2.1 of the License, or (at your option) any later version.
00008 
00009 // This library is distributed in the hope that it will be useful,
00010 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00012 // Lesser General Public License for more details.
00013 
00014 // You should have received a copy of the GNU Lesser General Public
00015 // License along with this library; if not, write to the Free Software
00016 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 
00018 
00019 // System Includes
00020 #include <algorithm>
00021 #include <iostream>
00022 
00023 // Local Includes
00024 #include "libmesh/libmesh_common.h"
00025 #include "libmesh/parallel.h"
00026 #include "libmesh/parallel_hilbert.h"
00027 #include "libmesh/parallel_sort.h"
00028 #include "libmesh/parallel_bin_sorter.h"
00029 #ifdef LIBMESH_HAVE_LIBHILBERT
00030 #  include "hilbert.h"
00031 #endif
00032 
00033 namespace libMesh
00034 {
00035 
00036 
00037 namespace Parallel {
00038 
00039 // The Constructor sorts the local data using
00040 // std::sort().  Therefore, the construction of
00041 // a Parallel::Sort object takes O(nlogn) time,
00042 // where n is the length of _data.
00043 template <typename KeyType, typename IdxType>
00044 Sort<KeyType,IdxType>::Sort(const Parallel::Communicator &comm_in,
00045                             std::vector<KeyType>& d) :
00046   ParallelObject(comm_in),
00047   _n_procs(cast_int<processor_id_type>(comm_in.size())),
00048   _proc_id(cast_int<processor_id_type>(comm_in.rank())),
00049   _bin_is_sorted(false),
00050   _data(d)
00051 {
00052   std::sort(_data.begin(), _data.end());
00053 
00054   // Allocate storage
00055   _local_bin_sizes.resize(_n_procs);
00056 }
00057 
00058 
00059 
00060 template <typename KeyType, typename IdxType>
00061 void Sort<KeyType,IdxType>::sort()
00062 {
00063   // Find the global data size.  The sorting
00064   // algorithms assume they have a range to
00065   // work with, so catch the degenerate cases here
00066   IdxType global_data_size = cast_int<IdxType>(_data.size());
00067 
00068   this->comm().sum (global_data_size);
00069 
00070   if (global_data_size < 2)
00071     {
00072       // the entire global range is either empty
00073       // or contains only one element
00074       _my_bin = _data;
00075 
00076       this->comm().allgather (static_cast<IdxType>(_my_bin.size()),
00077                               _local_bin_sizes);
00078     }
00079   else
00080     {
00081       if (this->n_processors() > 1)
00082         {
00083           this->binsort();
00084           this->communicate_bins();
00085         }
00086       else
00087         _my_bin = _data;
00088 
00089       this->sort_local_bin();
00090     }
00091 
00092   // Set sorted flag to true
00093   _bin_is_sorted = true;
00094 }
00095 
00096 
00097 
00098 template <typename KeyType, typename IdxType>
00099 void Sort<KeyType,IdxType>::binsort()
00100 {
00101   // Find the global min and max from all the
00102   // processors.
00103   std::vector<KeyType> global_min_max(2);
00104 
00105   // Insert the local min and max for this processor
00106   global_min_max[0] = -_data.front();
00107   global_min_max[1] =  _data.back();
00108 
00109   // Communicate to determine the global
00110   // min and max for all processors.
00111   this->comm().max(global_min_max);
00112 
00113   // Multiply the min by -1 to obtain the true min
00114   global_min_max[0] *= -1;
00115 
00116   // Bin-Sort based on the global min and max
00117   Parallel::BinSorter<KeyType> bs(this->comm(), _data);
00118   bs.binsort(_n_procs, global_min_max[1], global_min_max[0]);
00119 
00120   // Now save the local bin sizes in a vector so
00121   // we don't have to keep around the BinSorter.
00122   for (processor_id_type i=0; i<_n_procs; ++i)
00123     _local_bin_sizes[i] = bs.sizeof_bin(i);
00124 }
00125 
00126 
00127 
00128 #if defined(LIBMESH_HAVE_LIBHILBERT) && defined(LIBMESH_HAVE_MPI)
00129 // Full specialization for HilbertIndices, there is a fair amount of
00130 // code duplication here that could potentially be consolidated with the
00131 // above method
00132 template <>
00133 void Sort<Hilbert::HilbertIndices,unsigned int>::binsort()
00134 {
00135   // Find the global min and max from all the
00136   // processors.  Do this using MPI_Allreduce.
00137   Hilbert::HilbertIndices
00138     local_min,  local_max,
00139     global_min, global_max;
00140 
00141   if (_data.empty())
00142     {
00143       local_min.rack0 = local_min.rack1 = local_min.rack2 = static_cast<Hilbert::inttype>(-1);
00144       local_max.rack0 = local_max.rack1 = local_max.rack2 = 0;
00145     }
00146   else
00147     {
00148       local_min = _data.front();
00149       local_max = _data.back();
00150     }
00151 
00152   MPI_Op hilbert_max, hilbert_min;
00153 
00154   MPI_Op_create       ((MPI_User_function*)__hilbert_max_op, true, &hilbert_max);
00155   MPI_Op_create       ((MPI_User_function*)__hilbert_min_op, true, &hilbert_min);
00156 
00157   // Communicate to determine the global
00158   // min and max for all processors.
00159   MPI_Allreduce(&local_min,
00160                 &global_min,
00161                 1,
00162                 Parallel::StandardType<Hilbert::HilbertIndices>(),
00163                 hilbert_min,
00164                 this->comm().get());
00165 
00166   MPI_Allreduce(&local_max,
00167                 &global_max,
00168                 1,
00169                 Parallel::StandardType<Hilbert::HilbertIndices>(),
00170                 hilbert_max,
00171                 this->comm().get());
00172 
00173   MPI_Op_free   (&hilbert_max);
00174   MPI_Op_free   (&hilbert_min);
00175 
00176   // Bin-Sort based on the global min and max
00177   Parallel::BinSorter<Hilbert::HilbertIndices> bs(this->comm(),_data);
00178   bs.binsort(_n_procs, global_max, global_min);
00179 
00180   // Now save the local bin sizes in a vector so
00181   // we don't have to keep around the BinSorter.
00182   for (processor_id_type i=0; i<_n_procs; ++i)
00183     _local_bin_sizes[i] = bs.sizeof_bin(i);
00184 }
00185 
00186 #endif // #ifdef LIBMESH_HAVE_LIBHILBERT
00187 
00188 
00189 template <typename KeyType, typename IdxType>
00190 void Sort<KeyType,IdxType>::communicate_bins()
00191 {
00192 #ifdef LIBMESH_HAVE_MPI
00193   // Create storage for the global bin sizes.  This
00194   // is the number of keys which will be held in
00195   // each bin over all processors.
00196   std::vector<IdxType> global_bin_sizes = _local_bin_sizes;
00197 
00198   // Sum to find the total number of entries in each bin.
00199   this->comm().sum(global_bin_sizes);
00200 
00201   // Create a vector to temporarily hold the results of MPI_Gatherv
00202   // calls.  The vector dest  may be saved away to _my_bin depending on which
00203   // processor is being MPI_Gatherv'd.
00204   std::vector<KeyType> dest;
00205 
00206   IdxType local_offset = 0;
00207 
00208   for (processor_id_type i=0; i<_n_procs; ++i)
00209     {
00210       // Vector to receive the total bin size for each
00211       // processor.  Processor i's bin size will be
00212       // held in proc_bin_size[i]
00213       std::vector<IdxType> proc_bin_size;
00214 
00215       // Find the number of contributions coming from each
00216       // processor for this bin.  Note: allgather combines
00217       // the MPI_Gather and MPI_Bcast operations into one.
00218       this->comm().allgather(_local_bin_sizes[i], proc_bin_size);
00219 
00220       // Compute the offsets into my_bin for each processor's
00221       // portion of the bin.  These are basically partial sums
00222       // of the proc_bin_size vector.
00223       std::vector<IdxType> displacements(_n_procs);
00224       for (processor_id_type j=1; j<_n_procs; ++j)
00225         displacements[j] = proc_bin_size[j-1] + displacements[j-1];
00226 
00227       // Resize the destination buffer
00228       dest.resize (global_bin_sizes[i]);
00229 
00230       MPI_Gatherv((_data.size() > local_offset) ?
00231                   &_data[local_offset] :
00232                   NULL,                            // Points to the beginning of the bin to be sent
00233                   _local_bin_sizes[i],               // How much data is in the bin being sent.
00234                   Parallel::StandardType<KeyType>(), // The data type we are sorting
00235                   (dest.empty()) ?
00236                   NULL :
00237                   &dest[0],                        // Enough storage to hold all bin contributions
00238                   (int*) &proc_bin_size[0],          // How much is to be received from each processor
00239                   (int*) &displacements[0],          // Offsets into the receive buffer
00240                   Parallel::StandardType<KeyType>(), // The data type we are sorting
00241                   i,                                 // The root process (we do this once for each proc)
00242                   this->comm().get());
00243 
00244       // Copy the destination buffer if it
00245       // corresponds to the bin for this processor
00246       if (i == _proc_id)
00247         _my_bin = dest;
00248 
00249       // Increment the local offset counter
00250       local_offset += _local_bin_sizes[i];
00251     }
00252 #endif // LIBMESH_HAVE_MPI
00253 }
00254 
00255 
00256 
00257 #if defined(LIBMESH_HAVE_LIBHILBERT) && defined(LIBMESH_HAVE_MPI)
00258 // Full specialization for HilbertIndices, there is a fair amount of
00259 // code duplication here that could potentially be consolidated with the
00260 // above method
00261 template <>
00262 void Sort<Hilbert::HilbertIndices,unsigned int>::communicate_bins()
00263 {
00264   // Create storage for the global bin sizes.  This
00265   // is the number of keys which will be held in
00266   // each bin over all processors.
00267   std::vector<unsigned int> global_bin_sizes(_n_procs);
00268 
00269   libmesh_assert_equal_to (_local_bin_sizes.size(), global_bin_sizes.size());
00270 
00271   // Sum to find the total number of entries in each bin.
00272   // This is stored in global_bin_sizes.  Note, we
00273   // explicitly know that we are communicating MPI_UNSIGNED's here.
00274   MPI_Allreduce(&_local_bin_sizes[0],
00275                 &global_bin_sizes[0],
00276                 _n_procs,
00277                 MPI_UNSIGNED,
00278                 MPI_SUM,
00279                 this->comm().get());
00280 
00281   // Create a vector to temporarily hold the results of MPI_Gatherv
00282   // calls.  The vector dest  may be saved away to _my_bin depending on which
00283   // processor is being MPI_Gatherv'd.
00284   std::vector<Hilbert::HilbertIndices> dest;
00285 
00286   unsigned int local_offset = 0;
00287 
00288   for (unsigned int i=0; i<_n_procs; ++i)
00289     {
00290       // Vector to receive the total bin size for each
00291       // processor.  Processor i's bin size will be
00292       // held in proc_bin_size[i]
00293       std::vector<unsigned int> proc_bin_size(_n_procs);
00294 
00295       // Find the number of contributions coming from each
00296       // processor for this bin.  Note: Allgather combines
00297       // the MPI_Gather and MPI_Bcast operations into one.
00298       // Note: Here again we know that we are communicating
00299       // MPI_UNSIGNED's so there is no need to check the MPI_traits.
00300       MPI_Allgather(&_local_bin_sizes[i], // Source: # of entries on this proc in bin i
00301                     1,                    // Number of items to gather
00302                     MPI_UNSIGNED,
00303                     &proc_bin_size[0],    // Destination: Total # of entries in bin i
00304                     1,
00305                     MPI_UNSIGNED,
00306                     this->comm().get());
00307 
00308       // Compute the offsets into my_bin for each processor's
00309       // portion of the bin.  These are basically partial sums
00310       // of the proc_bin_size vector.
00311       std::vector<unsigned int> displacements(_n_procs);
00312       for (unsigned int j=1; j<_n_procs; ++j)
00313         displacements[j] = proc_bin_size[j-1] + displacements[j-1];
00314 
00315       // Resize the destination buffer
00316       dest.resize (global_bin_sizes[i]);
00317 
00318       MPI_Gatherv((_data.size() > local_offset) ?
00319                   &_data[local_offset] :
00320                   NULL,                   // Points to the beginning of the bin to be sent
00321                   _local_bin_sizes[i],      // How much data is in the bin being sent.
00322                   Parallel::StandardType<Hilbert::HilbertIndices>(), // The data type we are sorting
00323                   (dest.empty()) ?
00324                   NULL :
00325                   &dest[0],               // Enough storage to hold all bin contributions
00326                   (int*) &proc_bin_size[0], // How much is to be received from each processor
00327                   (int*) &displacements[0], // Offsets into the receive buffer
00328                   Parallel::StandardType<Hilbert::HilbertIndices>(), // The data type we are sorting
00329                   i,                        // The root process (we do this once for each proc)
00330                   this->comm().get());
00331 
00332       // Copy the destination buffer if it
00333       // corresponds to the bin for this processor
00334       if (i == _proc_id)
00335         _my_bin = dest;
00336 
00337       // Increment the local offset counter
00338       local_offset += _local_bin_sizes[i];
00339     }
00340 }
00341 
00342 #endif // #if defined(LIBMESH_HAVE_LIBHILBERT) && defined(LIBMESH_HAVE_MPI)
00343 
00344 
00345 
00346 template <typename KeyType, typename IdxType>
00347 void Sort<KeyType,IdxType>::sort_local_bin()
00348 {
00349   std::sort(_my_bin.begin(), _my_bin.end());
00350 }
00351 
00352 
00353 
00354 template <typename KeyType, typename IdxType>
00355 const std::vector<KeyType>& Sort<KeyType,IdxType>::bin()
00356 {
00357   if (!_bin_is_sorted)
00358     {
00359       libMesh::out << "Warning! Bin is not yet sorted!" << std::endl;
00360     }
00361 
00362   return _my_bin;
00363 }
00364 
00365 }
00366 
00367 
00368 
00369 // Explicitly instantiate for int, double
00370 template class Parallel::Sort<int, unsigned int>;
00371 template class Parallel::Sort<double, unsigned int>;
00372 #if defined(LIBMESH_HAVE_LIBHILBERT) && defined(LIBMESH_HAVE_MPI)
00373 template class Parallel::Sort<Hilbert::HilbertIndices, unsigned int>;
00374 #endif
00375 
00376 } // namespace libMesh