toolforger

Reputation: 831

How to thread a sequence of actions through multiple threads?

I am exploring a problem which is likely a special case of a problem class, but I don't know the problem class nor the appropriate terminology, so I have to resort to describing the problem using ad-hoc vocabulary. I'll rephrase once I know the right terminology.

I have a bunch of singletons A, B, C. The singletons are:

The system accepts tasks to be processed in parallel as far as possible.
Each task consists of a sequence of actions, each action to be executed using one of the singletons. Different tasks may access different singletons in different orders, and tasks may contain loops of actions.

Pseudocode:

void myTask(in1, in2, ...) {
    doWithA(() -> {
       // use in1, in2, ...
       // inspect and/or update A
       // set up outputs to be used as inputs for the next action:
       outA1 = ...
       outA2 = ...
       ...
    });
    doWithB(() -> {
       // use outA1, outA2, ...
       // inspect and/or update B
       // set up outputs to be used as inputs for the next action:
       outB1 = ...
       outB2 = ...
       ...
    });
    // Tasks may touch singletons repeatedly, in any order
    doWithA(() -> {
       // outB1, outB2, ..., inspect/modify A, set up outputs
       outAx1 = ...
       outAx2 = ...
       ...
    });
    // Tasks may have loops:
    while (conditionInC(() -> ...)) {
        doWithC(() -> ...);
        doWithD(() -> ...);
    }
    // I am aware that a loop like this can cause a livelock.
    // That's an aspect for another question, on another day.
}

There are multiple tasks like myTask above.
Tasks to be executed are wrapped in a closure and scheduled to a ThreadPoolExecutor (or something similar).

Approaches I considered:

  1. Have singletons LockA, LockB, ...
    Each doWithX is merely a synchronized(X) block.
    OutXn are local variables of myTask.
    Problem: One of the singletons is Swing, and I can't move the EDT into a thread that I manage.
  2. As above. Solve the Swing problem from approach (1) by coding doWithSwing(){...} as SwingUtilities.invokeAndWait(() -> {...}).
    Problem: invokeAndWait is generally considered prone to deadlock. How do I find out whether I am in this kind of trouble with the pattern above?
  3. Have threads threadA, threadB, ..., each of them "owning" one of the singletons (Swing already has this, it is the EDT).
    doWithX schedules the block as a Runnable on threadX.
    outXn are set up as Future<...> outXn = new SettableFuture<>(), the assignments become outXn.set(...).
    Problem: I couldn't find anything like SettableFuture in the JDK; all ways to create a Future that I could find were somehow tied to a ThreadPool. Maybe I am looking at the wrong top-level interface and Future is a red herring?

Which of these approaches would be best?
Is there a superior approach that I didn't consider?

Upvotes: 1

Views: 414

Answers (1)

TrogDor

Reputation: 1029

I don't know the problem class nor the appropriate terminology

I'd probably just refer to the problem class as concurrent task orchestration.

There are a lot of things to consider when identifying the right approach. If you provide some more details, I'll try to update my answer with more color.

There are no constraints like "you must access B before you can do X with C" or similar.

This is generally a good thing. A very common cause of deadlocks is different threads acquiring the same locks in differing orders. E.g., thread 1 locks A then B while thread 2 owns the lock B and is waiting to acquire A. Designing the solution such that this situation does not occur is very important.
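
To make the lock-ordering point concrete, here is a minimal sketch of one common way to rule that situation out: whenever a step needs two locks, take them in a fixed global order. The names `OrderedLocks`, `RankedLock`, and `withBoth` are hypothetical, not from the question or the JDK.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper illustrating fixed lock ordering; not part of the original design.
final class OrderedLocks {
    // Each lock carries a rank; locks are always acquired in ascending rank.
    static final class RankedLock {
        final int rank;
        final ReentrantLock lock = new ReentrantLock();
        RankedLock(int rank) { this.rank = rank; }
    }

    static void withBoth(RankedLock x, RankedLock y, Runnable body) {
        if (x.rank == y.rank) throw new IllegalArgumentException("ranks must differ");
        // Regardless of argument order, the lower-ranked lock is taken first.
        RankedLock first = x.rank < y.rank ? x : y;
        RankedLock second = first == x ? y : x;
        first.lock.lock();
        try {
            second.lock.lock();
            try {
                body.run();
            } finally {
                second.lock.unlock();
            }
        } finally {
            first.lock.unlock();
        }
    }

    // Two threads request the locks in opposite orders; the ranking prevents a deadlock.
    static int demo() {
        RankedLock a = new RankedLock(1);
        RankedLock b = new RankedLock(2);
        int[] counter = {0};
        Thread t1 = new Thread(() -> { for (int i = 0; i < 1000; i++) withBoth(a, b, () -> counter[0]++); });
        Thread t2 = new Thread(() -> { for (int i = 0; i < 1000; i++) withBoth(b, a, () -> counter[0]++); });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 2000
    }
}
```

If every task step only ever holds one singleton's lock at a time, as in the question's pseudocode, this ordering issue does not arise in the first place.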

I couldn't find anything like SettableFuture in the JDK

Take a look at java.util.concurrent.CompletableFuture<T> - this is probably what you want here. It exposes a blocking get() as well as a number of asynchronous completion callbacks such as thenAccept(Consumer<? super T>).
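
A minimal sketch of using it as a "settable" future: it is created empty, without any thread pool, and completed later by whichever thread has the value. The class name `SettableFutureDemo` and the sample value are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class SettableFutureDemo {
    static String run() {
        // Created "empty": no executor or thread pool is involved.
        CompletableFuture<String> outA1 = new CompletableFuture<>();

        // A dependent stage can be registered before the value exists.
        CompletableFuture<String> shouted = outA1.thenApply(String::toUpperCase);

        // Any thread may supply the value later; here a plain Thread does it.
        Thread producer = new Thread(() -> outA1.complete("value from A"));
        producer.start();

        // join() blocks like get(), but throws only unchecked exceptions.
        return shouted.join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "VALUE FROM A"
    }
}
```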

invokeAndWait is generally considered prone to deadlock

It depends. If your calling thread isn't holding any locks that are going to be necessary for the execution of the Runnable you're submitting, you're probably okay. That said, if you can base your orchestration on asynchronous callbacks, you can instead use SwingUtilities.invokeLater(Runnable) - this will submit the execution of your Runnable on the Swing event loop without blocking the calling thread.
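
As a rough sketch of the non-blocking variant: the step is handed to an `Executor` and the caller gets a `CompletableFuture` back instead of waiting. In a real Swing application you would pass `SwingUtilities::invokeLater` as that executor; here a single-thread executor named "fake-edt" stands in for the EDT (an assumption, so the sketch runs without a display). `UiBridge` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class UiBridge {
    private final Executor uiExecutor;

    // In a Swing application, pass SwingUtilities::invokeLater here.
    UiBridge(Executor uiExecutor) {
        this.uiExecutor = uiExecutor;
    }

    // Schedules the step on the UI executor and returns immediately.
    <T> CompletableFuture<T> doWithSwing(Supplier<T> step) {
        return CompletableFuture.supplyAsync(step, uiExecutor);
    }

    static String demo() {
        // Assumption: this single thread plays the role of the EDT.
        ExecutorService fakeEdt = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-edt"));
        try {
            UiBridge bridge = new UiBridge(fakeEdt);
            return bridge.doWithSwing(() -> Thread.currentThread().getName())
                    .thenApply(name -> "ran on " + name)
                    .join(); // only the demo blocks; real callers would keep chaining
        } finally {
            fakeEdt.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "ran on fake-edt"
    }
}
```

Because the caller never waits while holding a lock, the deadlock scenario associated with invokeAndWait cannot occur in this form.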

I would probably avoid creating a thread per singleton. Each running thread contributes some overhead and it's better to decouple the number of threads from your business logic. This will allow you to tune the software to different physical machines based on the number of cores, for example.

It sounds like you need each doWithX(...) method to be atomic. In other words, once one thread has begun accessing X, another thread cannot do so until the first thread is finished with its task step. If this is the case, then creating a lock object per singleton and ensuring serial (rather than parallel) access is the right way to go. You can achieve this by wrapping the execution of the closures submitted to your doWithX(...) methods in a synchronized block. The code within the block is also referred to as the critical section or monitor region.
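
A minimal sketch of that idea, assuming a hypothetical `SingletonGuard` class in which an `int` field stands in for the state of singleton A:

```java
import java.util.function.Supplier;

final class SingletonGuard {
    private final Object lockA = new Object();
    private int stateOfA = 0; // stands in for singleton A's mutable state

    // Only one thread at a time can be inside the synchronized block.
    <T> T doWithA(Supplier<T> step) {
        synchronized (lockA) {
            return step.get();
        }
    }

    // Two threads each perform 10,000 guarded increments; none are lost.
    static int demo() {
        SingletonGuard guard = new SingletonGuard();
        Runnable task = () -> {
            for (int i = 0; i < 10_000; i++) {
                guard.doWithA(() -> ++guard.stateOfA);
            }
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return guard.doWithA(() -> guard.stateOfA);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 20000
    }
}
```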

Another thing to consider is thread contention and order of execution. If two tasks both require access to X and task 1 gets submitted before task 2, is it a requirement that task 1's access to X occurs before task 2's? A requirement like that can complicate the design quite a bit and I would probably recommend a different approach than outlined above.
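
If submission order does matter, one possible approach (a sketch, not the only option) is a per-singleton serial executor layered on a shared pool, following the `SerialExecutor` pattern shown in the `java.util.concurrent.Executor` Javadoc. Steps for X then run one at a time, in submission order, without dedicating a thread to X. The `demo` method and its numbers are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SerialExecutor implements Executor {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private final Executor pool;
    private Runnable active;

    SerialExecutor(Executor pool) {
        this.pool = pool;
    }

    @Override
    public synchronized void execute(Runnable step) {
        // Wrap each step so that finishing it hands the next queued step to the pool.
        queue.add(() -> {
            try {
                step.run();
            } finally {
                scheduleNext();
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    private synchronized void scheduleNext() {
        active = queue.poll();
        if (active != null) {
            pool.execute(active);
        }
    }

    // Submits 100 steps for one singleton and records the order in which they ran.
    static List<Integer> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            SerialExecutor forX = new SerialExecutor(pool);
            List<Integer> seen = new ArrayList<>();
            CountDownLatch done = new CountDownLatch(100);
            for (int i = 0; i < 100; i++) {
                final int n = i;
                forX.execute(() -> { seen.add(n); done.countDown(); });
            }
            done.await();
            return seen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Since the executor already serializes access, steps submitted through it need no additional lock for that singleton.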

Is there a superior approach that I didn't consider?

These days there are frameworks out there for solving these types of problems. I'm specifically thinking of reactive streams and RxJava. While it is a very powerful framework, it also comes with a very steep learning curve. A lot of analysis and consideration should be done before adopting such a technology within an organization.


Update:

Based on your feedback, I think a CompletableFuture-based approach probably makes the most sense.

I'd create a helper class to orchestrate task step execution:

class TaskHelper
{
    private final Object lockA;
    private final Object lockB;
    private final Object lockC;

    private final Executor poolExecutor;
    private final Executor swingExecutor;

    public TaskHelper()
    {
        poolExecutor = Executors.newFixedThreadPool( 2 );
        swingExecutor = SwingUtilities::invokeLater;

        lockA = new Object();
        lockB = new Object();
        lockC = new Object();
    }

    public <T> CompletableFuture<T> doWithA( Supplier<T> taskStep )
    {
        return doWith( lockA, poolExecutor, taskStep );
    }

    public <T> CompletableFuture<T> doWithB( Supplier<T> taskStep )
    {
        return doWith( lockB, poolExecutor, taskStep );
    }

    public <T> CompletableFuture<T> doWithC( Supplier<T> taskStep )
    {
        return doWith( lockC, swingExecutor, taskStep );
    }

    private <T> CompletableFuture<T> doWith( Object lock, Executor executor, Supplier<T> taskStep )
    {
        CompletableFuture<T> future = new CompletableFuture<>();

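        Runnable serialTaskStep = () -> {

            try {
                T result;

                synchronized ( lock ) {
                    result = taskStep.get();
                }

                future.complete( result );
            }
            catch ( Throwable t ) {
                // Report failures to the caller; otherwise the future would never complete.
                future.completeExceptionally( t );
            }
        };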

        executor.execute( serialTaskStep );
        return future;
    }
}

In my example above, doWithA and doWithB steps get scheduled on a shared thread pool while doWithC steps are always executed on the Swing thread. The Swing executor is already serial in nature, so the lock is really optional there.

For creating actual tasks, I'd recommend creating an object for each task. This allows you to supply callbacks as method references, resulting in cleaner code and avoiding callback hell.

This example computes the square of a provided number on a background thread pool and then displays the results on the Swing thread:

class SampleTask
{
    private final TaskHelper helper;
    private final String id;
    private final int startingValue;

    public SampleTask( TaskHelper helper, String id, int startingValue )
    {
        this.helper = helper;
        this.id = id;
        this.startingValue = startingValue;
    }

    public void start()
    {
        helper.doWithB( () -> {

            int square = startingValue * startingValue;
            return String.format( "computed-thread: %s computed-square: %d",
                    Thread.currentThread().getName(), square );
        } )
        .thenAccept( this::step2 );
    }

    private void step2( String result )
    {
        helper.doWithC( () -> {

            String message = String.format( "current-thread: %s task: %s result: %s",
                    Thread.currentThread().getName(), id, result );

            JOptionPane.showConfirmDialog( null, message );
            return null;
        } );
    }
}

@Test
public void testConcurrent() throws InterruptedException, ExecutionException
{
    TaskHelper helper = new TaskHelper();

    new SampleTask( helper, "task1", 5 ).start();
    new SampleTask( helper, "task2", 7 ).start();

    Thread.sleep( 60000 );
}
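
For the A, then B, then A sequence from the question, the steps can also be written as one flat chain with `thenCompose`, where each step receives the previous step's output. This is only a sketch: `ChainedTask` is a hypothetical, stripped-down stand-in for the helper (no Swing executor), and the arithmetic is placeholder work.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class ChainedTask {
    private final Object lockA = new Object();
    private final Object lockB = new Object();
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    <T> CompletableFuture<T> doWithA(Supplier<T> step) { return doWith(lockA, step); }
    <T> CompletableFuture<T> doWithB(Supplier<T> step) { return doWith(lockB, step); }

    // Runs the step on the pool while holding the singleton's lock.
    private <T> CompletableFuture<T> doWith(Object lock, Supplier<T> step) {
        return CompletableFuture.supplyAsync(() -> {
            synchronized (lock) {
                return step.get();
            }
        }, pool);
    }

    // A -> B -> A; each lambda parameter is the output of the previous step.
    CompletableFuture<String> myTask(int in1) {
        return doWithA(() -> in1 * in1)
                .thenCompose(outA1 -> doWithB(() -> outA1 + 1))
                .thenCompose(outB1 -> doWithA(() -> "result=" + outB1));
    }

    static String demo() {
        ChainedTask task = new ChainedTask();
        try {
            return task.myTask(5).join();
        } finally {
            task.pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "result=26"
    }
}
```

A loop such as the question's `while (conditionInC(...))` does not fit a fixed chain like this as neatly; it would need a recursive composition or a different tool.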

Update 2:

If you want to avoid callback hell while also avoiding the need to create an object per task, perhaps you should take a serious look at reactive streams after all.

Take a look at the "getting started" page for RxJava: https://github.com/ReactiveX/RxJava/wiki/How-To-Use-RxJava

For reference here's how the same example above would look in Rx (I'm removing the concept of task ID for simplicity):

@Test
public void testConcurrentRx() throws InterruptedException
{
    Scheduler swingScheduler = Schedulers.from( SwingUtilities::invokeLater );
    Subject<Integer> inputSubject = PublishSubject.create();

    inputSubject
        .flatMap( input -> Observable.just( input )
                .subscribeOn( Schedulers.computation() )
                .map( this::computeSquare ))
        .observeOn( swingScheduler )
        .subscribe( this::displayResult );

    inputSubject.onNext( 5 );
    inputSubject.onNext( 7 );
    Thread.sleep( 60000 );
}

private String computeSquare( int input )
{
    int square = input * input;
    return String.format( "computed-thread: %s computed-square: %d",
            Thread.currentThread().getName(), square );
}

private void displayResult( String result )
{
    String message = String.format( "current-thread: %s result: %s",
            Thread.currentThread().getName(), result );

    JOptionPane.showConfirmDialog( null, message );
}

Upvotes: 2
