woxihuanxiayua

Reputation: 107

ScheduledExecutorService cancel task inside my task

I have a timed task that makes an HTTP call and then checks the returned result. If the result is a certain value, I do some work, after which the timed task can be terminated and no longer needs to be scheduled. How can I cancel this schedule from within my task? Does ScheduledExecutorService provide a parameter similar to a termination condition?

        ScheduledExecutorService service = new ScheduledThreadPoolExecutor(5);

        service.scheduleAtFixedRate(() -> {
            // http request
            // check
            // result is xxx, no more schedule
            // How can I stop this task's schedule from within the task?
            
        }, 1, 60, TimeUnit.SECONDS);

Upvotes: 4

Views: 2680

Answers (2)

white-surf-style-five

Reputation: 497

The simplest thing that can possibly work is to throw an exception from the body of the scheduled function when you want to stop: per the javadoc of scheduleAtFixedRate, once an execution throws an exception, subsequent executions are suppressed. I think it's ugly.
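To make the exception approach concrete, here is a minimal sketch. The class name, the run counter, and the "third result is the one we want" condition are stand-ins invented for illustration; the real task would perform the HTTP call instead.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class StopByException {

    /** Schedules a periodic task that throws on its third run; returns how many times it ran. */
    public static int runUntilException() {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        try {
            ScheduledFuture<?> future = service.scheduleAtFixedRate(() -> {
                // Hypothetical stand-in for "http request, check result".
                if (runs.incrementAndGet() == 3) {
                    throw new IllegalStateException("result found, stop scheduling");
                }
            }, 0, 50, TimeUnit.MILLISECONDS);
            try {
                future.get(); // blocks until the task throws or is cancelled
            } catch (ExecutionException expected) {
                // The exception thrown inside the task surfaces here.
            }
            Thread.sleep(200); // leave time for further runs, which should not happen
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            service.shutdown();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs = " + runUntilException());
    }
}
```

Note that the exception is swallowed unless someone calls get() on the returned future, which is part of why this approach feels ugly.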

Another way is not to schedule periodically (i.e. not use scheduleAtFixedRate), but schedule just once and reschedule (or not) after the result of the call is checked.

You will have to keep track of how much time the call took if you want to approximate what scheduleAtFixedRate does.

This solution approximates scheduleAtFixedRate and degrades to scheduleWithFixedDelay when your call takes more than 60 seconds:

private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

public static void main(String[] args) {
    scheduler.schedule(() -> {
        performHttpCallAndScheduleAgainIfNeeded(scheduler);
    }, 1, TimeUnit.SECONDS);
}

private static void performHttpCallAndScheduleAgainIfNeeded(ScheduledExecutorService scheduler) {
    final long startTime = System.currentTimeMillis();
    boolean callWasOk = performHttpCall();
    if (!callWasOk) {
        final long elapsed = System.currentTimeMillis() - startTime;
        final long millisToWaitForNextAttempt = Math.max(0, 60000 - elapsed);
        scheduler.schedule(() -> performHttpCallAndScheduleAgainIfNeeded(scheduler), millisToWaitForNextAttempt, TimeUnit.MILLISECONDS);
    } else {
        // the call was ok... you have nothing to do.
    }
}

private static boolean performHttpCall() {
    // do whatever you need to do here...
    return true; // placeholder: return true once the result is the one you were waiting for
}

If you instead still want to use scheduleAtFixedRate or scheduleWithFixedDelay, you have to keep some state around (beware of concurrency) and note that both methods return a future that you have to use to cancel them.

The trick here is that you have to pass the result of your scheduleAtFixedRate call to the scheduled function itself. You can solve this chicken-and-egg problem by using a container instead of the value; here I used CompletableFuture as the container:

public static void main(String[] args) {
    final CompletableFuture<ScheduledFuture<?>> cancellablePeriodicTask = new CompletableFuture<>();
    final ScheduledFuture<?> cancellable = scheduler.scheduleAtFixedRate(() -> {
        performHttpCallAndScheduleAgainIfNeeded(cancellablePeriodicTask);
    }, 1, 60, TimeUnit.SECONDS);
    cancellablePeriodicTask.complete(cancellable);
}

private static void performHttpCallAndScheduleAgainIfNeeded(CompletableFuture<ScheduledFuture<?>> cancellable) {
    boolean callWasOk = performHttpCall();
    if (callWasOk) {
        // here you cancel
        cancellable.whenComplete((scheduledFuture, throwable) -> {
          if (throwable == null) {  
              scheduledFuture.cancel(true);
          }
        });
    }

    // call was not ok here, nothing to do since the scheduler will call this again.
}

private static boolean performHttpCall() {
    // do whatever you need to do here...
    return true; // placeholder: return true once the result is the one you were waiting for
}

In this approach, CompletableFuture is not the only mechanism that works for passing the value between threads; SynchronousQueue also works here.
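As a rough sketch of the SynchronousQueue variant, assuming it is acceptable for the scheduling thread to block until the task asks for its future: the class name, the run counter, and the "third call is ok" condition are hypothetical stand-ins for the real HTTP check.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SynchronousQueueHandOff {

    /** Runs a periodic task that cancels itself on its third run; returns how many times it ran. */
    public static int runUntilCancelled() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        SynchronousQueue<ScheduledFuture<?>> handOff = new SynchronousQueue<>();
        AtomicInteger runs = new AtomicInteger();
        try {
            ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
                // Hypothetical stand-in for performHttpCall().
                boolean callWasOk = runs.incrementAndGet() == 3;
                if (callWasOk) {
                    try {
                        // Receive our own future from the scheduling thread and cancel it.
                        handOff.take().cancel(false);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, 0, 50, TimeUnit.MILLISECONDS);
            handOff.put(future); // blocks until the task calls take()
            Thread.sleep(200);   // leave time for further runs, which should not happen
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdown();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs = " + runUntilCancelled());
    }
}
```

Unlike the CompletableFuture version, put() keeps the calling thread blocked until the task decides to stop, so this variant only fits when the caller has nothing else to do in the meantime.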

Upvotes: 1

Basil Bourque

Reputation: 338496

Pair of executor services

One approach is to use a pair of executor services. One is the scheduling service, running your task repeatedly until some condition is met. When the condition is met, the last execution of the task submits a cancellation task to the other executor service. That second executor service then shuts down the scheduled executor service.

Steps

Make a non-scheduled executor service.

Make a scheduled executor service.

Instantiate your repeating task as a Runnable or Callable. Pass to its constructor a reference to both executor services.

Schedule your task on the scheduled executor service.

Every time the task runs, check for your quit condition.

  • When that condition is false, do nothing more. Let the run/call method complete.
  • When that condition is true, submit a new task onto the non-scheduled executor service. That new task takes a reference to the scheduled executor service as an argument to its constructor. The run/call method of that task shuts down the scheduled executor service it was given.

To do the canceling, the task calls ScheduledExecutorService#shutdown and #awaitTermination.
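The steps above can be sketched compactly as follows. This is only an illustration under assumptions: the class name, the 50 ms period, and the "quit on the third run" condition are invented stand-ins, and lambdas replace the constructor-injected task classes described in the steps.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class PairOfExecutorsSketch {

    /** Returns true if the scheduled executor service terminated after the task asked for cancellation. */
    public static boolean runUntilShutDown() {
        ScheduledExecutorService scheduled = Executors.newSingleThreadScheduledExecutor();
        ExecutorService canceller = Executors.newSingleThreadExecutor();
        CompletableFuture<Boolean> terminated = new CompletableFuture<>();
        AtomicInteger runs = new AtomicInteger();

        scheduled.scheduleAtFixedRate(() -> {
            // Hypothetical quit condition: stop on the third run.
            boolean quit = runs.incrementAndGet() == 3;
            if (quit) {
                // Hand the shutdown to the other executor so this run can finish normally.
                canceller.submit(() -> {
                    scheduled.shutdown();
                    try {
                        terminated.complete(scheduled.awaitTermination(5, TimeUnit.SECONDS));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        terminated.complete(false);
                    }
                });
            }
        }, 0, 50, TimeUnit.MILLISECONDS);

        try {
            return terminated.get(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        } finally {
            canceller.shutdown();
            scheduled.shutdown(); // no effect if already shut down
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated = " + runUntilShutDown());
    }
}
```

With the default policy of the scheduled executor service, shutdown() stops further periodic runs, so awaitTermination only has to wait for the run that is currently in progress.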

Example code

package work.basil.tasking;

import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class App
{
    public static void main ( String[] args )
    {
        App app = new App();
        app.demo();
    }

    private void demo ( )
    {
        ScheduledExecutorService coreScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        ExecutorService cancellationExecutorService = Executors.newSingleThreadExecutor();
        Duration expires = Duration.ofMinutes( 2 );
        Runnable coreTask = new CoreTask( expires , coreScheduledExecutorService , cancellationExecutorService );
        coreScheduledExecutorService.scheduleAtFixedRate( Objects.requireNonNull( coreTask ) , 0 , 20 , TimeUnit.SECONDS );

        try { Thread.sleep( expires.plus( Duration.ofMinutes( 1 ) ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
        if ( Objects.nonNull( coreScheduledExecutorService ) )
        {
            if ( ! coreScheduledExecutorService.isShutdown() )
            {
                coreScheduledExecutorService.shutdown();
                try { coreScheduledExecutorService.awaitTermination( 1 , TimeUnit.MINUTES ); } catch ( InterruptedException e ) { e.printStackTrace(); }
            }
        }
        if ( Objects.nonNull( cancellationExecutorService ) )
        {
            if ( ! cancellationExecutorService.isShutdown() )
            {
                cancellationExecutorService.shutdown();
                try { cancellationExecutorService.awaitTermination( 1 , TimeUnit.MINUTES ); } catch ( InterruptedException e ) { e.printStackTrace(); }
            }
        }
    }

    class CoreTask implements Runnable
    {
        private ScheduledExecutorService scheduledExecutorServiceRunningThisTask;
        private ExecutorService cancellationExecutorService;
        private Duration expiration;
        Instant whenCreated;

        public CoreTask ( final Duration expiration , final ScheduledExecutorService scheduledExecutorServiceRunningThisTask , final ExecutorService cancellationExecutorService )
        {
            this.expiration = Objects.requireNonNull( expiration );
            this.scheduledExecutorServiceRunningThisTask = Objects.requireNonNull( scheduledExecutorServiceRunningThisTask );
            this.cancellationExecutorService = Objects.requireNonNull( cancellationExecutorService );
            this.whenCreated = Instant.now();
        }

        @Override
        public void run ( )
        {
            Duration elapsed = Duration.between( this.whenCreated , Instant.now() );
            System.out.print( "Core task running. " + Instant.now() + " | Elapsed: " + elapsed + " | " );
            if ( elapsed.toSeconds() > this.expiration.toSeconds() )
            {
                System.out.println( "Core task is asking for cancellation. " + Instant.now() );
                this.cancellationExecutorService.submit( ( ) -> this.scheduledExecutorServiceRunningThisTask.shutdown() );
            } else
            {
                System.out.println( "Core task is completing another `run` execution. " + Instant.now() );
            }
        }
    }
}

When run:

Core task running. 2021-12-05T04:20:41.659240Z | Elapsed: PT0.000857S | Core task is completing another `run` execution. 2021-12-05T04:20:41.672656Z
Core task running. 2021-12-05T04:21:01.663990Z | Elapsed: PT20.00593S | Core task is completing another `run` execution. 2021-12-05T04:21:01.664514Z
Core task running. 2021-12-05T04:21:21.659970Z | Elapsed: PT40.001914S | Core task is completing another `run` execution. 2021-12-05T04:21:21.660327Z
Core task running. 2021-12-05T04:21:41.663228Z | Elapsed: PT1M0.005188S | Core task is completing another `run` execution. 2021-12-05T04:21:41.663420Z
Core task running. 2021-12-05T04:22:01.662737Z | Elapsed: PT1M20.004684S | Core task is completing another `run` execution. 2021-12-05T04:22:01.663140Z
Core task running. 2021-12-05T04:22:21.663495Z | Elapsed: PT1M40.005431S | Core task is completing another `run` execution. 2021-12-05T04:22:21.664237Z
Core task running. 2021-12-05T04:22:41.663013Z | Elapsed: PT2M0.004967S | Core task is completing another `run` execution. 2021-12-05T04:22:41.663248Z
Core task running. 2021-12-05T04:23:01.662875Z | Elapsed: PT2M20.004835S | Core task is asking for cancellation. 2021-12-05T04:23:01.663117Z

By the way, keep in mind that console output from System.out does not necessarily appear in chronological order. When you care about sequence, study the Instant.now() values to verify order of execution.

Upvotes: 0
