PaGMO  1.1.5
mpi_island.cpp
1 /*****************************************************************************
2  * Copyright (C) 2004-2015 The PaGMO development team, *
3  * Advanced Concepts Team (ACT), European Space Agency (ESA) *
4  * *
5  * https://github.com/esa/pagmo *
6  * *
7  * act@esa.int *
8  * *
9  * This program is free software; you can redistribute it and/or modify *
10  * it under the terms of the GNU General Public License as published by *
11  * the Free Software Foundation; either version 2 of the License, or *
12  * (at your option) any later version. *
13  * *
14  * This program is distributed in the hope that it will be useful, *
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
17  * GNU General Public License for more details. *
18  * *
19  * You should have received a copy of the GNU General Public License *
20  * along with this program; if not, write to the *
21  * Free Software Foundation, Inc., *
22  * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
23  *****************************************************************************/
24 
25 #include <boost/date_time/posix_time/posix_time.hpp>
26 #include <boost/scoped_ptr.hpp>
27 #include <boost/shared_ptr.hpp>
28 #include <boost/thread/condition_variable.hpp>
29 #include <boost/thread/thread.hpp>
30 #include <boost/thread/locks.hpp>
31 #include <boost/thread/mutex.hpp>
32 #include <list>
33 #include <set>
34 #include <stdexcept>
35 #include <string>
36 
37 #include "algorithm/base.h"
38 #include "base_island.h"
39 #include "exceptions.h"
40 #include "mpi_environment.h"
41 #include "mpi_island.h"
42 #include "migration/base_r_policy.h"
43 #include "migration/base_s_policy.h"
44 #include "population.h"
45 #include "problem/base.h"
46 
47 namespace pagmo
48 {
49 
// Definitions of the static data members shared by every mpi_island instance.

// Mutex guarding the processor pool (m_available_processors) and the waiting queue (m_queue);
// it is locked by acquire_processor() and release_processor().
50 boost::mutex mpi_island::m_proc_mutex;
// Condition variable on which acquire_processor() waits (with m_proc_mutex) and which
// release_processor() signals via notify_all() after returning a processor to the pool.
51 boost::condition_variable mpi_island::m_proc_cond;
// Mutex locked around MPI send/iprobe/recv calls in perform_evolution().
52 boost::mutex mpi_island::m_mpi_mutex;
// Queue of islands waiting for a processor: islands are appended at the back and
// served from the front by acquire_processor().
53 std::list<mpi_island const *> mpi_island::m_queue;
// Set of processor ids currently available; created on first use by init_processors(),
// which fills it with the ids 1 .. mpi_environment::get_size() - 1.
54 boost::scoped_ptr<std::set<int> > mpi_island::m_available_processors;
55 
57 
61  const migration::base_s_policy &s_policy, const migration::base_r_policy &r_policy):
62  base_island(a,p,n,s_policy,r_policy)
63 {}
64 
66 
70  const migration::base_s_policy &s_policy, const migration::base_r_policy &r_policy):
71  base_island(a,pop,s_policy,r_policy)
72 {}
73 
75 
79 {}
80 
83 {
85  return *this;
86 }
87 
89 {
90  return base_island_ptr(new mpi_island(*this));
91 }
92 
93 // Method that performs the actual evolution of the island population; it is used to distribute the computational load over multiple processors.
95 {
96  // Create copy of data to be transmitted - will use a std::pair for packing everything in a single object.
97  const boost::shared_ptr<population> pop_copy(new population(pop));
98  const algorithm::base_ptr algo_copy = algo.clone();
99  const std::pair<boost::shared_ptr<population>,algorithm::base_ptr> out(pop_copy,algo_copy);
100  const int processor = acquire_processor();
102  mpi_environment::send(out,processor);
103  } else {
104  // Lock down MPI mutex before sending.
105  boost::lock_guard<boost::mutex> lock(m_mpi_mutex);
106  mpi_environment::send(out,processor);
107  }
108  bool successful = false;
109  boost::shared_ptr<population> in;
110  try {
112  mpi_environment::recv(in,processor);
113  } else {
114  while (true) {
115  {
116  boost::lock_guard<boost::mutex> lock(m_mpi_mutex);
117  if (mpi_environment::iprobe(processor)) {
118  mpi_environment::recv(in,processor);
119  break;
120  }
121  }
122  boost::this_thread::sleep(boost::posix_time::milliseconds(10));
123  }
124  }
125  successful = true;
126  } catch (const boost::archive::archive_exception &e) {
127  std::cout << "MPI Recv Error during island evolution using " << algo.get_name() << ": " << e.what() << std::endl;
128  } catch (...) {
129  std::cout << "MPI Recv Error during island evolution using " << algo.get_name() << ", unknown exception caught. :(" << std::endl;
130  }
131  release_processor(processor);
132  if (successful) {
133  // NOTE: implement via population::swap (to be written) in order to avoid
134  // extra copying?
135  pop = *in;
136  }
137 }
138 
140 
143 std::string mpi_island::get_name() const
144 {
145  return "MPI island";
146 }
147 
148 void mpi_island::init_processors()
149 {
150  // Make sure we are not called with the mutex not locked.
151  pagmo_assert(!m_proc_mutex.try_lock());
152  if (!m_available_processors) {
153  m_available_processors.reset(new std::set<int>());
154  pagmo_assert(mpi_environment::get_size() >= 2);
155  // Fill in the available processors (the root processor is excluded).
156  for (int i = 1; i < mpi_environment::get_size(); ++i) {
157  m_available_processors->insert(i);
158  }
159  }
160 }
161 
162 int mpi_island::acquire_processor() const
163 {
164  // Lock down before doing anything else.
165  boost::unique_lock<boost::mutex> lock(m_proc_mutex);
166  init_processors();
167  if (m_available_processors->empty() || !m_queue.empty()) {
168  // Put self at the end of the queue.
169  m_queue.push_back(this);
170  while (*m_queue.begin() != this || m_available_processors->empty()) {
171  m_proc_cond.wait(lock);
172  }
173  // this reached the head of the queue and there are available processors: pop it
174  // from the head and proceed.
175  m_queue.pop_front();
176  }
177  pagmo_assert(!m_available_processors->empty());
178  const int retval = *m_available_processors->begin();
179  m_available_processors->erase(m_available_processors->begin());
180  return retval;
181 }
182 
183 void mpi_island::release_processor(int n) const
184 {
185  {
186  boost::lock_guard<boost::mutex> lock(m_proc_mutex);
187  init_processors();
188  if (n <= 0 || n >= mpi_environment::get_size()) {
189  pagmo_throw(std::runtime_error,"invalid processor id: the value is either non-positive or exceeding the size of the MPI world");
190  }
191  if (m_available_processors->find(n) != m_available_processors->end()) {
192  pagmo_throw(std::runtime_error,"trying to release a processor which was never acquired");
193  }
194  // Re-insert the processor in the pool of available processors.
195  m_available_processors->insert(n);
196  }
197  m_proc_cond.notify_all();
198 }
199 
200 }
201 
202 BOOST_CLASS_EXPORT_IMPLEMENT(pagmo::mpi_island)
boost::shared_ptr< base > base_ptr
Alias for shared pointer to base algorithm.
static bool is_multithread()
Thread-safety of the MPI implementation.
static void recv(T &retval, int source)
Receive MPI payload.
Root PaGMO namespace.
void perform_evolution(const algorithm::base &, population &) const
Method that implements the evolution of the population.
Definition: mpi_island.cpp:94
Base class for migration replacement policies.
Definition: base_r_policy.h:57
Base class for migration selection policies.
Definition: base_s_policy.h:54
mpi_island & operator=(const mpi_island &)
Assignment operator.
Definition: mpi_island.cpp:82
Base algorithm class.
static bool iprobe(int)
Probe for message.
mpi_island(const mpi_island &)
Copy constructor.
Definition: mpi_island.cpp:78
Base problem class.
Definition: problem/base.h:148
Population class.
Definition: population.h:70
static int get_size()
MPI world size.
virtual base_ptr clone() const =0
Clone method.
base_island & operator=(const base_island &)
Assignment operator.
virtual std::string get_name() const
Get algorithm's name.
Base island class.
Definition: base_island.h:81
std::string get_name() const
Return a string identifying the island's type.
Definition: mpi_island.cpp:143
MPI island class.
Definition: mpi_island.h:77
boost::shared_ptr< base_island > base_island_ptr
Alias for the shared pointer to a pagmo::base_island.
Definition: base_island.h:49
base_island_ptr clone() const
Clone method.
Definition: mpi_island.cpp:88
static void send(const T &payload, int destination)
Send MPI payload.