Glory to Russia

Reputation: 18712

Data types for a concurrent method design

Imagine I have a collection, input, which contains approximately 100,000 objects.

There is a thread pool of workers, each of which

  1. takes an element of that collection,
  2. does some calculations and
  3. sometimes (in roughly 10% to at most 50% of cases, i.e. between 10,000 and 50,000 times per run) adds the result to the output collection.

After all items in input have been processed, another routine takes output and does something with it.

I need the thread pool in order to process input as fast as possible. Every element of input should be processed exactly once.

The order in which the elements are processed is irrelevant (both for input and output). output is write-only: the workers will only add to it and won't perform any other operations on it.

There are two parts of the problem, where thread safety is important:

  1. The worker threads need to make sure that when worker thread A processes some element of input, the other workers notice it and don't process the same element.
  2. After a worker finishes processing an element, the results should be added to the output collection.

Questions:

  1. Which collection type can I safely use for the input collection (ConcurrentLinkedQueue?)?
  2. Can I use a normal LinkedList for the output collection? That is, if two threads try to add different objects to the list at the same time, can it happen that one of the objects isn't saved?

Upvotes: 3

Views: 225

Answers (2)

Stuart Marks

Reputation: 132380

You didn't say anything about Java 8, but this is a classic application for the new Java 8 parallel streams library:

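```java
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

Collection<Item> input = ... ;
List<Result> output = input.parallelStream()
    .map(Item::computeResult)
    .filter(Result::matches)
    .collect(Collectors.toList());
```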

The resulting list is an ArrayList (though this may change in the future). This works even though multiple threads do the processing and ArrayList isn't thread-safe. How?

The reason it works is that each thread inserts results into its own instance of a list containing intermediate results. At the end, the intermediate results are combined into a single output list. This avoids potential contention that might occur if several threads are all writing results to the output list simultaneously.
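To make the per-thread lists explicit, here is a minimal sketch (Java 8+) that spells out roughly what Collectors.toList() does, using the three-argument Stream.collect: the supplier creates a fresh list for each subtask, the accumulator only ever adds to that subtask's own list, and the combiner merges two finished partial lists. The class name PerThreadCollect and the "square it, keep even squares" stand-in for Item::computeResult and Result::matches are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

class PerThreadCollect {
    // Hypothetical stand-in for the question's work: "compute" squares each
    // index, and only even squares count as a result worth keeping.
    static List<Integer> evenSquares(int n) {
        return IntStream.range(0, n)
                .parallel()
                .map(i -> i * i)              // the per-element calculation
                .filter(sq -> sq % 2 == 0)    // only some elements produce output
                .boxed()
                .collect(
                        ArrayList<Integer>::new, // supplier: a fresh list for each subtask
                        ArrayList::add,          // accumulator: adds only to that subtask's list
                        ArrayList::addAll);      // combiner: merges two finished partial lists
    }

    public static void main(String[] args) {
        System.out.println(evenSquares(10));
    }
}
```

No list is ever shared between threads while it is being filled, so a plain ArrayList is sufficient; the stream framework performs the handoff when it calls the combiner.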

The principle in use here is thread-confinement (Goetz, sec 3.3). It is safe to use a non-thread-safe data structure in a multi-threaded environment, as long as only one thread at a time has access to it, and the data is handed off safely between threads.
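The same principle works with a plain thread pool. The sketch below assumes a fixed pool of four threads and a hypothetical filter (i % 3 == 0) standing in for "sometimes produces a result"; ConfinedWorkers and run are illustrative names. Each task fills a list that only it touches, and the calling thread collects those lists through Future.get(), which the java.util.concurrent memory-consistency rules guarantee makes the task's writes visible. It uses lambdas (Java 8+); on Java 7 an anonymous Callable would do the same job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ConfinedWorkers {
    // Processes the indexes [0, n) in roughly `chunks` ranges on a small pool.
    // Each task fills its OWN ArrayList (thread confinement); the lists are
    // handed to the calling thread through Future.get().
    static List<Integer> run(int n, int chunks) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<List<Integer>>> futures = new ArrayList<>();
            int step = Math.max(1, (n + chunks - 1) / chunks);
            for (int start = 0; start < n; start += step) {
                final int lo = start;
                final int hi = Math.min(n, start + step);
                futures.add(pool.submit(() -> {
                    List<Integer> local = new ArrayList<>(); // touched by this task only
                    for (int i = lo; i < hi; i++) {
                        if (i % 3 == 0) {                    // hypothetical "sometimes produces a result"
                            local.add(i);
                        }
                    }
                    return local;
                }));
            }
            List<Integer> output = new ArrayList<>();        // touched by the calling thread only
            for (Future<List<Integer>> f : futures) {
                output.addAll(f.get());                      // safe handoff from the worker
            }
            return output;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(100, 8).size());
    }
}
```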

If you're not on Java 8, you can apply some of the same techniques using the Fork/Join framework (see Lea), which was introduced in Java 7. (In fact, Java 8 Streams are built on top of the Fork/Join framework.) It's less convenient than Streams: you'll have to do more of the work yourself, and you won't have lambdas. But it does provide a reasonable way to structure computations that are easy to split.

The key is to structure your computation so that chunks can be represented as a RecursiveTask. Typically a recursive task contains a reference to the input data structure, a range of array or list indexes, and a place to store intermediate results. A task can easily be split ("forked") by splitting the range of indexes. After each forked task is joined, its intermediate results can be combined with this task's intermediate results. This is done in a thread-confined manner (the join operation handles proper handoff between threads). In addition, the combining phase also occurs in parallel, since different threads combining results from different parts of the computation tree can all proceed in parallel.
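A minimal RecursiveTask following that structure might look like this. It holds a reference to the input array, a half-open index range and a task-local result list; it forks the left half, computes the right half itself, joins, and concatenates the results. THRESHOLD, FilterTask, filterAll and the "double it, keep multiples of 3" calculation are hypothetical placeholders. It avoids lambdas and ForkJoinPool.commonPool(), so it should also compile on Java 7.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class FilterTask extends RecursiveTask<List<Integer>> {
    private static final int THRESHOLD = 1000; // hypothetical: below this size, work sequentially

    private final int[] input; // shared, read-only input data
    private final int lo;      // start of this task's index range (inclusive)
    private final int hi;      // end of this task's index range (exclusive)

    FilterTask(int[] input, int lo, int hi) {
        this.input = input;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected List<Integer> compute() {
        if (hi - lo <= THRESHOLD) {
            List<Integer> local = new ArrayList<Integer>(); // intermediate results, confined to this task
            for (int i = lo; i < hi; i++) {
                int result = input[i] * 2;     // hypothetical calculation
                if (result % 3 == 0) {         // hypothetical "keep only some results"
                    local.add(result);
                }
            }
            return local;
        }
        int mid = (lo + hi) >>> 1;
        FilterTask left = new FilterTask(input, lo, mid);
        left.fork();                                    // split: left half may run on another worker
        List<Integer> right = new FilterTask(input, mid, hi).compute(); // right half in this thread
        List<Integer> combined = left.join();           // join() safely hands off the left list
        combined.addAll(right);                         // combine, keeping index order
        return combined;
    }

    static List<Integer> filterAll(int[] input) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new FilterTask(input, 0, input.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = new int[10000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i;
        }
        System.out.println(filterAll(data).size());
    }
}
```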

REFERENCES

Goetz, Brian, et al. Java Concurrency in Practice. Pearson Education, 2006.

Lea, Doug. A Java Fork/Join Framework. Proceedings of the ACM 2000 Conference on Java Grande. http://gee.cs.oswego.edu/dl/papers/fj.pdf

Upvotes: 2

Diego Martinoia

Reputation: 4662

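A ConcurrentLinkedQueue (CLQ) is fine for the input given your constraints. Just be careful if you call size() to check whether the input is exhausted: as the documentation notes, it is not a constant-time operation (it requires a traversal of the queue), and its result may be inaccurate if other threads modify the queue concurrently.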
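A minimal sketch of the worker loop, assuming the whole input is enqueued before any worker starts: in that case poll() returning null reliably means "nothing left", so size() is never needed. Because poll() atomically removes the head, each element is handed to exactly one worker. QueueWorkers, run and the "item % 4 == 0, store item * 10" rule are hypothetical; the output is another CLQ. Lambdas require Java 8+.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class QueueWorkers {
    static Queue<Integer> run(int n, int threads) throws InterruptedException {
        // Assumption: the input is fully populated BEFORE any worker starts,
        // so poll() returning null really means "everything has been taken".
        Queue<Integer> input = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < n; i++) {
            input.add(i);
        }
        Queue<Integer> output = new ConcurrentLinkedQueue<>();

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                Integer item;
                // poll() atomically removes the head, so each element goes to exactly one worker.
                while ((item = input.poll()) != null) {
                    if (item % 4 == 0) {             // hypothetical: roughly 25% of items yield a result
                        output.add(item * 10);       // hypothetical result; add() is thread-safe
                    }
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return output;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(100_000, 4).size());
    }
}
```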

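For the output, LinkedList is not thread-safe, even if the threads only add to it; its documentation states that the implementation is not synchronized. Adding an element updates the current last node's link, the list's last pointer and its size, so if two threads add at the same time, an element can be lost or left detached.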

You can use another ConcurrentLinkedQueue or a LinkedBlockingDeque. A simpler option is a synchronized wrapper: Collections.synchronizedList(new LinkedList<>()).
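A minimal sketch of the synchronized-wrapper option: several workers add distinct values concurrently, and none are lost because every call on the wrapper synchronizes on the wrapper itself. SyncListOutput, run and the value scheme are hypothetical; lambdas require Java 8+.

```java
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SyncListOutput {
    static List<Integer> run(int threads, int addsPerThread) throws InterruptedException {
        // Each call on the wrapper (add, size, ...) synchronizes on the wrapper,
        // so concurrent add() calls cannot corrupt the underlying LinkedList.
        List<Integer> output = Collections.synchronizedList(new LinkedList<Integer>());

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.execute(() -> {
                for (int i = 0; i < addsPerThread; i++) {
                    output.add(id * addsPerThread + i); // distinct value per (thread, i)
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return output;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(8, 10_000).size());
    }
}
```

One caveat from the Collections.synchronizedList documentation: iterating over the wrapper while other threads may still modify it requires manually synchronizing on the list. In this question's setup, output is only consumed after all workers have finished, so that situation doesn't arise.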

Upvotes: 3
