Reputation: 521
The ultimate goal is to add extra behavior to ListenableFutures based on the type of the Callable
/Runnable
argument. I want to add extra behavior to each of the Future methods. (Example use cases can be found in AbstractExecutorService's javadoc and section 7.1.7 of Goetz's Java Concurrency in Practice)
I have an existing ExecutorService which overrides newTaskFor. It tests the argument's type and creates a subclass of FutureTask
. This naturally supports submit as well as invokeAny and invokeAll.
How do I get the same effect for the ListenableFutures returned by a ListeningExecutorService?
Put another way, where can I put this code
if (callable instanceof SomeClass) {
return new FutureTask<T>(callable) {
public boolean cancel(boolean mayInterruptIfRunning) {
System.out.println("Canceling Task");
return super.cancel(mayInterruptIfRunning);
}
};
} else {
return new FutureTask<T>(callable);
}
such that my client can execute the println
statement with
ListeningExecutorService executor = ...;
Collection<Callable> callables = ImmutableSet.of(new SomeClass());
List<Future<?>> futures = executor.invokeAll(callables);
for (Future<?> future : futures) {
future.cancel(true);
}
Here's a list of things I've already tried and why they don't work.
Pass MyExecutorService
to MoreExecutors.listeningDecorator.
Problem 1: Unfortunately the resulting ListeningExecutorService (an AbstractListeningExecutorService
) doesn't delegate to the ExecutorService methods, it delegates to the execute(Runnable) method on Executor. As a result, the newTaskFor
method on MyExecutorService
is never called.
Problem 2: AbstractListeningExecutorService
creates the Runnable (a ListenableFutureTask) via static factory method which I can't extend.
Inside newTaskFor
, create MyRunnableFuture
normally and then wrap it with a ListenableFutureTask
.
Problem 1: ListenableFutureTask's factory methods don't accept RunnableFutures, they accept Runnable
and Callable
. If I pass MyRunnableFuture
as a Runnable, the resulting ListenableFutureTask
just calls run()
and not any of the Future
methods (where my behavior is).
Problem 2: Even if it did call my Future
methods, MyRunnableFuture
is not a Callable
, so I have to supply a return value when I create the ListenableFutureTask
... which I don't have... hence the Callable
.
Let MyRunnableFuture extend ListenableFutureTask
instead of FutureTask
Problem: ListenableFutureTask
is now final (as of r10 / r11).
Let MyRunnableFuture
extend ForwardingListenableFuture and implement RunnableFuture. Then wrap the SomeClass
argument in a ListenableFutureTask
and return that from delegate()
Problem: It hangs. I don't understand the problem well enough to explain it, but this configuration causes a deadlock in FutureTask.Sync .
Source Code: As requested, here's the source for Solution D which hangs:
import java.util.*;
import java.util.concurrent.*;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.*;
/** See http://stackoverflow.com/q/8931215/290943 */
public final class MyListeningExecutorServiceD extends ThreadPoolExecutor implements ListeningExecutorService {
// ===== Test Harness =====
private static interface SomeInterface {
public String getName();
}
private static class SomeClass implements SomeInterface, Callable<Void>, Runnable {
private final String name;
private SomeClass(String name) {
this.name = name;
}
public Void call() throws Exception {
System.out.println("SomeClass.call");
return null;
}
public void run() {
System.out.println("SomeClass.run");
}
public String getName() {
return name;
}
}
private static class MyListener implements FutureCallback<Void> {
public void onSuccess(Void result) {
System.out.println("MyListener.onSuccess");
}
public void onFailure(Throwable t) {
System.out.println("MyListener.onFailure");
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("Main.start");
SomeClass someClass = new SomeClass("Main.someClass");
ListeningExecutorService executor = new MyListeningExecutorServiceD();
Collection<Callable<Void>> callables = ImmutableSet.<Callable<Void>>of(someClass);
List<Future<Void>> futures = executor.invokeAll(callables);
for (Future<Void> future : futures) {
Futures.addCallback((ListenableFuture<Void>) future, new MyListener());
future.cancel(true);
}
System.out.println("Main.done");
}
// ===== Implementation =====
private static class MyRunnableFutureD<T> extends ForwardingListenableFuture<T> implements RunnableFuture<T> {
private final ListenableFuture<T> delegate;
private final SomeInterface someClass;
private MyRunnableFutureD(SomeInterface someClass, Runnable runnable, T value) {
assert someClass == runnable;
this.delegate = ListenableFutureTask.create(runnable, value);
this.someClass = someClass;
}
private MyRunnableFutureD(SomeClass someClass, Callable<T> callable) {
assert someClass == callable;
this.delegate = ListenableFutureTask.create(callable);
this.someClass = someClass;
}
@Override
protected ListenableFuture<T> delegate() {
return delegate;
}
public void run() {
System.out.println("MyRunnableFuture.run");
try {
delegate.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
System.out.println("MyRunnableFuture.cancel " + someClass.getName());
return super.cancel(mayInterruptIfRunning);
}
}
public MyListeningExecutorServiceD() {
// Same as Executors.newSingleThreadExecutor for now
super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
if (runnable instanceof SomeClass) {
return new MyRunnableFutureD<T>((SomeClass) runnable, runnable, value);
} else {
return new FutureTask<T>(runnable, value);
}
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if (callable instanceof SomeClass) {
return new MyRunnableFutureD<T>((SomeClass) callable, callable);
} else {
return new FutureTask<T>(callable);
}
}
/** Must override to supply co-variant return type */
@Override
public ListenableFuture<?> submit(Runnable task) {
return (ListenableFuture<?>) super.submit(task);
}
/** Must override to supply co-variant return type */
@Override
public <T> ListenableFuture<T> submit(Runnable task, T result) {
return (ListenableFuture<T>) super.submit(task, result);
}
/** Must override to supply co-variant return type */
@Override
public <T> ListenableFuture<T> submit(Callable<T> task) {
return (ListenableFuture<T>) super.submit(task);
}
}
Upvotes: 10
Views: 4929
Reputation: 3962
Good catch on the invokeAll
problems with my previous proposal.
Maybe I'm still overthinking this. Can MyRunnableFuture
itself implement ListenableFuture
and MyExecutorService
itself implement ListeningExecutorService
? You would still extend AbstractExecutorService
, but you'd declare that you're implementing ListeningExecutorService
, as well. Because all the methods of AbstractExecutorService
use the object returned by newTaskFor
, and because you know that you'll be returning a ListenableFuture
from that method, you can simply override all them to declare a return type of ListenableFuture
and implement them along the lines of return (ListenableFuture<T>) super.foo()
.
If that would work, perhaps we should make our AbstractListeningExecutorService
class public and add in some hooks that would make this easier?
Upvotes: 0
Reputation: 521
The javadoc for MoreExecutors.listeningDecorator clearly states that it delegates to execute
and never calls the delegate's submit
, invokeAll
, and invokeAny
. Furthermore it says that any "special handling of tasks must be implemented in the delegate's execute
method or by wrapping the returned ListeningExecutorService
."
So using newTaskFor
is out. Period.
I can get most of the behavior that I want by doing (source included below):
ForwardingListenableFuture
(MyListenableFuture)MyExecutorService
extend ForwardingListeningExecutorService
MoreExecutors.listeningDecorator
to wrap a standard ExecutorService
without the overridden newTaskFor
method.The only "gap" I have left is invokeAll
. There are two problems:
Future
s returned by listeningDecorator
are "in the same order" as the Callable
s that I submitted to it, then I can walk down the list of Callable
s and Future
s creating a MyListenableFuture
for each pair. This is probably a safe assumption, but still not entirely valid. (I'd also have to copy the Callable
s out of my argument to avoid an interleaved mutation, but that's ok)AbstractListeningExecutorService
invokes cancel
, isDone
, and get
within invokeAll
. This means that I can't add my behavior to those methods before they're called.import java.util.concurrent.*;
import com.google.common.util.concurrent.*;
/** See http://stackoverflow.com/q/8931215/290943 */
public final class MyListeningExecutorServiceE extends ForwardingListeningExecutorService {
// ===== Test Harness =====
private static interface SomeInterface {
public String getName();
}
private static class SomeClass implements SomeInterface, Callable<Void>, Runnable {
private final String name;
private SomeClass(String name) {
this.name = name;
}
public Void call() throws Exception {
System.out.println("SomeClass.call");
return null;
}
public void run() {
System.out.println("SomeClass.run");
}
public String getName() {
return name;
}
}
private static class MyListener implements FutureCallback<Void> {
public void onSuccess(Void result) {
System.out.println("MyListener.onSuccess");
}
public void onFailure(Throwable t) {
System.out.println("MyListener.onFailure");
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("Main.start");
SomeInterface someClass = new SomeClass("Main.someClass");
ListeningExecutorService executor = new MyListeningExecutorServiceE();
ListenableFuture<Void> future = executor.submit((Callable<Void>) someClass);
Futures.addCallback(future, new MyListener());
future.cancel(true);
/* Not supported by this implementation
Collection<Callable<Void>> callables = ImmutableSet.<Callable<Void>>of(someClass);
List<Future<Void>> futures = executor.invokeAll(callables);
for (Future<Void> future : futures) {
Futures.addCallback((ListenableFuture<Void>) future, new MyListener());
future.cancel(true);
}
*/
executor.shutdown();
System.out.println("Main.done");
}
// ===== Implementation =====
private static class MyListenableFutureE<T> extends ForwardingListenableFuture<T> {
private final ListenableFuture<T> delegate;
private final SomeInterface someInterface;
private MyListenableFutureE(SomeInterface someInterface, ListenableFuture<T> delegate) {
this.delegate = delegate;
this.someInterface = someInterface;
}
@Override
protected ListenableFuture<T> delegate() {
return delegate;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
System.out.println("MyRunnableFuture.cancel " + someInterface.getName());
return super.cancel(mayInterruptIfRunning);
}
}
private final ListeningExecutorService delegate;
public MyListeningExecutorServiceE() {
delegate = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
}
@Override
protected ListeningExecutorService delegate() {
return delegate;
}
@Override
public <T> ListenableFuture<T> submit(Callable<T> task) {
ListenableFuture<T> future = super.submit(task);
if (task instanceof SomeInterface) {
future = new MyListenableFutureE<T>((SomeInterface) task, future);
}
return future;
}
@Override
public ListenableFuture<?> submit(Runnable task) {
ListenableFuture<?> future = super.submit(task);
if (task instanceof SomeInterface) {
future = new MyListenableFutureE((SomeInterface) task, future);
}
return future;
}
@Override
public <T> ListenableFuture<T> submit(Runnable task, T result) {
ListenableFuture<T> future = super.submit(task, result);
if (task instanceof SomeInterface) {
future = new MyListenableFutureE<T>((SomeInterface) task, future);
}
return future;
}
}
Upvotes: 1
Reputation: 3962
Based on this question and a couple others discussions I've had recently, I'm coming to the conclusion that RunnableFuture
/FutureTask
is inherently misleading: Clearly you submit a Runnable
, and clearly you get a Future
back, and clearly the underlying Thread
needs a Runnable
. But why should a class implement both Runnable
and Future
? And if it does, which Runnable
is it replacing? That's bad enough already, but then we introduce multiple levels of executors, and things really get out of hand.
If there's a solution here, I think it's going to require treating FutureTask
as an implementation detail of AbstractExecutorService
. I'd focus instead on splitting the problem into two pieces:
Future
.Runnable
/Future
distinction.)(grumble Markdown grumble)
class MyWrapperExecutor extends ForwardingListeningExecutorService {
private final ExecutorService delegateExecutor;
@Override public <T> ListenableFuture<T> submit(Callable<T> task) {
if (callable instanceof SomeClass) {
// Modify and submit Callable (or just submit the original Callable):
ListenableFuture<T> delegateFuture =
delegateExecutor.submit(new MyCallable(callable));
// Modify Future:
return new MyWrapperFuture<T>(delegateFuture);
} else {
return delegateExecutor.submit(callable);
}
}
// etc.
}
Could that work?
Upvotes: 3
Reputation: 198083
I suspect that MoreExecutors.listeningDecorator(service)
will modify the futures returned by your underlying service
. So if you've already modified your underlying ExecutorService
, and you're just decorating the Future
methods, you may just be able to use MoreExecutors.listeningDecorator(service)
directly. Have you experimented to see if that works? If not, can you provide more details on why that doesn't work?
----UPDATE----
It appears that your code, as written, mixes up submit and invokeAll. (Specifically, it calls invokeAll, which doesn't delegate to submit...)
That said, the conclusion I am rapidly drawing is that ListenableFutures and ListeningExecutorService aren't meant to be abused this way. Implementing this stuff without deadlocks is a Bona Fide Hard Problem, and you're not supposed to do it if you can avoid it.
I suspect that it might be worth filing a Guava issue that Solution A doesn't work, maybe. But I keep trying to figure out how to do what you're doing, and it just feels wrong to do it.
Can you give some insight as to why you're trying to return different futures?
Upvotes: 1
Reputation: 8582
According to ListeningExecutorService Javadoc, you can use MoreExecutors.listeningDecorator to decorate your own ExecutorService.
So use your ExecutorService that overrides newTaskFor, and wrap it with the method above. Would that work for you?
UPDATE
Ok, this is what I would do:
1) Download Guava sources if you haven't already.
2) Don't use listeningDecorator, instead, make your custom ExecutorService implement ListeningExecutorService.
3) Your subclass of FutureTask should implement ListenableFuture, and copy the code from ListenableFutureTask, which is quite simple, and then add your cancel method override.
4) Implement the methods of ListeningExecutorService on your custom ExecutorService by changing the returning method of the existing methods to ListenableFuture.
Upvotes: 1