Reputation: 6359
I've got an array of chunks that first need to be processed and then combined. Chunks can be processed in arbitrary order, but they need to be combined in the order they appear in the array. The following pseudo-code shows my first approach:
array chunks;

def worker_process(chunks):
    while not all_chunks_processed:
        // get the next chunk that isn't processed or in processing yet
        get_next_chunk()
        process_chunk()

def worker_combine(chunks):
    for i = 0 to num_chunks:
        // wait until chunk i is processed
        wait_until(chunks[i].is_processed())
        combine_chunk(i)

def main():
    array chunks;
    // execute worker_process in n threads where n is the number of (logical) cores
    start_n_threads(worker_process, chunks)
    // wait until all processing threads are finished
    join_threads()
    // combine chunks in a single thread
    start_1_thread(worker_combine, chunks)
    // wait until combine thread is finished
    join_threads()
Measurements of the above algorithm show that processing all chunks in parallel and combining the processed chunks sequentially each take about 3 seconds, which leads to a total runtime of roughly 6 seconds.
I'm using a 24 core CPU which means that during the combine stage, only one of those cores is used. My idea then was to use a pipeline to reduce the execution time. My expectation is that the execution time using a pipeline should be somewhere between 3 and 4 seconds. This is how the main function changes when using a pipeline:
def main():
    array chunks;
    // execute worker_process in n threads where n is the number of (logical) cores
    start_n_threads(worker_process, chunks)
    // combine chunks in a single thread
    start_1_thread(worker_combine, chunks)
    // wait until ALL threads are finished
    join_threads()
This change decreases the runtime slightly, but not nearly as much as I had expected. I found out that all processing threads were finished after 3 to 4 seconds and the combine thread needed about 2 seconds more.
The problem is that all threads are treated equally by the scheduler, which leads to the combine thread being paused.
Now the question:
How can I change the pipeline so that the combine thread executes faster while still taking advantage of all cores?
I already tried reducing the number of processing threads, which helped a bit, but this leads to some cores not being used at all, which isn't good either.
EDIT:
While this question wasn't language-specific until here, I actually need to implement this in C++14.
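To make the pipeline concrete, here is a minimal C++14 sketch of it; Chunk, process_chunk() and combine_chunk() are placeholders for the actual types and work functions:

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

struct Chunk {
    bool processed = false;              // set by a processing thread, guarded by mtx
    // ... payload ...
};

void process_chunk(Chunk&) { /* expensive per-chunk work */ }
void combine_chunk(Chunk&) { /* append the chunk to the combined result */ }

int main()
{
    const std::size_t num_chunks = 100;  // placeholder size
    std::vector<Chunk> chunks(num_chunks);

    std::mutex mtx;
    std::condition_variable cv;
    std::atomic<std::size_t> next{0};    // next chunk nobody has claimed yet

    // worker_process: claim chunks in arbitrary order and process them
    auto worker_process = [&] {
        for (std::size_t i = next++; i < chunks.size(); i = next++) {
            process_chunk(chunks[i]);
            {
                std::lock_guard<std::mutex> lock(mtx);
                chunks[i].processed = true;
            }
            cv.notify_one();             // wake the combiner if it is waiting on chunk i
        }
    };

    // worker_combine: combine chunks strictly in array order, overlapping with processing
    auto worker_combine = [&] {
        for (std::size_t i = 0; i < chunks.size(); ++i) {
            std::unique_lock<std::mutex> lock(mtx);
            cv.wait(lock, [&] { return chunks[i].processed; });
            lock.unlock();
            combine_chunk(chunks[i]);
        }
    };

    std::vector<std::thread> workers;
    for (unsigned t = 0; t < std::thread::hardware_concurrency(); ++t)
        workers.emplace_back(worker_process);
    std::thread combiner(worker_combine);

    for (auto& w : workers) w.join();
    combiner.join();
}

With this structure the combiner can start as soon as chunk 0 is processed, but it still competes with the 24 processing threads for CPU time, which is exactly the problem described above.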
Upvotes: 2
Views: 953
Reputation: 21
One limitation of the approach the OP asked about is the wait on all threads. To make maximum use of all cores, you need to pipeline the passing of finished jobs from the workers to the combining thread as soon as they are ready, unless the combining operation really takes very little time compared to the actual computation in the workers (as in almost zero in comparison).
Using a simple threading framework like TBB or OpenMP would enable parallelization of the workers, but tuning the reduce phase (the chunk joining) will be critical. If each join takes a while, doing it at a coarse granularity will be needed. In OpenMP you could do something like:
double join_arr[N];    // placeholder for the combined result
#pragma omp parallel
{
    #pragma omp for
    for (int i = 0; i < N; i++) {
        do_work(i);        // process chunk i, fully in parallel
        #pragma omp critical
        join(i);           // combine under a lock, one chunk at a time
    } // end of for loop
}
A more explicit and simpler way to do it would be to use something like RaftLib (http://raftlib.io; full disclosure: I'm one of the maintainers, but this is what I designed it for):
int main()
{
    arr some_arr;
    // 'join' below would be a user-defined kernel that combines the chunks in order
    using foreach = raft::for_each< type_t >;
    foreach fe( some_arr, arr_size, thread_count );
    raft::map m;
    /**
     * send data, zero copy, from fe to join as soon as items are
     * ready to be joined, so everything is done fully in parallel,
     * where fe is duplicated on fibers/threads up to thread_count
     * wide and the join is on a separate fiber/thread, gathering
     * the results
     */
    m += fe >> join;
    m.exe();
}
Upvotes: 1
Reputation: 45659
You could make your worker threads less specialized. So each time a worker thread is free, it could look for work to do; if a chunk is processed but not combined and no thread is currently combining, then the thread can run the combine for that chunk. Otherwise it can check the unprocessed queue for the next chunk to process.
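To make that concrete, here is a minimal C++14 sketch of the idea. Chunk, process_chunk() and combine_chunk() are placeholder names, and the bookkeeping is deliberately simplistic:

#include <atomic>
#include <mutex>
#include <thread>
#include <vector>

struct Chunk {
    std::atomic<bool> processed{false};
    // ... payload ...
};

void process_chunk(Chunk&) { /* expensive per-chunk work */ }
void combine_chunk(Chunk&) { /* append the chunk to the combined result */ }

void run(std::vector<Chunk>& chunks)
{
    std::atomic<std::size_t> next_to_process{0};  // next chunk nobody has claimed
    std::size_t next_to_combine = 0;              // next chunk to combine in order, guarded by combine_mtx
    std::atomic<std::size_t> combined{0};         // how many chunks are fully combined
    std::mutex combine_mtx;                       // at most one thread combines at a time

    auto worker = [&] {
        while (combined < chunks.size()) {
            // If nobody else is combining, drain whatever in-order chunks are ready.
            std::unique_lock<std::mutex> lock(combine_mtx, std::try_to_lock);
            if (lock.owns_lock()) {
                while (next_to_combine < chunks.size() &&
                       chunks[next_to_combine].processed) {
                    combine_chunk(chunks[next_to_combine]);
                    ++next_to_combine;
                    ++combined;
                }
                lock.unlock();
            }
            // Otherwise (or afterwards) grab the next unprocessed chunk, if any remain.
            std::size_t i = next_to_process++;
            if (i < chunks.size()) {
                process_chunk(chunks[i]);
                chunks[i].processed = true;
            } else {
                std::this_thread::yield();        // nothing left to process; wait for combining to finish
            }
        }
    };

    std::vector<std::thread> pool;
    for (unsigned t = 0; t < std::thread::hardware_concurrency(); ++t)
        pool.emplace_back(worker);
    for (auto& th : pool) th.join();
}

The try_to_lock means a worker only combines when nobody else is already doing so; otherwise it immediately falls back to processing, so no core sits idle waiting for combine work to become ready.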
UPDATE
First, I've thought a little more about why this might (or might not) help; and second, from the comments it's clear that some additional clarification is required.
But before I get into it - have you actually tried this approach to see if it helps? Because the fact is, reasoning about parallel processing is hard, which is why frameworks for parallel processing do everything they can to make simplifying assumptions. So if you want to know if it helps, try doing it and let the results direct the conversation. In truth, neither of us can say for sure if it's going to help.
So, what this approach gives you is a more fluid acceptance of work onto the cores. Instead of having one worker who, if work is available when his turn comes up, will do that work but won't do anything else, and X (say 24) workers who will never do that one task even if it's ready to do, you have a pool of workers doing what needs done.
Now it's a simple reality that at any time when one core is being used to combine, one fewer core will be available for processing than otherwise would be. And the total aggregate processor time that will be spent on each kind of work is constant. So those aren't variables to optimize. What we'd like is for the allocation of resources at any time to approximate the ratio of total work to be done.
Now to analyze in any detail we'd need to know whether each task (processing task, combining task) is processor-bound or not (and a million follow-up questions depending on the answer). I don't know that, so the following is only generally accurate...
Your initial data suggests that you spent 3 seconds of single-processor time on combining; let's just call that 3 units of work.
You spent 3 seconds of parallel processing time across 24 cores to do the processing task. Let's swag that out as 72 units of work.
So we'd guess that having roughly 1/25 of your resources on combining (3 of the 75 total units of work) wouldn't be bad, but serialization constraints may keep you from realizing that ratio. (And again, if some other resource than the processor is the real bottleneck in one or both tasks, then that ratio could be completely wrong anyway.)
Your pipeline approach should get close to that if you could ensure 100% utilization without the combiner thread ever falling asleep. But it can fall asleep, either because work isn't ready for it or because it loses the scheduler lottery some of the time. You can't do much about the former, and maybe the question is can you fix the latter...?
There may be architecture-specific games you can play with thread priority or affinity, but you specified portability, and I would at best expect you to have to re-tune parameters for each specific piece of hardware if you play those games. Again, my question is: can we get by with something simpler?
The intuition behind my suggestion is simply this: Run enough workers to keep the system busy, and let them do whatever work is ready to do.
The limitation of this approach is that if a worker is put to sleep while it is doing combine work, then the combine pipeline is stalled. But you can't help that unless you can inoculate "thread that's doing combine work" - be that a specialized thread or a worker that happens to be doing a combine unit of work - against being put aside to let other threads run. Again, I'm not saying there's no way - but I'm saying there's no portable way, especially that would run optimally without machine-specific tuning.
And even if you are on a system where you can just outright assign one core to your combiner thread, that still might not be optimal because (a) combining work can still only be done as processing tasks finish, and (b) any time combining work isn't ready that reserved core would sit idle.
Anyway, my guess is that cases where a generic worker gets put to sleep while he happened to be combining would be less frequent than the cases where a dedicated combiner thread is unable to move forward. That's what would drive gains with this approach.
Sometimes it's better to let the incoming workload determine your task allocations, than to try to anticipate and outmaneuver the incoming workload.
Upvotes: 3