Lily

Reputation: 6022

How to synchronize and combine results from multiple threads in C++?

I have a data feed continuously feeding data packets in. There are 5 threads (A, B, C, D, E) processing the data packets. Note that the 5 threads run at totally different speeds, and they generate 5 different features (each thread generates 1 feature) for every incoming data packet.

The 5 threads run at different paces: when A has finished analyzing the first 10 packets, B might only have finished packets 1 and 2, and C might not have finished a single packet at all.

My task is to match the results from the 5 threads and start the final analysis once all 5 features for the first 10 data packets are available.

My questions are:
- How do I combine the results from different threads so that the analysis thread is only triggered when a certain number of results are available?
- It seems that I need an aggregator thread that checks the availability of the different buffers. I am thinking in terms of locks/condition variables. How could I implement such a condition involving several buffers?

I'm totally new to multithreading. Any suggestions are welcome.

I am using GNU C++ with Boost library.

Upvotes: 1

Views: 4064

Answers (7)

fa.

Reputation: 2486

With your current design you are limited by the slowest computation; the other threads will be underused.

If you want to process a lot of packets, I would instead split the work like this:

Distribute data packets to N identical threads, each of which computes the 5 results in sequence for the packets it receives.

Each thread puts its result packets in a thread-safe FIFO.

Your main thread reads the results and, if needed, reorders them using packet numbers.
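A minimal sketch of this design in standard C++11 (`std::thread`, `std::mutex` and `std::condition_variable`, which mirror Boost.Thread). `PacketResult`, `SafeQueue`, `computeAllFeatures` and `processInOrder` are hypothetical names; the round-robin distribution of packets and the dummy feature values are assumptions standing in for the real feed and the real A..E computations.

```cpp
#include <array>
#include <condition_variable>
#include <cstddef>
#include <map>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical result type: all 5 features computed for one packet.
struct PacketResult {
    std::size_t packetId;
    std::array<double, 5> features;
};

// Minimal thread-safe FIFO: a mutex plus a condition variable.
template <typename T>
class SafeQueue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(value));
        }
        cv_.notify_one();
    }
    T pop() {  // blocks until an item is available
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty(); });
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }
private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<T> queue_;
};

// Placeholder for the real feature extractors A..E (assumption).
std::array<double, 5> computeAllFeatures(std::size_t packetId) {
    std::array<double, 5> f{};
    for (std::size_t i = 0; i < f.size(); ++i)
        f[i] = static_cast<double>(packetId * 10 + i);
    return f;
}

// Runs numWorkers identical threads over packets 0..numPackets-1 and returns
// the packet ids in the order the main thread releases them for analysis.
std::vector<std::size_t> processInOrder(std::size_t numPackets, std::size_t numWorkers) {
    SafeQueue<PacketResult> results;
    std::vector<std::thread> workers;
    for (std::size_t w = 0; w < numWorkers; ++w) {
        workers.emplace_back([w, numWorkers, numPackets, &results] {
            // Round-robin: worker w takes packets w, w+N, w+2N, ...
            for (std::size_t id = w; id < numPackets; id += numWorkers)
                results.push({id, computeAllFeatures(id)});
        });
    }

    // Main thread: reorder by packet number, holding early arrivals in `pending`.
    std::map<std::size_t, PacketResult> pending;
    std::vector<std::size_t> released;
    std::size_t next = 0;
    for (std::size_t received = 0; received < numPackets; ++received) {
        PacketResult r = results.pop();
        pending.emplace(r.packetId, r);
        // Release every result that is now contiguous with those already released.
        for (auto it = pending.find(next); it != pending.end(); it = pending.find(next)) {
            released.push_back(it->first);  // the final analysis would consume it->second here
            pending.erase(it);
            ++next;
        }
    }
    for (auto& t : workers) t.join();
    return released;
}
```

`processInOrder(100, 4)` should return the ids 0 through 99 in order regardless of which worker finished first; the `pending` map holds results that arrived ahead of a slower packet.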

Upvotes: 0

Chris K

Reputation: 12359

Some pseudocode:

worker thread: 
   -> do work, 
     -> publish result to queue
   -> 10 reached, signal my condvar 

aggregator thread: 
   -> wait on all condvars. 
   -> lock all result queues, swap in new empty ones.  
   -> do aggregation processing. 

The reason for creating new queues is that your aggregation processing may involve significant amounts of locking, plus invalidation when items are removed. If you give your worker threads new queues, you need to worry less about locking (especially as the aggregator doesn't need to share its results BACK with the workers).
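The pseudocode above could be sketched in C++11 roughly as follows. Waiting on several condition variables at once is not directly supported, so this version assumes one mutex and one condition variable shared by all five workers, with the aggregator's wait predicate checking every queue. Workers notify after every push rather than only at 10, because results left over from an earlier swap (kept in `backlog`) already count toward the next batch, and a signal sent only at exactly 10 could then be missed. All names (`Shared`, `worker`, `aggregator`, `runDemo`) and the integer "features" are illustrative.

```cpp
#include <array>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

constexpr std::size_t kWorkers = 5;  // threads A..E
constexpr std::size_t kBatch = 10;   // packets per analysis batch

struct Shared {
    std::mutex mutex;
    std::condition_variable cv;                     // one condvar shared by all workers
    std::array<std::vector<int>, kWorkers> queues;  // one result queue per worker
};

// Worker: do the work, publish the result to its own queue, signal the aggregator.
void worker(Shared& s, std::size_t index, int numPackets) {
    for (int packet = 0; packet < numPackets; ++packet) {
        int feature = packet;  // stand-in: the "feature" just records its packet id
        {
            std::lock_guard<std::mutex> lock(s.mutex);
            s.queues[index].push_back(feature);
        }
        s.cv.notify_one();  // the aggregator's predicate decides whether a batch is ready
    }
}

// Aggregator: wait until every worker has >= kBatch results, swap in empty queues,
// then process complete batches without holding the lock.
// Returns how many batches had the same packet ids from all workers.
int aggregator(Shared& s, int batchesWanted) {
    std::array<std::deque<int>, kWorkers> backlog;  // taken from the workers, not yet batched
    auto batchReady = [&backlog] {
        for (const auto& b : backlog)
            if (b.size() < kBatch) return false;
        return true;
    };
    int done = 0, matched = 0;
    while (done < batchesWanted) {
        std::array<std::vector<int>, kWorkers> taken;
        {
            std::unique_lock<std::mutex> lock(s.mutex);
            s.cv.wait(lock, [&] {
                for (std::size_t i = 0; i < kWorkers; ++i)
                    if (backlog[i].size() + s.queues[i].size() < kBatch) return false;
                return true;
            });
            taken.swap(s.queues);  // workers continue with fresh, empty queues
        }
        for (std::size_t i = 0; i < kWorkers; ++i)
            backlog[i].insert(backlog[i].end(), taken[i].begin(), taken[i].end());
        while (done < batchesWanted && batchReady()) {
            bool same = true;  // do all 5 features in each row belong to the same packet?
            for (std::size_t k = 0; k < kBatch; ++k)
                for (std::size_t i = 1; i < kWorkers; ++i)
                    if (backlog[i][k] != backlog[0][k]) same = false;
            for (auto& b : backlog)
                b.erase(b.begin(), b.begin() + static_cast<std::ptrdiff_t>(kBatch));
            ++done;
            if (same) ++matched;
        }
    }
    return matched;
}

// Demo driver: 5 workers and one aggregator.
int runDemo(int batches) {
    Shared s;
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < kWorkers; ++i)
        workers.emplace_back(worker, std::ref(s), i, batches * static_cast<int>(kBatch));
    int matched = aggregator(s, batches);
    for (auto& t : workers) t.join();
    return matched;
}
```

The lock is held only for the push in each worker and for the `swap` in the aggregator; matching and analysis run on the aggregator's private copies.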

Upvotes: 1

Emilio M Bumachar

Reputation: 2613

Use semaphores and extra boolean 'done' variables. Every time a thread is done, it FIRST writes its answers, THEN sets its 'done' variable, and then calls a 'check' function that checks all the threads' 'done' variables and, if they're all true, triggers the analysis thread.

Depending on your performance trade-offs, you probably want only the slowest 'work' thread to ever call the 'check' function, so the fast ones won't keep locking the 'done' variables for reading. This, of course, depends on knowing which thread is the slowest.

I don't know your reset policy: do you want to wait for 10 fresh inputs every time or analyze the 10 most recent continuously?
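A minimal sketch of the 'done'-flag approach, assuming the "wait for fresh inputs" reset policy. It swaps the suggested semaphores for a single `std::mutex` guarding the flags; `DoneTracker`, `writeAnswer`, `markDoneAndCheck`, `runOneRound` and the summing "analysis" are hypothetical placeholders.

```cpp
#include <array>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

constexpr std::size_t kWorkers = 5;  // threads A..E

class DoneTracker {
public:
    // Step 1: each worker writes only its own slot, so there is no data race.
    void writeAnswer(std::size_t index, int answer) { answers_[index] = answer; }

    // Step 2: set the 'done' flag, then check all flags. The call that completes
    // the set runs the analysis and returns true; every other call returns false.
    bool markDoneAndCheck(std::size_t index) {
        std::lock_guard<std::mutex> lock(mutex_);
        done_[index] = true;
        for (bool d : done_)
            if (!d) return false;
        int sum = 0;  // stand-in for triggering the analysis thread
        for (int a : answers_) sum += a;
        lastResult_ = sum;
        done_.fill(false);  // "fresh inputs" reset policy (assumption)
        return true;
    }

    int lastResult() {
        std::lock_guard<std::mutex> lock(mutex_);
        return lastResult_;
    }

private:
    std::mutex mutex_;
    std::array<int, kWorkers> answers_{};
    std::array<bool, kWorkers> done_{};
    int lastResult_ = -1;  // -1 means the analysis has not run yet
};

// One round: each worker writes its answer FIRST, THEN marks itself done.
int runOneRound() {
    DoneTracker tracker;
    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < kWorkers; ++i)
        threads.emplace_back([&tracker, i] {
            tracker.writeAnswer(i, static_cast<int>(i) * 10);
            tracker.markDoneAndCheck(i);
        });
    for (auto& t : threads) t.join();
    return tracker.lastResult();
}
```

Whichever worker's call finds all five flags set is the one that runs the analysis, so no separate polling thread is needed. This sketch covers a single round; if a fast thread can start its next round before the slowest one finishes, its answer slot would need one buffer per round.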

Upvotes: 1

naumcho

Reputation: 19931

You may want to look at the producer-consumer problem.
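For reference, the classic bounded-buffer solution to the producer-consumer problem looks like this in C++11: producers block while the buffer is full, consumers block while it is empty. `BoundedBuffer` and `sumThroughBuffer` are illustrative names, not a Boost API; in the question's setting, the data feed would be the producer and each feature thread a consumer.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Bounded buffer: put() blocks while full, take() blocks while empty.
template <typename T>
class BoundedBuffer {
public:
    explicit BoundedBuffer(std::size_t capacity) : capacity_(capacity) {}

    void put(T item) {
        std::unique_lock<std::mutex> lock(mutex_);
        notFull_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push_back(std::move(item));
        notEmpty_.notify_one();  // wake one waiting consumer
    }

    T take() {
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return !items_.empty(); });
        T item = std::move(items_.front());
        items_.pop_front();
        notFull_.notify_one();  // wake one waiting producer
        return item;
    }

private:
    std::size_t capacity_;
    std::mutex mutex_;
    std::condition_variable notFull_, notEmpty_;
    std::deque<T> items_;
};

// One producer (the data feed) and one consumer; returns the sum of what was consumed.
long long sumThroughBuffer(int count, std::size_t capacity) {
    BoundedBuffer<int> buffer(capacity);
    std::thread producer([&buffer, count] {
        for (int i = 1; i <= count; ++i) buffer.put(i);
    });
    long long sum = 0;
    for (int i = 0; i < count; ++i) sum += buffer.take();
    producer.join();
    return sum;
}
```

The small capacity is what keeps a fast producer from running arbitrarily far ahead of a slow consumer.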

Upvotes: 1

joshperry

Reputation: 42257

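Have a container that stores results and a function like this (pseudocode):

void storeResult(const Result& result) {
    static std::mutex containerMutex;  // guards `container`
    std::vector<Result> batch;
    {
        std::lock_guard<std::mutex> lock(containerMutex);
        container.push_back(result);
        if (container.size() < ANALYSIS_SIZE)
            return;
        batch.swap(container);  // take the full batch; container is left empty
    }
    // Start the analysis outside the lock so workers are not held up by it.
    StartAnalysisThread(std::move(batch));
}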

Since the mutex only protects the push onto the container, which is fairly quick, it shouldn't cause excessive serialization.

Upvotes: 0

jldupont

Reputation: 96806

Have yourself an "aggregator" thread: this thread would get its input from the worker threads (through non-blocking, thread-safe queues, I suggest) and, once a "batch" is ready, push it to your "analyzer" thread.

Queues offer the advantage of not blocking any of the workers: the "aggregator" just has to poll the worker queues (inside a critical section). You can control the polling rate to your liking.

This solution goes around the problem of "synchronize all" situations.
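A sketch of such an aggregator, assuming each worker tags its feature with a packet id. "Non-blocking" here means `tryPop` returns immediately when a queue is empty; the queue itself still uses a mutex (a lock-free container such as `boost::lockfree::queue` is an alternative). The names, the `kBatch = 10` batch size and the callback standing in for the analyzer thread are assumptions.

```cpp
#include <array>
#include <chrono>
#include <cstddef>
#include <map>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

constexpr std::size_t kWorkers = 5;  // threads A..E
constexpr std::size_t kBatch = 10;   // packets per analysis batch

struct Feature { std::size_t packetId; double value; };  // one worker's output for one packet

// Mutex-protected queue; tryPop returns false immediately instead of waiting for data.
class FeatureQueue {
public:
    void push(Feature f) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(f);
    }
    bool tryPop(Feature& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) return false;
        out = queue_.front();
        queue_.pop();
        return true;
    }
private:
    std::mutex mutex_;
    std::queue<Feature> queue_;
};

using Batch = std::vector<std::array<double, kWorkers>>;  // kBatch rows of 5 features

// Polls all worker queues, matches features by packet id, and passes each complete
// batch of kBatch consecutive packets to `analyze`. Returns the number of batches sent.
template <typename Analyzer>
std::size_t aggregate(std::array<FeatureQueue, kWorkers>& queues, std::size_t batches,
                      std::chrono::milliseconds pollInterval, Analyzer analyze) {
    struct Row { std::array<double, kWorkers> values{}; std::size_t have = 0; };
    std::map<std::size_t, Row> rows;  // packetId -> features collected so far
    std::size_t nextPacket = 0, sent = 0;
    while (sent < batches) {
        Feature f{};
        for (std::size_t w = 0; w < kWorkers; ++w)
            while (queues[w].tryPop(f)) {  // drain without waiting on any worker
                rows[f.packetId].values[w] = f.value;
                ++rows[f.packetId].have;
            }
        bool complete = true;  // do the next kBatch packets have all 5 features?
        for (std::size_t id = nextPacket; id < nextPacket + kBatch && complete; ++id) {
            auto it = rows.find(id);
            complete = (it != rows.end() && it->second.have == kWorkers);
        }
        if (!complete) {
            std::this_thread::sleep_for(pollInterval);  // tunable polling rate
            continue;
        }
        Batch batch;
        for (std::size_t id = nextPacket; id < nextPacket + kBatch; ++id) {
            batch.push_back(rows[id].values);
            rows.erase(id);
        }
        analyze(batch);  // a real program would hand this to the analyzer thread
        nextPacket += kBatch;
        ++sent;
    }
    return sent;
}

// Demo: each worker emits one feature per packet for `batches * kBatch` packets.
std::size_t runDemo(std::size_t batches) {
    std::array<FeatureQueue, kWorkers> queues;
    std::vector<std::thread> workers;
    for (std::size_t w = 0; w < kWorkers; ++w)
        workers.emplace_back([&queues, w, batches] {
            for (std::size_t id = 0; id < batches * kBatch; ++id)
                queues[w].push({id, static_cast<double>(w)});
        });
    std::size_t fullBatches = 0;
    aggregate(queues, batches, std::chrono::milliseconds(1),
              [&fullBatches](const Batch& b) { if (b.size() == kBatch) ++fullBatches; });
    for (auto& t : workers) t.join();
    return fullBatches;
}
```

Raising `pollInterval` lowers CPU use at the cost of latency; the aggregator never waits on any particular worker.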

Upvotes: 3

Paul Nathan

Reputation: 40319

Barriers are the canonical "synchronize all" operation.

However, it sounds like you want to have a "count result" variable in a critical section that is incremented when a certain amount is done. Then, you want to do a "block until variable is equal to x". That can be accomplished with a spin-lock against the count result variable.
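A sketch of the counter-plus-spin-wait idea. It uses `std::atomic<int>` instead of a counter in a critical section (a swapped-in technique that makes each increment atomic without a lock) and waits for the count to reach *at least* the target, which is safer than testing for equality. `waitUntil` and `runDemo` are hypothetical names. Spinning burns a CPU core, so for longer waits a condition variable, or a barrier such as `boost::barrier` (or `std::barrier` in C++20), is usually the better choice.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Spin until `counter` reaches at least `target`, yielding the time slice on each pass.
void waitUntil(const std::atomic<int>& counter, int target) {
    while (counter.load() < target)
        std::this_thread::yield();
}

// Each of `threads` workers produces `itemsEach` results and bumps the shared count.
// Returns the count observed when the waiting (analysis) thread was released.
int runDemo(int threads, int itemsEach) {
    std::atomic<int> resultsDone{0};  // the shared "count result" variable
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t)
        workers.emplace_back([&resultsDone, itemsEach] {
            for (int i = 0; i < itemsEach; ++i) {
                // ... compute one result here ...
                resultsDone.fetch_add(1);  // atomic increment, no mutex needed
            }
        });
    const int target = threads * itemsEach;
    waitUntil(resultsDone, target);  // the final analysis could start here
    const int seen = resultsDone.load();
    for (auto& w : workers) w.join();
    return seen;
}
```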

Upvotes: 0
