Reputation: 18712
Imagine I have a collection input, which contains approx. 100000 objects.
There is a thread pool of workers, each of which takes items from input, processes them, and puts the results into the output collection. After all items in input have been processed, another routine takes output and does something with it.
I need the thread pool in order to process input as fast as possible. Every element of input should be processed exactly one time.
The order in which the elements are processed is irrelevant (both for input and output). output is write-only: the workers will only write there and won't do any other operations on output.
There are two parts of the problem where thread safety is important:
1. When a worker takes an element from input, other workers notice it and don't process the same element.
2. Several workers may write to the output collection at the same time.
Questions:
1. Which collection should I use for the input collection (ConcurrentLinkedQueue?)?
2. Can I use LinkedList for the output collection (if 2 threads try to add different objects to the list at the same time, can it occur that one of the objects won't be saved)?
Upvotes: 3
Views: 225
Reputation: 132380
You didn't say anything about Java 8, but this is a classic application for the new Java 8 parallel streams library:
    Collection<Item> input = ... ;
    List<Result> output = input.parallelStream()
        .map(Item::computeResult)
        .filter(Result::matches)
        .collect(Collectors.toList());
The resulting list is an ArrayList (though this may change in the future). This works even though multiple threads are doing the processing and ArrayList isn't thread-safe. How does this work?
The reason it works is that each thread inserts results into its own instance of a list containing intermediate results. At the end, the intermediate results are combined into a single output list. This avoids potential contention that might occur if several threads are all writing results to the output list simultaneously.
The principle in use here is thread-confinement (Goetz, sec 3.3). It is safe to use a non-thread-safe data structure in a multi-threaded environment, as long as only one thread at a time has access to it, and the data is handed off safely between threads.
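To make the thread-confinement idea concrete, here is a minimal sketch that spells out the three pieces a collector needs: a supplier that creates a private list per chunk, an accumulator that adds to that private list, and a combiner that merges two finished lists. The class name ConfinedCollect and the process method are hypothetical stand-ins, not part of the original answer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ConfinedCollect {
    // Hypothetical stand-in for the real per-item work.
    static int process(int x) {
        return x * 2;
    }

    static List<Integer> run(List<Integer> input) {
        return input.parallelStream()
            .map(ConfinedCollect::process)
            .collect(
                () -> new ArrayList<Integer>(),        // supplier: a fresh, private list per chunk
                (list, item) -> list.add(item),        // accumulator: touches only that private list
                (left, right) -> left.addAll(right));  // combiner: merges two finished lists
    }

    public static void main(String[] args) {
        List<Integer> input = IntStream.range(0, 100_000).boxed().collect(Collectors.toList());
        List<Integer> output = run(input);
        System.out.println(output.size());
    }
}
```

No list is ever written by two threads at once, which is why a plain ArrayList is enough here.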
If you're not on Java 8, you can use some of the same multi-threading techniques through the fork-join framework (see Lea) that was introduced in Java 7. (In fact, Java 8 Streams are built on top of the Fork/Join framework.) You'll have to do more work and you won't get the convenience of lambdas, but it does provide a reasonably convenient way to structure easily splittable computations.
The key is to structure your computation so that chunks can be represented as a RecursiveTask. Typically a recursive task contains a reference to the input data structure, a range of array or list indexes, and a place to store intermediate results. A task can easily be split ("forked") by splitting the range of indexes. After each forked task is joined, its intermediate results can be combined with this task's intermediate results. This is done in a thread-confined manner (the join operation handles proper handoff between threads). In addition, the combining phase also occurs in parallel, since different threads combining results from different parts of the computation tree can all proceed in parallel.
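A rough sketch of such a task, written without lambdas so it fits Java 7. The class name ProcessTask, the THRESHOLD value, and the process method are assumptions made for illustration; a real threshold should be tuned to the cost of the per-item work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ProcessTask extends RecursiveTask<List<Integer>> {
    private static final int THRESHOLD = 1_000; // assumed chunk size, not tuned

    private final List<Integer> input;
    private final int from; // inclusive
    private final int to;   // exclusive

    ProcessTask(List<Integer> input, int from, int to) {
        this.input = input;
        this.from = from;
        this.to = to;
    }

    // Hypothetical stand-in for the real per-item work.
    static int process(int x) {
        return x + 1;
    }

    @Override
    protected List<Integer> compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: process sequentially into a list only this task can see.
            List<Integer> results = new ArrayList<>(to - from);
            for (int i = from; i < to; i++) {
                results.add(process(input.get(i)));
            }
            return results;
        }
        int mid = (from + to) >>> 1;
        ProcessTask left = new ProcessTask(input, from, mid);
        ProcessTask right = new ProcessTask(input, mid, to);
        left.fork();                                  // run the left half asynchronously
        List<Integer> rightResults = right.compute(); // run the right half in this thread
        List<Integer> leftResults = left.join();      // join hands the left list over safely
        leftResults.addAll(rightResults);             // combine intermediate results
        return leftResults;
    }

    static List<Integer> run(List<Integer> input) {
        return new ForkJoinPool().invoke(new ProcessTask(input, 0, input.size()));
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            input.add(i);
        }
        System.out.println(run(input).size());
    }
}
```

The input list is only read, so sharing it between tasks is safe as long as nothing modifies it while the computation runs.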
REFERENCES
Goetz, et al. Java Concurrency in Practice. Copyright 2006 Pearson Education.
Lea, Doug. A Java Fork/Join Framework. Proceedings of the ACM 2000 Conference on Java Grande. http://gee.cs.oswego.edu/dl/papers/fj.pdf
Upvotes: 2
Reputation: 4662
ConcurrentLinkedQueue (CLQ) is fine for the input given your constraints. Just be careful about polling size() to check whether the input is exhausted: as mentioned in the docs, it's not a constant-time operation.
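One way to avoid size() entirely is to let each worker call poll() until it returns null. The sketch below assumes the queue is fully populated before the workers start, as in the question; the class name PollUntilEmpty and the counter standing in for real work are hypothetical.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PollUntilEmpty {
    // Drains the queue with the given number of workers and returns how many items were handled.
    static int drain(Queue<Integer> input, int workers) {
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                Integer item;
                // poll() removes the head atomically, so no two workers receive the same element;
                // null means the queue is empty and this worker can stop.
                while ((item = input.poll()) != null) {
                    processed.incrementAndGet(); // stand-in for the real work on item
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        Queue<Integer> input = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 100_000; i++) {
            input.add(i);
        }
        System.out.println(drain(input, 4));
    }
}
```

If producers were still adding items while workers run, a null from poll() would no longer mean "finished", and a different termination signal would be needed.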
For the output, LinkedList is not thread-safe, even just for adding. Adding means altering the state of the tail node, and if two threads add at the same time this may create issues such as lost or detached elements.
You can use another CLQ or a LinkedBlockingDeque. A simpler option is to wrap the list with Collections.synchronizedList(new LinkedList<>()).
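As a small illustration of the output side, this sketch has several workers add to a shared collection and checks that nothing was lost, once with a ConcurrentLinkedQueue and once with a synchronized wrapper around LinkedList. The class name SafeOutput and the numbers used are assumptions for the example only.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SafeOutput {
    // Each worker adds perWorker distinct values to the shared output collection.
    static void fill(Collection<Integer> output, int workers, int perWorker) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            final int base = w * perWorker;
            pool.execute(() -> {
                for (int i = 0; i < perWorker; i++) {
                    output.add(base + i); // safe only because output is a thread-safe collection
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
    }

    public static void main(String[] args) {
        Collection<Integer> queue = new ConcurrentLinkedQueue<>();
        List<Integer> list = Collections.synchronizedList(new LinkedList<Integer>());
        fill(queue, 4, 25_000);
        fill(list, 4, 25_000);
        System.out.println(queue.size() + " " + list.size());
    }
}
```

Passing a bare LinkedList to fill would compile but gives no guarantee that every element survives.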
Upvotes: 3