Reza

Reputation: 360

How to stop timeout of a future

I'm using a future so that I can apply a timeout while waiting for a serial event to happen:

Future<Response> future = executor.submit(new CommunicationTask(this, request));
response = new Response("timeout");
try {
  response = future.get(timeoutMilliseconds, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException e) {
  future.cancel(true);
  log.info("Execution time out." + e);
} catch (ExecutionException e) {
  future.cancel(true);
  log.error("Encountered problem communicating with device: " + e);
}

The CommunicationTask class implements the Observer interface to listen for changes from the serial port.

The problem is that reading from the serial port is relatively slow, so even when a serial event is in progress the time runs out and a TimeoutException is thrown. What can I do to stop the timeout clock of my future once a serial event is happening?

I tried it with an AtomicReference but that didn't change anything:

public class CommunicationTask implements Callable<Response>, Observer {
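  private final AtomicReference<Response> atomicResponse = new AtomicReference<>(new Response("timeout"));
  private final CountDownLatch latch = new CountDownLatch(1);
  private final SerialCommunicator communicator;
  private final String message;
  private final SerialPort port;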

  CommunicationTask(SerialCommunicator communicator, Request request) {
    this.communicator = communicator;
    this.message = request.serialize();
    this.port = communicator.getPort();
  }

  @Override
  public Response call() throws Exception {
    return query(message);
  }

  public Response query(String message) {
    communicator.getListener().addObserver(this);
    message = message + "\r\n";
    try {
      port.writeString(message);
    } catch (Exception e) {
      log.warn("Could not write to port: " + e);
      communicator.disconnect();
    }
    try {
      latch.await();
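    } catch (InterruptedException e) {
      // future.cancel(true) interrupts this thread; restore the flag for the caller
      Thread.currentThread().interrupt();
      log.info("Execution time out.");
    }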
    communicator.getListener().deleteObserver(this);
    return (Response)atomicResponse.get();
  }

  @Override
  public void update(Observable o, Object arg) {
    atomicResponse.set((Response)arg);
    latch.countDown();
  }
}

What can I do to solve this problem?

EDIT:

OK, I had one error: I was counting down my latch before setting the atomicResponse in my update function. Now it seems to work, but the question remains whether this approach is the right way to do it.
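To illustrate the ordering fix described in the edit above, here is a minimal sketch. It is not the original CommunicationTask: the class name LatchOrderingSketch, the String stand-in for Response, and the timed await are assumptions made only to keep the example self-contained.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for the task in the question.
public class LatchOrderingSketch {
    // A String replaces the Response type; "timeout" is the default value.
    private final AtomicReference<String> response = new AtomicReference<>("timeout");
    private final CountDownLatch latch = new CountDownLatch(1);

    // Plays the role of update(): called from the listener thread.
    public void publish(String value) {
        response.set(value);  // 1. store the result first
        latch.countDown();    // 2. only then release the waiting thread
    }

    // Plays the role of query(): called from the waiting thread.
    public String awaitResponse(long timeoutMillis) throws InterruptedException {
        // await returns false when the timeout elapses; the default value is returned then.
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return response.get();
    }

    public static void main(String[] args) throws Exception {
        LatchOrderingSketch sketch = new LatchOrderingSketch();
        Thread listener = new Thread(() -> sketch.publish("OK"));
        listener.start();
        System.out.println(sketch.awaitResponse(1000));
        listener.join();
    }
}
```

If the two statements in publish were swapped, the waiter could wake up between them and still read the default value, which matches the symptom I saw.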

Upvotes: 6

Views: 3314

Answers (2)

gowtham paruchuri

Reputation: 31

Have you looked at Guava's ListenableFuture? It lets you register callbacks that run asynchronously when the task completes, so you don't have to block on get(). I hope the following code snippet helps:

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

public class SyncFutureExample {
    public static void main(String[] args) {
        ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
        ListenableFuture<String> lf  = service.submit(new CommuncationTask());

        // No need to block on future.get() or future.get(timeout, unit).

        // Add callbacks (asynchronous future listeners).
        Futures.addCallback(lf, new FutureCallback<String>() {
              public void onSuccess(String input) {
                System.out.println(input + " >>> success");//gets a callback once task is success
              }
              public void onFailure(Throwable thrown) {
                  System.out.println(thrown + " >>> failure");//gets a callback if task is failed
              }
            }, MoreExecutors.directExecutor()); // recent Guava versions require an explicit executor
        service.shutdown();
    }

}

class CommuncationTask implements Callable<String>{

    public String call() throws Exception {
        TimeUnit.SECONDS.sleep(15); // placeholder for the real long-running task
        return "TaskDone";
    }


}

Upvotes: 1

Evgeniy Dorofeev

Reputation: 135992

Hope this will help. I won't comment on it in the hopes that everything is clear from the code.

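import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CommunicationTask implements Callable<String>, Observer {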
    volatile boolean ignoreTimeoutException;

    public CommunicationTask(SerialCommunicator communicator, Request request) {
    }

    public String call() throws Exception {
        Thread.sleep(1000);
        return "done";
    }

    public void update(Observable o, Object arg) {
        ignoreTimeoutException = true;
    }
}

class FutureCommunicationTask extends FutureTask<String> {
    private CommunicationTask ct;

    public FutureCommunicationTask(CommunicationTask ct) {
        super(ct);
        this.ct = ct;
    }

    public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        try {
            return super.get(timeout, unit);
        } catch (TimeoutException e) {
            if (ct.ignoreTimeoutException) {
                return get();  // an event arrived: keep waiting, this time without a timeout
            }
            throw e;
        }
    }
}

public class Test {

    public static void main(String[] args) throws Exception {
        CommunicationTask ct = new CommunicationTask(null, null);
        FutureTask<String> fct = new FutureCommunicationTask(ct);
        ExecutorService ex = Executors.newSingleThreadExecutor();
        ex.execute(fct);
        // Comment out the next line and get() throws TimeoutException instead.
        ct.update(null, null);
        String res = fct.get(1, TimeUnit.MILLISECONDS);
        System.out.println(res);
        ex.shutdown();  // let the JVM exit once the task has finished
    }
}
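One caveat: the untimed get() above can block forever if the device stops responding midway through. A possible variation, sketched here under assumptions, grants a second bounded wait instead. The class name TwoPhaseGet, the BooleanSupplier flag and the two timeout parameters are hypothetical and not part of the code above.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical helper: a timed wait that is extended once if an event was seen.
public class TwoPhaseGet {
    public static <T> T get(Future<T> future, BooleanSupplier eventSeen,
                            long firstMillis, long extraMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        try {
            return future.get(firstMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            if (!eventSeen.getAsBoolean()) {
                throw e;  // nothing arrived: a genuine timeout
            }
            // Something arrived: allow one more wait, still bounded.
            return future.get(extraMillis, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService ex = Executors.newSingleThreadExecutor();
        try {
            // Simulated slow read that outlasts the first timeout.
            Future<String> slow = ex.submit(() -> {
                Thread.sleep(200);
                return "done";
            });
            System.out.println(get(slow, () -> true, 10, 2000));
        } finally {
            ex.shutdownNow();
        }
    }
}
```

In the code above, the flag would be ct.ignoreTimeoutException, passed as () -> ct.ignoreTimeoutException.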

Upvotes: 0
