Drew

Reputation: 521

Add/extend behavior of Future created by ListeningExecutorService

The ultimate goal is to add extra behavior to ListenableFutures based on the type of the Callable/Runnable argument. I want to add extra behavior to each of the Future methods. (Example use cases can be found in AbstractExecutorService's javadoc and in section 7.1.7 of Goetz's Java Concurrency in Practice.)

I have an existing ExecutorService which overrides newTaskFor. It tests the argument's type and creates a subclass of FutureTask. This naturally supports submit as well as invokeAny and invokeAll.
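For reference, a minimal JDK-only sketch of that existing setup. The names NewTaskForSketch, SpecialTask, MyExecutorService, and LOG are made up for illustration (SpecialTask stands in for SomeClass), and the log list replaces the println so the effect can be checked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class NewTaskForSketch {

    /** Hypothetical marker type standing in for SomeClass. */
    static class SpecialTask implements Callable<String> {
        public String call() {
            return "done";
        }
    }

    /** Records calls that reach the customized cancel method. */
    static final List<String> LOG = new ArrayList<>();

    static class MyExecutorService extends ThreadPoolExecutor {
        MyExecutorService() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }

        // submit, invokeAll and invokeAny in AbstractExecutorService all
        // create their futures through this hook.
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            if (callable instanceof SpecialTask) {
                return new FutureTask<T>(callable) {
                    @Override
                    public boolean cancel(boolean mayInterruptIfRunning) {
                        LOG.add("Canceling Task");
                        return super.cancel(mayInterruptIfRunning);
                    }
                };
            }
            return new FutureTask<T>(callable);
        }
    }

    /** Submits one special task and cancels the future that comes back. */
    static List<String> demo() {
        LOG.clear();
        MyExecutorService executor = new MyExecutorService();
        try {
            Future<String> future = executor.submit(new SpecialTask());
            future.cancel(true); // logged whether or not the task already ran
        } finally {
            executor.shutdownNow();
        }
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The future handed back by submit is the anonymous FutureTask subclass, so the client's cancel call reaches the added behavior without the client knowing about the subclass.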

How do I get the same effect for the ListenableFutures returned by a ListeningExecutorService?

Put another way, where can I put this code

if (callable instanceof SomeClass) {
    return new FutureTask<T>(callable) {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            System.out.println("Canceling Task");
            return super.cancel(mayInterruptIfRunning);
        }
    };
} else {
    return new FutureTask<T>(callable);
}

such that my client can execute the println statement with

ListeningExecutorService executor = ...;
// Typed as Callable<Void> so that invokeAll type-checks without raw types.
Collection<Callable<Void>> callables = ImmutableSet.<Callable<Void>>of(new SomeClass());
List<Future<Void>> futures = executor.invokeAll(callables);
for (Future<Void> future : futures) {
    future.cancel(true);
}

Failed Solutions

Here's a list of things I've already tried and why they don't work.

Solution A

Pass MyExecutorService to MoreExecutors.listeningDecorator.

Problem 1: Unfortunately, the resulting ListeningExecutorService (an AbstractListeningExecutorService) doesn't delegate to the ExecutorService methods; it delegates to the execute(Runnable) method on Executor. As a result, the newTaskFor method on MyExecutorService is never called.

Problem 2: AbstractListeningExecutorService creates the Runnable (a ListenableFutureTask) via a static factory method, which I can't extend.

Solution B

Inside newTaskFor, create MyRunnableFuture normally and then wrap it with a ListenableFutureTask.

Problem 1: ListenableFutureTask's factory methods don't accept RunnableFutures, they accept Runnable and Callable. If I pass MyRunnableFuture as a Runnable, the resulting ListenableFutureTask just calls run() and not any of the Future methods (where my behavior is).

Problem 2: Even if it did call my Future methods, MyRunnableFuture is not a Callable, so I have to supply a return value when I create the ListenableFutureTask... which I don't have... hence the Callable.

Solution C

Let MyRunnableFuture extend ListenableFutureTask instead of FutureTask.

Problem: ListenableFutureTask is now final (as of r10 / r11).

Solution D

Let MyRunnableFuture extend ForwardingListenableFuture and implement RunnableFuture. Then wrap the SomeClass argument in a ListenableFutureTask and return that from delegate().

Problem: It hangs. I don't understand the problem well enough to explain it, but this configuration causes a deadlock in FutureTask.Sync.

Source Code: As requested, here's the source for Solution D which hangs:

import java.util.*;
import java.util.concurrent.*;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.*;

/** See http://stackoverflow.com/q/8931215/290943 */
public final class MyListeningExecutorServiceD extends ThreadPoolExecutor implements ListeningExecutorService {

    // ===== Test Harness =====

    private static interface SomeInterface {
        public String getName();
    }
    
    private static class SomeClass implements SomeInterface, Callable<Void>, Runnable {
        private final String name;

        private SomeClass(String name) {
            this.name = name;
        }

        public Void call() throws Exception {
            System.out.println("SomeClass.call");
            return null;
        }

        public void run() {
            System.out.println("SomeClass.run");
        }

        public String getName() {
            return name;
        }
    }

    private static class MyListener implements FutureCallback<Void> {
        public void onSuccess(Void result) {
            System.out.println("MyListener.onSuccess");
        }

        public void onFailure(Throwable t) {
            System.out.println("MyListener.onFailure");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Main.start");
        
        SomeClass someClass = new SomeClass("Main.someClass");
        
        ListeningExecutorService executor = new MyListeningExecutorServiceD();
        Collection<Callable<Void>> callables = ImmutableSet.<Callable<Void>>of(someClass);
        List<Future<Void>> futures = executor.invokeAll(callables);
        
        for (Future<Void> future : futures) {
            Futures.addCallback((ListenableFuture<Void>) future, new MyListener());
            future.cancel(true);
        }
        
        System.out.println("Main.done");
    }

    // ===== Implementation =====

    private static class MyRunnableFutureD<T> extends ForwardingListenableFuture<T> implements RunnableFuture<T> {

        private final ListenableFuture<T> delegate;
        private final SomeInterface someClass;

        private MyRunnableFutureD(SomeInterface someClass, Runnable runnable, T value) {
            assert someClass == runnable;
            this.delegate = ListenableFutureTask.create(runnable, value);
            this.someClass = someClass;
        }
        
        private MyRunnableFutureD(SomeClass someClass, Callable<T> callable) {
            assert someClass == callable;
            this.delegate = ListenableFutureTask.create(callable);
            this.someClass = someClass;
        }

        @Override
        protected ListenableFuture<T> delegate() {
            return delegate;
        }

        public void run() {
            System.out.println("MyRunnableFuture.run");
            try {
                delegate.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            System.out.println("MyRunnableFuture.cancel " + someClass.getName());
            return super.cancel(mayInterruptIfRunning);
        }
    }

    public MyListeningExecutorServiceD() {
        // Same as Executors.newSingleThreadExecutor for now
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        if (runnable instanceof SomeClass) {
            return new MyRunnableFutureD<T>((SomeClass) runnable, runnable, value);
        } else {
            return new FutureTask<T>(runnable, value);
        }
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if (callable instanceof SomeClass) {
            return new MyRunnableFutureD<T>((SomeClass) callable, callable);
        } else {
            return new FutureTask<T>(callable);
        }
    }

    /** Must override to supply co-variant return type */
    @Override
    public ListenableFuture<?> submit(Runnable task) {
        return (ListenableFuture<?>) super.submit(task);
    }

    /** Must override to supply co-variant return type */
    @Override
    public <T> ListenableFuture<T> submit(Runnable task, T result) {
        return (ListenableFuture<T>) super.submit(task, result);
    }

    /** Must override to supply co-variant return type */
    @Override
    public <T> ListenableFuture<T> submit(Callable<T> task) {
        return (ListenableFuture<T>) super.submit(task);
    }
}
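
A possible explanation for the hang, sketched with JDK classes only (this is a reading of the code above, not a confirmed diagnosis): MyRunnableFutureD.run() calls delegate.get(), but nothing ever calls run() on the ListenableFutureTask delegate, so the worker thread waits for a result that is never computed, and invokeAll waits on the same future. The hypothetical class below reproduces that shape with a plain FutureTask and a timeout, then shows the variant that runs the inner task first.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DelegateRunSketch {

    /** Same shape as Solution D's run(): wait on a task that nobody runs. */
    static String waitWithoutRunning() {
        FutureTask<String> inner = new FutureTask<>(() -> "value");
        return getWithTimeout(inner);
    }

    /** Variant that executes the inner task before asking for its result. */
    static String runThenWait() {
        FutureTask<String> inner = new FutureTask<>(() -> "value");
        inner.run(); // computes the result and completes the future
        return getWithTimeout(inner);
    }

    /** Bounded get() so the sketch terminates instead of hanging. */
    private static String getWithTimeout(FutureTask<String> task) {
        try {
            return task.get(200, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException | ExecutionException e) {
            return "failed: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(waitWithoutRunning());
        System.out.println(runThenWait());
    }
}
```

If that reading is right, the corresponding change in Solution D would be to type the delegate field as ListenableFutureTask<T> and call delegate.run() from run() instead of delegate.get().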

Upvotes: 10

Views: 4929

Answers (5)

Chris Povirk

Reputation: 3962

Good catch on the invokeAll problems with my previous proposal.

Maybe I'm still overthinking this. Can MyRunnableFuture itself implement ListenableFuture and MyExecutorService itself implement ListeningExecutorService? You would still extend AbstractExecutorService, but you'd declare that you're implementing ListeningExecutorService as well. Because all the methods of AbstractExecutorService use the object returned by newTaskFor, and because you know that you'll be returning a ListenableFuture from that method, you can simply override all of them to declare a return type of ListenableFuture and implement them along the lines of return (ListenableFuture<T>) super.foo().

If that would work, perhaps we should make our AbstractListeningExecutorService class public and add in some hooks that would make this easier?

Upvotes: 0

Drew

Reputation: 521

The javadoc for MoreExecutors.listeningDecorator clearly states that it delegates to execute and never calls the delegate's submit, invokeAll, and invokeAny. Furthermore it says that any "special handling of tasks must be implemented in the delegate's execute method or by wrapping the returned ListeningExecutorService."

So using newTaskFor is out. Period.

I can get most of the behavior that I want by doing (source included below):

  1. Move the behavior I want to add to a subclass of ForwardingListenableFuture (MyListenableFuture)
  2. Let MyExecutorService extend ForwardingListeningExecutorService
  3. Use MoreExecutors.listeningDecorator to wrap a standard ExecutorService without the overridden newTaskFor method.
  4. Override submit to wrap the ListenableFuture returned by the listeningDecorator with MyListenableFuture

The only "gap" I have left is invokeAll. There are two problems:

  1. (minor) If I assume the Futures returned by listeningDecorator are "in the same order" as the Callables that I submitted to it, then I can walk down the list of Callables and Futures creating a MyListenableFuture for each pair. This is probably a safe assumption, but still not entirely valid. (I'd also have to copy the Callables out of my argument to avoid an interleaved mutation, but that's ok)
  2. (bigger) AbstractListeningExecutorService invokes cancel, isDone, and get within invokeAll. This means that I can't add my behavior to those methods before they're called.
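
The pairing described in the first point could look roughly like the JDK-only sketch below. Named, NamedTask, NamedFuture, and invokeAllWrapped are hypothetical stand-ins (for SomeInterface, SomeClass, MyListenableFuture, and an invokeAll override), and a plain Future takes the place of ListenableFuture. On the ordering assumption: the ExecutorService.invokeAll javadoc says the returned list is in the same sequential order as the iterator of the given task collection, so iterating a snapshot copy should keep the pairs aligned. The sketch does nothing about the second point, since invokeAll has already finished by the time the wrappers exist.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class InvokeAllPairingSketch {

    /** Hypothetical stand-in for SomeInterface. */
    interface Named {
        String getName();
    }

    static class NamedTask implements Named, Callable<String> {
        private final String name;
        NamedTask(String name) { this.name = name; }
        public String call() { return name + " result"; }
        public String getName() { return name; }
    }

    /** Plain Future wrapper standing in for MyListenableFuture. */
    static class NamedFuture<T> implements Future<T> {
        final String name;
        private final Future<T> delegate;
        NamedFuture(String name, Future<T> delegate) {
            this.name = name;
            this.delegate = delegate;
        }
        public boolean cancel(boolean mayInterruptIfRunning) {
            System.out.println("cancel " + name);
            return delegate.cancel(mayInterruptIfRunning);
        }
        public boolean isCancelled() { return delegate.isCancelled(); }
        public boolean isDone() { return delegate.isDone(); }
        public T get() throws InterruptedException, ExecutionException {
            return delegate.get();
        }
        public T get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            return delegate.get(timeout, unit);
        }
    }

    /** Copies the tasks, invokes them, then wraps futures pairwise by index. */
    static <T> List<Future<T>> invokeAllWrapped(
            ExecutorService executor, Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        // Snapshot first so a concurrent mutation of 'tasks' cannot shift the pairing.
        List<Callable<T>> snapshot = new ArrayList<Callable<T>>(tasks);
        List<Future<T>> futures = executor.invokeAll(snapshot);
        List<Future<T>> wrapped = new ArrayList<Future<T>>(futures.size());
        for (int i = 0; i < snapshot.size(); i++) {
            Callable<T> task = snapshot.get(i);
            Future<T> future = futures.get(i);
            if (task instanceof Named) {
                wrapped.add(new NamedFuture<T>(((Named) task).getName(), future));
            } else {
                wrapped.add(future);
            }
        }
        return wrapped;
    }

    /** Runs two named tasks and reports each wrapper's name with its result. */
    static List<String> demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            List<NamedTask> tasks = Arrays.asList(new NamedTask("a"), new NamedTask("b"));
            List<String> out = new ArrayList<String>();
            for (Future<String> f : invokeAllWrapped(executor, tasks)) {
                out.add(((NamedFuture<String>) f).name + "=" + f.get());
            }
            return out;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```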

Source

import java.util.concurrent.*;
import com.google.common.util.concurrent.*;

/** See http://stackoverflow.com/q/8931215/290943 */
public final class MyListeningExecutorServiceE extends ForwardingListeningExecutorService {

    // ===== Test Harness =====

    private static interface SomeInterface {
        public String getName();
    }

    private static class SomeClass implements SomeInterface, Callable<Void>, Runnable {
        private final String name;

        private SomeClass(String name) {
            this.name = name;
        }

        public Void call() throws Exception {
            System.out.println("SomeClass.call");
            return null;
        }

        public void run() {
            System.out.println("SomeClass.run");
        }

        public String getName() {
            return name;
        }
    }

    private static class MyListener implements FutureCallback<Void> {
        public void onSuccess(Void result) {
            System.out.println("MyListener.onSuccess");
        }

        public void onFailure(Throwable t) {
            System.out.println("MyListener.onFailure");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Main.start");

        SomeInterface someClass = new SomeClass("Main.someClass");

        ListeningExecutorService executor = new MyListeningExecutorServiceE();

        ListenableFuture<Void> future = executor.submit((Callable<Void>) someClass);
        Futures.addCallback(future, new MyListener());
        future.cancel(true);

        /*  Not supported by this implementation

        Collection<Callable<Void>> callables = ImmutableSet.<Callable<Void>>of(someClass);
        List<Future<Void>> futures = executor.invokeAll(callables);

        for (Future<Void> future : futures) {
            Futures.addCallback((ListenableFuture<Void>) future, new MyListener());
            future.cancel(true);
        }
        */

        executor.shutdown();
        System.out.println("Main.done");
    }

    // ===== Implementation =====

    private static class MyListenableFutureE<T> extends ForwardingListenableFuture<T> {

        private final ListenableFuture<T> delegate;
        private final SomeInterface someInterface;

        private MyListenableFutureE(SomeInterface someInterface, ListenableFuture<T> delegate) {
            this.delegate = delegate;
            this.someInterface = someInterface;
        }

        @Override
        protected ListenableFuture<T> delegate() {
            return delegate;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            System.out.println("MyRunnableFuture.cancel " + someInterface.getName());
            return super.cancel(mayInterruptIfRunning);
        }
    }

    private final ListeningExecutorService delegate;

    public MyListeningExecutorServiceE() {
        delegate = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
    }

    @Override
    protected ListeningExecutorService delegate() {
        return delegate;
    }

    @Override
    public <T> ListenableFuture<T> submit(Callable<T> task) {
        ListenableFuture<T> future = super.submit(task);

        if (task instanceof SomeInterface) {
            future = new MyListenableFutureE<T>((SomeInterface) task, future);
        }

        return future;
    }

    @Override
    public ListenableFuture<?> submit(Runnable task) {
        ListenableFuture<?> future = super.submit(task);

        if (task instanceof SomeInterface) {
            future = new MyListenableFutureE((SomeInterface) task, future);
        }

        return future;
    }

    @Override
    public <T> ListenableFuture<T> submit(Runnable task, T result) {
        ListenableFuture<T> future = super.submit(task, result);

        if (task instanceof SomeInterface) {
            future = new MyListenableFutureE<T>((SomeInterface) task, future);
        }

        return future;
    }
}

Upvotes: 1

Chris Povirk

Reputation: 3962

Based on this question and a couple of other discussions I've had recently, I'm coming to the conclusion that RunnableFuture/FutureTask is inherently misleading: Clearly you submit a Runnable, and clearly you get a Future back, and clearly the underlying Thread needs a Runnable. But why should a class implement both Runnable and Future? And if it does, which Runnable is it replacing? That's bad enough already, but then we introduce multiple levels of executors, and things really get out of hand.

If there's a solution here, I think it's going to require treating FutureTask as an implementation detail of AbstractExecutorService. I'd focus instead on splitting the problem into two pieces:

  • I want to conditionally modify the returned Future.
  • I want to conditionally modify the code run by the executor service. (I'm actually not sure whether this is a requirement here, but I'll cover it in case it is. Even if not, it may help establish the Runnable/Future distinction.)

(grumble Markdown grumble)

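class MyWrapperExecutor extends ForwardingListeningExecutorService {
  // Must be a ListeningExecutorService so that submit returns a ListenableFuture.
  private final ListeningExecutorService delegateExecutor;

  MyWrapperExecutor(ListeningExecutorService delegateExecutor) {
    this.delegateExecutor = delegateExecutor;
  }

  @Override protected ListeningExecutorService delegate() {
    return delegateExecutor;
  }

  @Override public <T> ListenableFuture<T> submit(Callable<T> task) {
    if (task instanceof SomeClass) {
      // Modify and submit Callable (or just submit the original Callable):
      ListenableFuture<T> delegateFuture =
          delegateExecutor.submit(new MyCallable<T>(task));
      // Modify Future:
      return new MyWrapperFuture<T>(delegateFuture);
    } else {
      return delegateExecutor.submit(task);
    }
  }

  // etc.
}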

Could that work?

Upvotes: 3

Louis Wasserman

Reputation: 198083

I suspect that MoreExecutors.listeningDecorator(service) will modify the futures returned by your underlying service. So if you've already modified your underlying ExecutorService, and you're just decorating the Future methods, you may just be able to use MoreExecutors.listeningDecorator(service) directly. Have you experimented to see if that works? If not, can you provide more details on why that doesn't work?

----UPDATE----

It appears that your code, as written, mixes up submit and invokeAll. (Specifically, it calls invokeAll, which doesn't delegate to submit...)

That said, the conclusion I am rapidly drawing is that ListenableFutures and ListeningExecutorService aren't meant to be abused this way. Implementing this stuff without deadlocks is a Bona Fide Hard Problem, and you're not supposed to do it if you can avoid it.

I suspect that it might be worth filing a Guava issue that Solution A doesn't work, maybe. But I keep trying to figure out how to do what you're doing, and it just feels wrong to do it.

Can you give some insight as to why you're trying to return different futures?

Upvotes: 1

Luciano

Reputation: 8582

According to ListeningExecutorService Javadoc, you can use MoreExecutors.listeningDecorator to decorate your own ExecutorService.

So use your ExecutorService that overrides newTaskFor, and wrap it with the method above. Would that work for you?

UPDATE

Ok, this is what I would do:

1) Download Guava sources if you haven't already.

2) Don't use listeningDecorator. Instead, make your custom ExecutorService implement ListeningExecutorService.

3) Your subclass of FutureTask should implement ListenableFuture. Copy the code from ListenableFutureTask, which is quite simple, and then add your cancel method override.

4) Implement the methods of ListeningExecutorService on your custom ExecutorService by changing the return type of the existing methods to ListenableFuture.
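
A rough JDK-only sketch of what step 3 might look like. It does not copy Guava's ListenableFutureTask; the listener bookkeeping here is a simplified stand-in, and ListenableTaskSketch and MyListenableTask are made-up names. With Guava on the classpath, the class would additionally declare that it implements ListenableFuture<T>, whose only extra method is addListener(Runnable, Executor).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

public class ListenableTaskSketch {

    /** FutureTask with an addListener method shaped like ListenableFuture's. */
    static class MyListenableTask<T> extends FutureTask<T> {
        private final List<Runnable> pending = new ArrayList<>();
        private boolean finished; // guarded by 'pending'

        MyListenableTask(Callable<T> callable) {
            super(callable);
        }

        /** Runs the listener on the executor once the task is done. */
        public void addListener(Runnable listener, Executor executor) {
            synchronized (pending) {
                if (!finished) {
                    pending.add(() -> executor.execute(listener));
                    return;
                }
            }
            executor.execute(listener); // already done: notify right away
        }

        /** FutureTask calls this on completion, failure, or cancellation. */
        @Override
        protected void done() {
            List<Runnable> toRun;
            synchronized (pending) {
                finished = true;
                toRun = new ArrayList<>(pending);
                pending.clear();
            }
            for (Runnable r : toRun) {
                r.run();
            }
        }

        /** The extra behavior from the question. */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            System.out.println("Canceling Task");
            return super.cancel(mayInterruptIfRunning);
        }
    }

    /** Registers one listener before and one after cancellation. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        MyListenableTask<String> task = new MyListenableTask<>(() -> "value");
        Executor direct = Runnable::run; // same-thread executor for the demo
        task.addListener(() -> events.add("before"), direct);
        task.cancel(true);
        task.addListener(() -> events.add("after"), direct);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Returning such a class from newTaskFor would let the covariant submit overrides from step 4 cast safely, since every future the executor creates would then be listenable.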

Upvotes: 1
