Reputation: 2321
I'm trying to implement a call-by-future mechanism in C++. Although this is just test code (written in a bit of a hurry), I intend to use something similar in the runtime of a language I'm working on, for transparent parallelism.
I've trimmed the code down to make it a bit smaller, though it is still long:
#include <cstdlib>
#include <cstdio>
#include <iostream>
#include <vector>
#include <queue>
#include <future>
#include <thread>
#include <functional>
#include <type_traits>
#include <utility>
using namespace std;
using namespace std::chrono;
//------------------------------------------------------------------------------
// Simple locked printer
static std::recursive_mutex print_lock;
inline void print_() {
return;
};
template<typename T, typename... Args>
inline void print_(T t, Args... args) {
print_lock.lock();
std::cout << t;
print_(args...);
print_lock.unlock();
};
//------------------------------------------------------------------------------
template<typename R>
class PooledTask {
public:
explicit PooledTask(function<R()>);
// Possibly execute the task and return the value
R &operator () () {
// If we can get the lock, we're not executing
if(lock.try_lock()) {
// We may already have executed it
if(done)
goto end;
// Otherwise, execute it now
try {
result = move(task());
} catch(...) {
// If an exception is thrown, save it for later
eptr = current_exception();
failed = true;
};
done = true;
goto end;
} else {
// Wait until the task is completed
lock.lock();
end: {
lock.unlock();
// Maybe we got an exception!
if(failed)
rethrow_exception(eptr);
// Otherwise, just return the result
return result;
};
};
};
private:
exception_ptr eptr;
function<R()> task;
bool done;
bool failed;
mutex lock;
R result;
};
extern class TaskPool pool;
class TaskPool {
public:
TaskPool() noexcept: TaskPool(thread::hardware_concurrency() - 1) {
return;
};
TaskPool(const TaskPool &) = delete;
TaskPool(TaskPool &&) = delete;
template<typename T>
void push(PooledTask<T> *task) noexcept {
lock_guard<mutex> guard(lock);
builders.push([=] {
try {
(*task)();
} catch(...) {
// Ignore it here! The task will save it. :)
};
});
};
~TaskPool() {
// TODO: wait for all tasks to finish...
};
private:
queue<thread *> threads;
queue<function<void()>> builders;
mutex lock;
TaskPool(signed N) noexcept {
while(N --> 0)
threads.push(new thread([this, N] {
for(;;) {
pop_task();
};
}));
};
void pop_task() noexcept {
lock.lock();
if(builders.size()) {
auto task = builders.front();
builders.pop();
lock.unlock();
task();
} else
lock.unlock();
};
} pool;
template<typename R>
PooledTask<R>::PooledTask(function<R()> fun):
task(fun),
done(false),
failed(false)
{
pool.push(this);
};
// Should probably return a std::shared_ptr here...
template<typename F, typename... Args>
auto byfuture(F fun, Args&&... args) noexcept ->
PooledTask<decltype(fun(args...))> *
{
using R = decltype(fun(args...));
auto pooled = new PooledTask<R> {
bind(fun, forward<Args>(args)...)
};
return pooled;
};
//------------------------------------------------------------------------------
#include <map>
// Get the current thread id as a simple number
static int myid() noexcept {
static unsigned N = 0;
static map<thread::id, unsigned> hash;
static mutex lock;
lock_guard<mutex> guard(lock);
auto current = this_thread::get_id();
if(!hash[current])
hash[current] = ++N;
return hash[current];
};
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
// The fibonacci test implementation
int future_fib(int x, int parent) {
if(x < 3)
return 1;
print_("future_fib(", x, ")", " on thread ", myid(),
", asked by thread ", parent, "\n");
auto f1 = byfuture(future_fib, x - 1, myid());
auto f2 = byfuture(future_fib, x - 2, myid());
auto res = (*f1)() + (*f2)();
delete f1;
delete f2;
return res;
};
//------------------------------------------------------------------------------
int main() {
// Force main thread to get id 1
myid();
// Get task
auto f = byfuture(future_fib, 8, myid());
// Make sure it starts on the task pool
this_thread::sleep_for(seconds(1));
// Blocks
(*f)();
// Simply wait to be sure all threads are clean
this_thread::sleep_for(seconds(2));
//
return EXIT_SUCCESS;
};
The output of this program looks something like this (I have a quad-core, so there are 3 threads in the pool):
future_fib(8) on thread 2, asked by thread 1
future_fib(7) on thread 3, asked by thread 2
future_fib(6) on thread 4, asked by thread 2
future_fib(6) on thread 3, asked by thread 3
future_fib(5) on thread 4, asked by thread 4
future_fib(5) on thread 3, asked by thread 3
future_fib(4) on thread 4, asked by thread 4
future_fib(4) on thread 3, asked by thread 3
future_fib(3) on thread 4, asked by thread 4
future_fib(3) on thread 3, asked by thread 3
future_fib(3) on thread 4, asked by thread 4
future_fib(3) on thread 3, asked by thread 3
future_fib(4) on thread 4, asked by thread 4
future_fib(4) on thread 3, asked by thread 3
future_fib(3) on thread 4, asked by thread 4
future_fib(3) on thread 3, asked by thread 3
future_fib(5) on thread 3, asked by thread 3
future_fib(4) on thread 3, asked by thread 3
future_fib(3) on thread 3, asked by thread 3
future_fib(3) on thread 3, asked by thread 3
This implementation is really slow compared to a plain recursive Fibonacci function.
So here is the problem: when the pool runs fib(8), it creates two tasks that will run on the next threads, but when it reaches auto res = (*f1)() + (*f2)();, both tasks are already running, so it blocks on f1 (running on thread 3).
What I would need for a speed improvement is for thread 2, instead of blocking on f1, to take over the work thread 3 is executing, leaving thread 3 ready to pick up another task, so that no thread sleeps while there is still work to do.
This article, http://bartoszmilewski.com/2011/10/10/async-tasks-in-c11-not-quite-there-yet/, says that what I want is necessary, but doesn't specify how to do it.
My question is: how could I possibly do that?
Are there other ways to achieve the same thing?
Upvotes: 5
Views: 1652
Reputation: 16980
I think you might have a chance with the resumable functions currently proposed for C++ standardization. The proposal is not approved yet, but the Visual Studio 15 CTP implements it, so you can try making a prototype (if you can use the MSVC compiler).
Gor Nishanov (one of the authors of the latest proposal paper) describes a very similar example of calculating Fibonacci with "parent-stealing scheduling" starting at 23:47 in his CppCon talk: https://www.youtube.com/watch?v=KUhSjfSbINE
Note, however, that I couldn't find any sources/samples of the implementation of spawnable<T>, so you may need to contact the proposal authors for details.
Upvotes: 1
Reputation: 2528
Look, your code is full of things that take longer than calculating fib(8).
For example, switching into kernel space to find out the thread id will, on most flavors of Windows, probably take longer than the work being done here.
Parallelization is not about having a bunch of threads competing for shared memory. That is the worst mistake you can make.
When you parallelize a task, you break the output up into discrete chunks, so that the parallel threads each write to their own memory, and you avoid the memory and cache contention that is tanking your app.
When you have 3 threads touching 3 separate memory locations, there is never a need for a lock or any other synchronization primitive, which on most flavors of Windows also requires a kernel-mode switch.
So the only thing you really need to know is when the threads are all done. That can be achieved with the various interlocked-exchange methods or with OS event handles.
If you want to be a serious developer, remove the thread id, remove the locking code, and start thinking about how you can approach this problem without them.
Think about 2 cars on a 2-lane freeway, one moving faster than the other, and you never know which car is ahead. Ask yourself: is there some way of positioning those cars in the 2 lanes where it doesn't matter who is in front and who is faster? You should conclude that if each car stays in its own lane, there will never be a problem. This is parallelization in its simplest form.
Now consider that you're going to spawn these jobs off on separate machines, on different continents. Is it reasonable to try to exchange information about threads and memory? No, it's not. You simply break the problem into discrete functional chunks that have absolutely nothing to do with one another, forget about the excessive control, and let the magic that is the information age happen.
I hope this helps.
Upvotes: 0