Reputation: 19302
I'm working on a C++ project that needs to run many jobs in a threadpool. The jobs are failure-prone, which means that I need to know how each job terminated after it completes. Being a Java programmer for the most part, I like the idea of using "futures" or a similar paradigm, akin to the various classes in Java's util.concurrent package.
I have two questions: first, does something like this already exist for C++ (I haven't found anything in Boost, but maybe I'm not looking hard enough); and second, is this even a sane idea for C++?
I found a brief example of what I'm trying to accomplish here:
http://www.boostcookbook.com/Recipe:/1234841
Does this approach make sense?
Upvotes: 3
Views: 2451
Reputation: 51253
Boost has futures and other threading tools implemented.
Note that when you call the get()
method on a boost::unique_future
it will re-throw any exception that might have been stored inside it during asynchronous execution.
I would suggest you do something like:
#pragma once
#include <tbb/concurrent_queue.h>
#include <boost/thread.hpp>
#include <boost/noncopyable.hpp>
#include <functional>
namespace internal
{
template<typename T>
struct move_on_copy
{
move_on_copy(const move_on_copy<T>& other) : value(std::move(other.value)){}
move_on_copy(T&& value) : value(std::move(value)){}
mutable T value;
};
template<typename T>
move_on_copy<T> make_move_on_copy(T&& value)
{
return move_on_copy<T>(std::move(value));
}
}
class executor : boost::noncopyable
{
boost::thread thread_;
tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;
template<typename Func>
auto create_task(Func&& func) -> boost::packaged_task<decltype(func())> // noexcept
{
typedef boost::packaged_task<decltype(func())> task_type;
auto task = task_type(std::forward<Func>(func));
task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
{
try
{
if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
my_task();
}
catch(boost::task_already_started&){}
}));
return std::move(task);
}
public:
explicit executor() // noexcept
{
thread_ = boost::thread([this]{run();});
}
~executor() // noexcept
{
execution_queue_.push(nullptr); // Wake the execution thread.
thread_.join();
}
template<typename Func>
auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())> // noexcept
{
// Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.
auto task_adaptor = internal::make_move_on_copy(create_task(func));
auto future = task_adaptor.value.get_future();
execution_queue_.push([=]
{
try{task_adaptor.value();}
catch(boost::task_already_started&){}
});
return std::move(future);
}
template<typename Func>
auto invoke(Func&& func) -> decltype(func()) // noexcept
{
if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
return func();
return begin_invoke(std::forward<Func>(func), prioriy).get();
}
private:
void run() // noexcept
{
while(true)
{
std::function<void()> func;
execution_queue_.pop(func);
if(!func)
break;
func();
}
}
};
Upvotes: 5
Reputation: 208353
Futures are both present in the upcoming standard (C++0x) and inside boost. Note that while the main name future
is the same, you will need to read into the documentation to locate other types and to understand the semantics. I don't know Java futures, so I cannot tell you where they differ, if they do.
The library in boost was written by Anthony Williams, that I believe was also involved in the definition of that part of the standard. He has also written C++ Concurrency in Action, that includes a good description of futures, tasks, promises and related objects. His company also sells a complete and up to implementation of the C++0x threading libraries, if you are interested.
Upvotes: 5
Reputation: 36154
C++ templates are less restrictive than Java Generics so 'Future's could readily be ported with them and thread synchronization primitives. As for existing libraries which support such a mechanism, hopefully someone else knows of one.
Upvotes: -1