
boost::asio thread pool implementation for occasionally synchronized tasks


You can use futures for the data processing and synchronize on them with boost::wait_for_all(). This lets you reason in terms of units of work completed rather than in terms of threads.

    int process_data() {...}

    // Pending futures
    std::vector<boost::unique_future<int>> pending_data;

    for(int i = 0; i < numSmallTasks; ++i)
    {
       // Create task and corresponding future
       // Using shared ptr and binding operator() trick because
       // packaged_task is non-copyable, but asio::io_service::post requires argument to be copyable

       // Boost 1.51 syntax
       // For Boost 1.53+ or C++11 std::packaged_task shall be boost::packaged_task<int()>
       typedef boost::packaged_task<int> task_t;

       boost::shared_ptr<task_t> task = boost::make_shared<task_t>(
          boost::bind(&process_data, i, theTime));

       boost::unique_future<int> fut = task->get_future();

       pending_data.push_back(std::move(fut));
       io_service.post(boost::bind(&task_t::operator(), task));
    }

    // After loop - wait until all futures are evaluated
    boost::wait_for_all(pending_data.begin(), pending_data.end());
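For readers without Boost at hand, the same pattern can be sketched with the C++11 standard library only. This is not the code from the answer above: `TinyPool` is a hypothetical, hand-written stand-in for `io_service` plus its worker threads, `run_batch` and the body of `process_data` are made up for illustration, and blocking on each `std::future::get()` replaces `boost::wait_for_all()`. What carries over is the trick itself: `std::function`, like `io_service::post`, needs a copyable callable, so the move-only `packaged_task` is held in a `shared_ptr`.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for io_service and the threads calling run() on it.
class TinyPool {
public:
    explicit TinyPool(unsigned n) {
        for (unsigned i = 0; i < n; ++i)
            workers_.emplace_back([this] { run(); });
    }
    ~TinyPool() {
        {
            std::lock_guard<std::mutex> lock(m_);
            done_ = true;
        }
        cv_.notify_all();
        for (auto& t : workers_) t.join();
    }
    // Like io_service::post, the handler must be copyable (std::function).
    void post(std::function<void()> f) {
        assert(f);  // reject empty handlers
        {
            std::lock_guard<std::mutex> lock(m_);
            q_.push(std::move(f));
        }
        cv_.notify_one();
    }
private:
    void run() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(m_);
                cv_.wait(lock, [this] { return done_ || !q_.empty(); });
                if (q_.empty()) return;  // shutting down and queue drained
                job = std::move(q_.front());
                q_.pop();
            }
            job();  // run outside the lock
        }
    }
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> q_;
    std::vector<std::thread> workers_;
    bool done_ = false;
};

// Hypothetical task body: squares its index.
int process_data(int i) { return i * i; }

// Posts numSmallTasks tasks, waits for all of them, returns the sum of results.
int run_batch(TinyPool& pool, int numSmallTasks) {
    std::vector<std::future<int>> pending_data;
    for (int i = 0; i < numSmallTasks; ++i) {
        // packaged_task is move-only, so share it through a shared_ptr.
        auto task = std::make_shared<std::packaged_task<int()>>(
            std::bind(&process_data, i));
        pending_data.push_back(task->get_future());
        pool.post([task] { (*task)(); });
    }
    int sum = 0;
    for (auto& f : pending_data) sum += f.get();  // blocks until each is ready
    return sum;
}
```

The pool outlives the batch, so calling `run_batch` once per outer loop iteration reuses the same worker threads, and each call acts as the synchronization point.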


Maybe you can use boost::barrier as follows:

    void thread_proc( boost::barrier& b ) {
        while( true ) {
            if( !ioservice.run_one() ) break; // io_service stopped
            b.wait();
        }
    }
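To make the rendezvous behaviour concrete, here is a minimal sketch of the barrier idea in C++11 without Boost. `SimpleBarrier` is a hypothetical stand-in for `boost::barrier`, and incrementing a counter stands in for `run_one()` executing one handler; `run_rounds` is an illustrative name only. The sketch assumes, as the loop above does, that every thread gets exactly one unit of work per round.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for boost::barrier: reusable, generation-counted.
class SimpleBarrier {
public:
    explicit SimpleBarrier(unsigned count)
        : threshold_(count), waiting_(0), generation_(0) {
        assert(count > 0);
    }
    void wait() {
        std::unique_lock<std::mutex> lock(m_);
        const unsigned gen = generation_;
        if (++waiting_ == threshold_) {
            // Last thread to arrive releases everyone and resets the barrier.
            ++generation_;
            waiting_ = 0;
            cv_.notify_all();
        } else {
            cv_.wait(lock, [&] { return gen != generation_; });
        }
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    const unsigned threshold_;
    unsigned waiting_;
    unsigned generation_;
};

// Runs `rounds` synchronized rounds on `numThreads` threads.
// Returns true if no thread ever passed a barrier before all threads
// had finished their work for that round.
bool run_rounds(unsigned numThreads, unsigned rounds) {
    SimpleBarrier barrier(numThreads);
    std::atomic<unsigned> completed(0);
    std::atomic<bool> ok(true);
    std::vector<std::thread> threads;
    for (unsigned t = 0; t < numThreads; ++t) {
        threads.emplace_back([&] {
            for (unsigned r = 0; r < rounds; ++r) {
                ++completed;     // stands in for one handler run by run_one()
                barrier.wait();  // wait for the other threads
                if (completed.load() < numThreads * (r + 1)) ok = false;
            }
        });
    }
    for (auto& th : threads) th.join();
    return ok.load() && completed.load() == numThreads * rounds;
}
```

One consequence of this design is that a round with fewer tasks than threads would leave some threads blocked before the barrier, so the number of posted tasks per round has to match the barrier count.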


Rost's method essentially works, but the boost::make_shared call does not compile as is. The following is a working version (in VS2012):

    #include <boost/asio.hpp>
    #include <boost/bind.hpp>
    #include <boost/make_shared.hpp>
    #include <boost/function_types/result_type.hpp>
    #include <boost/shared_ptr.hpp>
    #include <boost/function.hpp>
    #include <boost/thread.hpp>

    std::vector<boost::unique_future<void>> pending_data;
    typedef boost::packaged_task<void> task_t;

    boost::shared_ptr< boost::packaged_task<void> > pt(new boost::packaged_task<void> ([&,i](){...}));
    boost::unique_future<void> result = pt->get_future();
    pending_data.push_back(boost::move(result));
    io_service.post(boost::bind(&task_t::operator(), pt));

    boost::wait_for_all(pending_data.begin(), pending_data.end());
    pending_data.clear();

It will not compile if an argument is used in the packaged_task typedef. In my case, this asio thread pool with futures saved only 8% of the run time compared with creating new threads in each loop iteration.