Reputation: 1825
I have a bunch of threads doing work on multiple data items. The threads have to put out the results in the same order I hand data to the threads. That is:
Thread #1: give data - start processing
Thread #2: give data - start processing
Thread #3: give data - start processing
...
Thread #n: give data - start processing
The results should be retrieved in the same order the data was passed to the threads regardless of which thread finished processing first. Namely:
Thread #1: put data
Thread #2: put data
...
To differentiate between threads and manage them, I gave each one an ID (0, 1, 2, ..., n). I am using the IDs to assign data to each thread so it can process it.
for (int i = 0; i < thread_count; i++)
    give_data(i); // i is the id; the function knows where to get data from
I want the threads to share a token which determines which thread is expected to produce the result. All thread bodies are identical, the body looks like:
while (true) {
    auto data = get_data();
    result = process_data(data);
    while (token != this_id) spin;
    put_data(result); // this is a synchronized call
    update_token(token);
}
My issue comes with the token. I first tried a normal reference (int &token) and it obviously cannot work (and I didn't expect it to). I then used a static variable, but the threads do not always see the latest value. I was surprised to see one thread dominating everything. Whenever a thread updates the token, it loses its turn, allowing another thread to put its result, and so on. However, I had one thread dominating as if the token were always set to its own ID and never updated.
If I had to guess I would say it's a caching issue. However, I am not sure.
Anyway, I am thinking of using std::atomic&lt;int&gt; as my token. Would it work? If not, what else should I consider doing? What would be a better way to synchronize those threads?
Extra: this feels like a bad design and I am not sure how to do it better. Any suggestions would be very much appreciated.
Upvotes: 3
Views: 1447
Reputation: 1825
Max's answer is great. If I had the ability to use OpenMP given the time, I would have done it. However, I am not able to, which is why I am posting this answer to my own question.
My previous design depended on the threads synchronizing with each other, and that doesn't seem like the best idea since so much can go wrong. Instead, I decided to let the manager synchronize their results (I got the idea from Max's last code snippet).
void give_threads_data(){
    vector<promise<result>> promises(threads.size());
    vector<future<result>> futures(threads.size());
    for (size_t i = 0; i < threads.size(); i++) {
        data d = get_data();
        futures[i] = promises[i].get_future();
        threads[i].put_data(d, &promises[i]); // each worker gets data plus a promise to fulfill
    }
    for (size_t i = 0; i < futures.size(); i++) {
        result r = futures[i].get(); // blocks until thread i's result is ready
        // handle r
    }
}
That way I was able to get the results the same way I send them to the threads. The thread body became a lot cleaner:
void thread_body(){
    while (true) {
        pair<data, promise<result>*> item = queue.get(); // blocking call
        data d = item.first;
        promise<result>* p = item.second;
        result r = process_data(d);
        p->set_value(r); // publish the result through the promise
    }
}
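For completeness, the blocking queue.get() above could be backed by something like the following sketch (the actual queue type isn't shown in this answer, so this is an assumed implementation using a mutex and a condition variable):

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Illustrative blocking queue: put() enqueues an item and wakes one waiter;
// get() blocks until an item is available, so idle workers simply sleep.
template <typename T>
class blocking_queue {
public:
    void put(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(std::move(item));
        }
        ready_.notify_one();
    }

    T get() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty(); });
        T item = std::move(items_.front());
        items_.pop();
        return item;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<T> items_;
};
```

With a queue like this, the workers need no shared token at all; ordering is handled entirely by the manager reading the futures in sequence.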
There is no spinning on a token, and the results come out exactly in order. Next time I'm doing threading, I will consider OpenMP.
Upvotes: 3
Reputation: 23691
Anyway, I used a static variable and the threads do not always get the latest one. I was surprised to see one thread dominating everything
Yes, multiple threads accessing the same unsynchronized value with at least one of them writing to it is a data race, which is undefined behavior according to the C++ standard. Anything could happen.
I am thinking of using std::atomic as my token. Would it work?
Yes. This would prevent any data race on the token. I don't see any other direct problem in your pseudocode, so from this perspective it looks good.
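A minimal, compilable sketch of that token scheme with std::atomic&lt;int&gt; is shown below. Note that process_data and put_data are faked here (the result is just id * 10, and "putting" is a locked push_back), since the question doesn't show them:

```cpp
#include <atomic>
#include <mutex>
#include <thread>
#include <vector>

// Each worker computes its fake result, then spins until the shared atomic
// token equals its id before publishing, so the output order matches the
// order the work was handed out, regardless of which thread finishes first.
std::vector<int> run_in_order(int thread_count) {
    std::atomic<int> token{0};   // whose turn it is to publish
    std::mutex out_mutex;
    std::vector<int> results;

    std::vector<std::thread> workers;
    for (int id = 0; id < thread_count; ++id) {
        workers.emplace_back([&, id] {
            int result = id * 10;  // stand-in for process_data(get_data())
            // spin until it is this thread's turn
            while (token.load(std::memory_order_acquire) != id)
                std::this_thread::yield();
            {
                std::lock_guard<std::mutex> lock(out_mutex);
                results.push_back(result);  // stand-in for put_data(result)
            }
            token.store(id + 1, std::memory_order_release); // update_token
        });
    }
    for (auto& t : workers) t.join();
    return results;
}
```

The acquire/release pairing on the token is what guarantees each thread sees the previous thread's update; a plain static int gives you no such guarantee, which matches the behavior you observed.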
this feels like a bad design and I am not sure how to do it better. Any suggestions would be very much appreciated.
The whole design does look somewhat weird, but it depends on your threading library whether there is an easier way to express this. For example, with OpenMP you could do this for one pass (the logic behind give_data and get_data is too unclear to make this complete):
#pragma omp parallel
{
    int threadCount = omp_get_num_threads();
    #pragma omp single
    for (int i = 0; i < threadCount; ++i)
        give_data(i);
    #pragma omp for ordered schedule(static)
    for (int i = 0; i < threadCount; ++i)
    {
        auto data = get_data();
        result = process_data(data);
        #pragma omp ordered
        put_data(result); // this is a synchronized call
    }
}
The ordered directive forces the put_data calls to be executed in exactly the same order (one by one) as if the loop were serial, while the threads can still do the preceding data processing in parallel.
Things might actually be even easier with OpenMP if all you really wanted to do was make one big loop of data processing parallel with ordered writes:
#pragma omp parallel for ordered schedule(static)
for (int i = 0; i < dataItemCount; ++i)
{
    auto data = get_data(i); // whatever this would entail
    auto result = process_data(data);
    #pragma omp ordered
    put_data(result); // this is a synchronized call
}
It doesn't look like you require the data item distribution to be in order, but if you actually do then this approach would not work as easily because you can only have one ordered section per ordered loop.
Upvotes: 8