Maarten Bodewes
Maarten Bodewes

Reputation: 94038

Java: ordering results retrieved from asynchronous tasks

I've got a computation (CTR encryption) that requires results in a precise order.

For this I created a multithreaded design that calculates said results, in this case the result is a ByteBuffer. The calculation itself of course runs asynchronous, so the results may become available at any time and in any order. The "user" is a single-threaded application that uses the results by calling a method, after which the ByteBuffers are returned to the pool of resources by said method - the management of resources is already handled (using a thread safe stack).

Now the question: I need something that aggregates the results and makes them available in the right order. If the next result is not available, the method that the user called should block until it is. Does anyone know a good strategy or class in java.util.concurrent that can return asynchronously calculated results in order?

The solution it must be thread safe. I would like to avoid third party libraries, Thread.sleep() / Thread.wait() and theading related keywords other than "synchronized". Futhermore, The tasks may be given to e.g. an Executor in the correct order if that is required. This is for research, so feel free to use Java 1.6 or even 1.7 constructs.

Note: I've tagged these quesions [jre] as I want to keep within the classes defined in the JRE and [encryption] as somebody may already have had to deal with it, but the question itself is purely about java & multi-threading.

Upvotes: 0

Views: 1710

Answers (3)

Mike Nakis
Mike Nakis

Reputation: 62064

My new answer after gaining a better understanding of what you want to do:

Declare a "WorkItem" class which contains one of your bytearrays and its ordinal number, so that they can be sorted by ordinal number.

Make use of a java.util.PriorityQueue which is kept sorted by ordinal number. Essentially, all we care is that the first item in the priority queue at any given time will be the next item to process.

Each work thread stores its result in the PriorityQueue and issues a NotifyAll on some locking object.

The main thread waits on the locking object, and then if there are items in the queue, and if the ordinal of the (peeked, not dequeued) first item in the queue is equal to the number of items processed so far, then it dequeues the item and processes it. If not, it keeps waiting. If all of the items have been produced and processed, it is done.

Upvotes: 0

Mike Nakis
Mike Nakis

Reputation: 62064

Returning results in the right order is trivial. As each result arrives, store it in an arraylist, and once you have ALL the results, just sort the arraylist. You could use a PriorityQueue to keep the results sorted at all times as they arrive, but there is no point in doing this, since you will not be making any use of the results before all of them have arrived anyway.

So, what you could do is this:

Declare a "WorkItem" class which contains one of your bytearrays and its ordinal number, so that they can be sorted by ordinal number.

In your work threads, do something like this:

...do work and produce a work_item...

synchronized( LockObject )
{
    ResultList.Add( work_item );
    number_of_results++;
    LockObject.notifyAll();
}

In your main thread, do something like this:

synchronized( LockObject )
    while( number_of_results != number_of_items )
        LockObject.wait();
ResultList.Sort();
...go ahead and use the results...

Upvotes: 1

Bozho
Bozho

Reputation: 597324

Use the executors framework:

ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Future> futures = executorService.invokeAll(listOfCallables);
for (Future future : futures) {
   //do something with future.get();
}
executorService.shutdown();

The listOfCallables will be a List<Callable<ByteBuffer>> that you have constructed to operate on the data. For example:

list.add(new SubTaskCalculator(1, 20));
list.add(new SubTaskCalculator(21, 40));
list.add(new SubTaskCalculator(41, 60));

(arbitrary ranges of numbers, adjust that to your task at hand)

.get() blocks until the result is complete, but at the same time other tasks are also running, so when you reach them, their .get() will be ready.

Upvotes: 4

Related Questions