Reputation: 831
I am exploring a problem which is likely a special case of a problem class, but I don't know the problem class nor the appropriate terminology, so I have to resort to describing the problem using ad-hoc vocabulary. I'll rephrase once I know the right terminology.
I have a bunch of singletons A, B, C. The singletons are:
The system accepts tasks to be processed in parallel as far as possible.
Each task consists of a sequence of actions, each action to be executed using one of the singletons. Different tasks may access different singletons in different orders, and tasks may contain loops of actions.
Pseudocode:
void myTask(in1, in2, ...) {
    doWithA(() -> {
        // use in1, in2, ...
        // inspect and/or update A
        // set up outputs to be used as inputs for the next action:
        outA1 = ...
        outA2 = ...
        ...
    });
    doWithB(() -> {
        // use outA1, outA2, ...
        // inspect and/or update B
        // set up outputs to be used as inputs for the next action:
        outB1 = ...
        outB2 = ...
        ...
    });
    // Tasks may touch singletons repeatedly, in any order
    doWithA(() -> {
        // use outB1, outB2, ..., inspect/modify A, set up outputs
        outAx1 = ...
        outAx2 = ...
        ...
    });
    // Tasks may have loops:
    while (conditionInC(() -> ...)) {
        doWithC(() -> ...);
        doWithD(() -> ...);
    }
    // I am aware that a loop like this can cause a livelock.
    // That's an aspect for another question, on another day.
}
There are multiple tasks like myTask above.
Tasks to be executed are wrapped in a closure and scheduled to a ThreadPoolExecutor (or something similar).
Approaches I considered:
LockA, LockB, ...
doWithX is merely a synchronized(X) block.
OutXn are local variables of myTask.
doWithSwing(){...} as SwingUtilities.invokeAndWait(() -> {...}).
invokeAndWait is generally considered prone to deadlock. How do I find out if I am in this kind of trouble with the pattern above?
threadA, threadB, ..., each of them "owning" one of the singletons (Swing already has this, it is the EDT).
doWithX schedules the block as a Runnable on threadX.
outXn are set up as Future<...> outXn = new SettableFuture<>(), the assignments become outXn.set(...).
I couldn't find anything like SettableFuture in the JDK; all ways to create a Future that I could find were somehow tied to a ThreadPool. Maybe I am looking at the wrong top-level interface and Future is a red herring?
Which of these approaches would be best?
Is there a superior approach that I didn't consider?
Upvotes: 1
Views: 414
Reputation: 1029
I don't know the problem class nor the appropriate terminology
I'd probably just refer to the problem class as concurrent task orchestration.
There are a lot of things to consider when identifying the right approach. If you provide some more details, I'll try to update my answer with more color.
There are no constraints like "you must access B before you can do X with C" or similar.
This is generally a good thing. A very common cause of deadlocks is different threads acquiring the same locks in differing orders. E.g., thread 1 locks A then B while thread 2 owns the lock B and is waiting to acquire A. Designing the solution such that this situation does not occur is very important.
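To make the lock-ordering point concrete, here is a minimal sketch. It assumes two hypothetical lock objects, LOCK_A and LOCK_B, and an illustrative counter; none of these names come from the question. Both workers take the locks in the same global order (A before B), so the circular wait described above cannot form:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; all names are illustrative only.
final class LockOrderingDemo {
    private static final Object LOCK_A = new Object();
    private static final Object LOCK_B = new Object();
    private static int sharedCounter = 0; // only touched while holding both locks

    // Every caller acquires LOCK_A first, then LOCK_B. No thread ever holds B
    // while waiting for A, so a lock-order deadlock cannot occur here.
    static void updateBoth() {
        synchronized (LOCK_A) {
            synchronized (LOCK_B) {
                sharedCounter++;
            }
        }
    }

    // Runs two workers concurrently and returns the final counter value.
    static int runWorkers(int iterationsPerWorker) {
        synchronized (LOCK_A) {
            synchronized (LOCK_B) {
                sharedCounter = 0;
            }
        }
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int w = 0; w < 2; w++) {
            pool.execute(() -> {
                for (int i = 0; i < iterationsPerWorker; i++) {
                    updateBoth();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        synchronized (LOCK_A) {
            synchronized (LOCK_B) {
                return sharedCounter;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("final counter: " + runWorkers(1000));
    }
}
```

If one worker instead took LOCK_B first and then LOCK_A, the two threads could each end up holding one lock while waiting for the other.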
I couldn't find anything like SettableFuture in the JDK
Take a look at java.util.concurrent.CompletableFuture<T> - this is probably what you want here. It exposes a blocking get() as well as a number of asynchronous completion callbacks such as thenAccept(Consumer<? super T>).
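As a rough illustration of how CompletableFuture can play the role of a "settable" future, the sketch below creates one by hand, completes it from a worker thread, and reads the derived value with a blocking get(). The class name, the variable outA1 (borrowed from the question's pseudocode) and the doubling computation are assumptions made purely for the example; thenApply is used here in place of thenAccept because the example needs a return value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; all names are illustrative only.
final class SettableFutureDemo {
    // Completes a future manually from a worker thread, then derives a
    // second value from it through a callback.
    static String produceAndTransform(int input) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // Created without any thread pool; nothing runs until complete() is called.
            CompletableFuture<Integer> outA1 = new CompletableFuture<>();
            // thenApply registers a callback that fires once outA1 has a value.
            CompletableFuture<String> description = outA1.thenApply(v -> "outA1=" + v);
            // The "assignment" from the pseudocode becomes a call to complete().
            worker.execute(() -> outA1.complete(input * 2));
            // Blocking read with a timeout so the example cannot hang forever.
            return description.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(produceAndTransform(21)); // prints "outA1=42"
    }
}
```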
invokeAndWait is generally considered prone to deadlock
It depends. If your calling thread isn't holding any locks that are going to be necessary for the execution of the Runnable you're submitting, you're probably okay. That said, if you can base your orchestration on asynchronous callbacks, you can instead use SwingUtilities.invokeLater(Runnable) - this will submit the execution of your Runnable on the Swing event loop without blocking the calling thread.
I would probably avoid creating a thread per singleton. Each running thread contributes some overhead and it's better to decouple the number of threads from your business logic. This will allow you to tune the software to different physical machines based on the number of cores, for example.
It sounds like you need each doWithX(...) method to be atomic. In other words, once one thread has begun accessing X, another thread cannot do so until the first thread is finished with its task step. If this is the case, then creating a lock object per singleton and ensuring serial (rather than parallel) access is the right way to go. You can achieve this by wrapping the execution of closures that get submitted in your doWithX(...) methods in a synchronized Java code block. The code within the block is also referred to as the critical section or monitor region.
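A minimal sketch of that idea, assuming a single hypothetical singleton X represented by a plain int field (stateOfX) and a dedicated lock object (LOCK_X); neither name comes from the question. Each closure passed to doWithX runs inside the synchronized block, so the steps execute one at a time even when several threads submit them:

```java
import java.util.function.Supplier;

// Hypothetical demo class; all names are illustrative only.
final class SerializedAccessDemo {
    private static final Object LOCK_X = new Object();
    private static int stateOfX = 0; // stands in for the state of singleton X

    // Runs one task step while holding X's lock: the critical section.
    static <T> T doWithX(Supplier<T> step) {
        synchronized (LOCK_X) {
            return step.get();
        }
    }

    // Starts several threads that each increment X's state through doWithX,
    // waits for them, and returns the final state.
    static int incrementConcurrently(int threads, int perThread) {
        synchronized (LOCK_X) {
            stateOfX = 0;
        }
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    doWithX(() -> ++stateOfX);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        synchronized (LOCK_X) {
            return stateOfX;
        }
    }

    public static void main(String[] args) {
        System.out.println(incrementConcurrently(4, 1000));
    }
}
```

Without the synchronized block, the unsynchronized increments could interleave and some updates could be lost.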
Another thing to consider is thread contention and order of execution. If two tasks both require access to X and task 1 gets submitted before task 2, is it a requirement that task 1's access to X occurs before task 2's? A requirement like that can complicate the design quite a bit and I would probably recommend a different approach than outlined above.
Is there a superior approach that I didn't consider?
These days there are frameworks out there for solving these types of problems. I'm specifically thinking of reactive streams and RxJava. While it is a very powerful framework, it also comes with a very steep learning curve. A lot of analysis and consideration should be done before adopting such a technology within an organization.
Update:
Based on your feedback, I think a CompletableFuture-based approach probably makes the most sense.
I'd create a helper class to orchestrate task step execution:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import javax.swing.SwingUtilities;

class TaskHelper
{
    private final Object lockA;
    private final Object lockB;
    private final Object lockC;
    private final Executor poolExecutor;
    private final Executor swingExecutor;

    public TaskHelper()
    {
        poolExecutor = Executors.newFixedThreadPool( 2 );
        swingExecutor = SwingUtilities::invokeLater;
        lockA = new Object();
        lockB = new Object();
        lockC = new Object();
    }

    public <T> CompletableFuture<T> doWithA( Supplier<T> taskStep )
    {
        return doWith( lockA, poolExecutor, taskStep );
    }

    public <T> CompletableFuture<T> doWithB( Supplier<T> taskStep )
    {
        return doWith( lockB, poolExecutor, taskStep );
    }

    public <T> CompletableFuture<T> doWithC( Supplier<T> taskStep )
    {
        return doWith( lockC, swingExecutor, taskStep );
    }

    private <T> CompletableFuture<T> doWith( Object lock, Executor executor, Supplier<T> taskStep )
    {
        CompletableFuture<T> future = new CompletableFuture<>();
        Runnable serialTaskStep = () -> {
            try {
                T result;
                // Only one task step at a time may run against this singleton.
                synchronized ( lock ) {
                    result = taskStep.get();
                }
                future.complete( result );
            } catch ( Throwable t ) {
                // Propagate failures; otherwise the future would never complete.
                future.completeExceptionally( t );
            }
        };
        executor.execute( serialTaskStep );
        return future;
    }
}
In my example above, doWithA and doWithB get scheduled on a shared thread pool while doWithC is always executed on the Swing thread. The Swing Executor is already going to be serial in nature, so the lock is really optional there.
For creating actual tasks, I'd recommend creating an object for each task. This allows you to supply callbacks as method references, resulting in cleaner code and avoiding callback hell:
This example computes the square of a provided number on a background thread pool and then displays the results on the Swing thread:
import javax.swing.JOptionPane;

class SampleTask
{
    private final TaskHelper helper;
    private final String id;
    private final int startingValue;

    public SampleTask( TaskHelper helper, String id, int startingValue )
    {
        this.helper = helper;
        this.id = id;
        this.startingValue = startingValue;
    }

    // Public so that callers outside this class (such as the test below) can start the task.
    public void start()
    {
        // Step 1: compute the square on the shared thread pool.
        helper.doWithB( () -> {
            int square = startingValue * startingValue;
            return String.format( "computed-thread: %s computed-square: %d",
                Thread.currentThread().getName(), square );
        } )
        .thenAccept( this::step2 );
    }

    private void step2( String result )
    {
        // Step 2: display the result on the Swing thread.
        helper.doWithC( () -> {
            String message = String.format( "current-thread: %s task: %s result: %s",
                Thread.currentThread().getName(), id, result );
            JOptionPane.showConfirmDialog( null, message );
            return null;
        } );
    }
}
@Test
public void testConcurrent() throws InterruptedException, ExecutionException
{
    TaskHelper helper = new TaskHelper();
    new SampleTask( helper, "task1", 5 ).start();
    new SampleTask( helper, "task2", 7 ).start();
    Thread.sleep( 60000 );
}
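As a possible variation on the two-step task above, the steps could also be chained with thenCompose, which flattens a step that itself returns a CompletableFuture. This is only a sketch under stated assumptions: StepRunner is a hypothetical, simplified stand-in for the helper class, and a plain thread pool stands in for the Swing thread so that the example runs without a display.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for the helper class; names are illustrative.
final class StepRunner {
    private final Object lockB = new Object();
    private final Object lockC = new Object();
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    <T> CompletableFuture<T> doWithB(Supplier<T> step) {
        return doWith(lockB, step);
    }

    <T> CompletableFuture<T> doWithC(Supplier<T> step) {
        return doWith(lockC, step);
    }

    // Runs the step on the pool while holding the given singleton's lock.
    // supplyAsync also completes the future exceptionally if the step throws.
    private <T> CompletableFuture<T> doWith(Object lock, Supplier<T> step) {
        return CompletableFuture.supplyAsync(() -> {
            synchronized (lock) {
                return step.get();
            }
        }, pool);
    }

    // Step 1 computes the square with B; step 2 formats it with C.
    // thenCompose starts step 2 only after step 1 has produced its result.
    CompletableFuture<String> squareThenFormat(String id, int value) {
        return doWithB(() -> value * value)
                .thenCompose(square -> doWithC(() -> "task: " + id + " result: " + square));
    }

    void shutdown() {
        pool.shutdown();
    }

    // Convenience wrapper that runs one task and waits for its result.
    static String runOnce(String id, int value) {
        StepRunner runner = new StepRunner();
        try {
            return runner.squareThenFormat(id, value).get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            runner.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce("task1", 5)); // prints "task: task1 result: 25"
    }
}
```

Because each step only schedules the next one and never blocks while holding a lock, a task never waits for one singleton while owning another.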
Update 2:
If you want to avoid callback hell while also avoiding the need to create an object per task, perhaps you should take a serious look at reactive streams after all.
Take a look at the "getting started" page for RxJava: https://github.com/ReactiveX/RxJava/wiki/How-To-Use-RxJava
For reference here's how the same example above would look in Rx (I'm removing the concept of task ID for simplicity):
@Test
public void testConcurrentRx() throws InterruptedException
{
    Scheduler swingScheduler = Schedulers.from( SwingUtilities::invokeLater );
    Subject<Integer> inputSubject = PublishSubject.create();
    inputSubject
        .flatMap( input -> Observable.just( input )
            .subscribeOn( Schedulers.computation() )
            .map( this::computeSquare ))
        .observeOn( swingScheduler )
        .subscribe( this::displayResult );
    inputSubject.onNext( 5 );
    inputSubject.onNext( 7 );
    Thread.sleep( 60000 );
}

private String computeSquare( int input )
{
    int square = input * input;
    return String.format( "computed-thread: %s computed-square: %d",
        Thread.currentThread().getName(), square );
}

private void displayResult( String result )
{
    String message = String.format( "current-thread: %s result: %s",
        Thread.currentThread().getName(), result );
    JOptionPane.showConfirmDialog( null, message );
}
Upvotes: 2