A simple thread pool to run parallel PROSail simulations

In the otb-bv we use the OTB versions of the Prospect and Sail models to perform satellite reflectance simulations of vegetation.

The code for the simulation of a single sample uses the ProSail simulator configured with the satellite relative spectral responses, the acquisition parameters (angles) and the biophysical variables (leaf pigments, LAI, etc.):

ProSailType prosail;
prosail.SetRSR(satRSR);
prosail.SetParameters(prosailPars);
prosail.SetBVs(bio_vars);
auto result = prosail();

A simulation is computationally expensive and it would be difficult to parallelize the code. However, if many simulations are going to be run, it is worth using all the available CPU cores in the machine.

I show below how using C++11 standard support for threads allows to easily run many simulations in parallel.

Each simulation uses a set of variables given in an input file. We parse the sample file in order to get the input parameters for each sample and we construct a vector of simulations with the appropriate size to store the results.

otbAppLogINFO("Processing simulations ..." << std::endl);
auto bv_vec = parse_bv_sample_file(m_SampleFile);
auto sampleCount = bv_vec.size();
otbAppLogINFO("" << sampleCount << " samples read."<< std::endl);
std::vector<SimulationType> simus{sampleCount};

The simulation function is actually a lambda which will sequentially process a sequence of samples and store the results into the simus vector. We capture by reference the parameters which are the same for all simulations (the satellite relative spectral responses satRSR and the acquisition angles in prosailPars):

auto simulator = [&](std::vector<BVType>::const_iterator sample_first,
                     std::vector<BVType>::const_iterator sample_last,
                     std::vector<SimulationType>::iterator simu_first){
  ProSailType prosail;
  prosail.SetRSR(satRSR);
  prosail.SetParameters(prosailPars);
  while(sample_first != sample_last)
    {
    prosail.SetBVs(*sample_first);
    *simu_first = prosail();
    ++sample_first;
    ++simu_first;
    }
};

We start by figuring out how to split the simulation into concurrent threads. How many cores are there?

auto num_threads = std::thread::hardware_concurrency();
otbAppLogINFO("" << num_threads << " CPUs available."<< std::endl);

So we define the size of the chunks we are going to run in parallel and we prepare a vector to store the threads:

auto block_size = sampleCount/num_threads;
if(num_threads>=sampleCount) block_size = sampleCount;
std::vector<std::thread> threads(num_threads);

Here, I choose to use as many threads as cores available, but this could be changed by a multiplicative factor if we know, for instance that disk I/O will introduce some idle time for each thread.

An now we can fill the vector with the threads that will process every block of simulations :

auto input_start = std::begin(bv_vec);
auto output_start = std::begin(simus);
for(size_t t=0; t<num_threads; ++t)
  {
  auto input_end = input_start;
  std::advance(input_end, block_size);
  threads[t] = std::thread(simulator,
                           input_start,
                           input_end,
                           output_start);
  input_start = input_end;
  std::advance(output_start, block_size);
  }

The std::thread takes the name of the function object to be called, followed by the arguments of this function, which in our case are the iterators to the beginning and the end of the block of samples to be processed and the iterator of the output vector where the results will be stored. We use std::advance to update the iterator positions.

After that, we have a vector of threads which have been started concurrently. In order to make sure that they have finished before trying to write the results to disk, we call join on each thread, which results in waiting for each thread to end:

std::for_each(threads.begin(),threads.end(),
              std::mem_fn(&std::thread::join));
otbAppLogINFO("" << sampleCount << " samples processed."<< std::endl);
for(const auto& s : simus)
  this->WriteSimulation(s);

This may no be the most efficient solution, nor the most general one. Using std::async and std::future would have allowed not to have to deal with block sizes, but in this solution we can easily decide the number of parallel threads that we want to use, which may be useful in a server shared with other users.