Reputation: 107
I have a timed task that makes an HTTP call and then checks the result. If the result is a certain value, I do some work, after which the task can be terminated and no longer needs to be scheduled. How can I cancel the schedule from within the task? Does ScheduledExecutorService provide a parameter similar to a termination condition?
ScheduledExecutorService service = new ScheduledThreadPoolExecutor(5);
service.scheduleAtFixedRate(() -> {
// http request
// check
// result is xxx, no more schedule
// How can I stop this task's schedule from within the task?
}, 1, 60, TimeUnit.SECONDS);
Upvotes: 4
Views: 2680
Reputation: 497
The simplest thing that can possibly work is to throw an exception from the body of the scheduled function when you want to stop: once an execution throws, subsequent executions are suppressed. I think it's ugly. See the Javadoc of scheduleAtFixedRate.
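To make the exception approach concrete, here is a minimal sketch. The class name, the attempt counter, and the 20 ms period are made up for illustration; the counter stands in for the real HTTP call and check. It relies on the documented behavior that a periodic task which throws is not run again, and that get() on the returned future then throws ExecutionException.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

final class StopByException {
    /** Runs a periodic task that throws once the simulated check succeeds; returns how often it ran. */
    static int runUntilException(int succeedOnAttempt) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger attempts = new AtomicInteger();
        try {
            ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
                // Hypothetical stand-in for "http request" + "check".
                if (attempts.incrementAndGet() >= succeedOnAttempt) {
                    // Throwing suppresses all subsequent executions of this task.
                    throw new IllegalStateException("done, stop rescheduling");
                }
            }, 0, 20, TimeUnit.MILLISECONDS);
            try {
                // A periodic future never completes normally; it ends by cancellation or exception.
                future.get(5, TimeUnit.SECONDS);
            } catch (ExecutionException expected) {
                // The exception thrown by the task surfaces here as the cause.
            } catch (InterruptedException | TimeoutException e) {
                throw new IllegalStateException(e);
            }
            return attempts.get();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Task ran " + runUntilException(3) + " times"); // prints "Task ran 3 times"
    }
}
```

Note that the exception is swallowed silently unless something calls get() on the future, which is part of why this approach feels ugly.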
Another way is not to schedule periodically (i.e. not use scheduleAtFixedRate), but schedule just once and reschedule (or not) after the result of the call is checked.
You will have to keep track of how much time the call took if you want to approximate what scheduleAtFixedRate does.
This solution approximates scheduleAtFixedRate and degrades to scheduleWithFixedDelay when your call takes more than 60 seconds:
private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
public static void main(String[] args) {
scheduler.schedule(() -> {
performHttpCallAndScheduleAgainIfNeeded(scheduler);
}, 1, TimeUnit.SECONDS);
}
private static void performHttpCallAndScheduleAgainIfNeeded(ScheduledExecutorService scheduler) {
final long startTime = System.currentTimeMillis();
boolean callWasOk = performHttpCall();
if (!callWasOk) {
final long elapsed = System.currentTimeMillis() - startTime;
final long millisToWaitForNextAttempt = Math.max(0, 60000 - elapsed);
scheduler.schedule(() -> performHttpCallAndScheduleAgainIfNeeded(scheduler), millisToWaitForNextAttempt, TimeUnit.MILLISECONDS);
} else {
// the call was ok... you have nothing to do.
}
}
private static boolean performHttpCall() {
// do whatever you need to do here...
return true; // placeholder: return whether the call produced the expected result
}
If you instead still want to use scheduleAtFixedRate or scheduleWithFixedDelay, you have to keep some state around (beware of concurrency) and note that both methods return a future that you have to use to cancel them.
The trick here is that you have to pass the result of your scheduleAtFixedRate call to the scheduled function itself. You can solve this chicken-and-egg problem by passing a container instead of the value. Here I used CompletableFuture as the container:
public static void main(String[] args) {
final CompletableFuture<ScheduledFuture<?>> cancellablePeriodicTask = new CompletableFuture<>();
final ScheduledFuture<?> cancellable = scheduler.scheduleAtFixedRate(() -> {
performHttpCallAndScheduleAgainIfNeeded(cancellablePeriodicTask);
}, 1, 60, TimeUnit.SECONDS);
cancellablePeriodicTask.complete(cancellable);
}
private static void performHttpCallAndScheduleAgainIfNeeded(CompletableFuture<ScheduledFuture<?>> cancellable) {
boolean callWasOk = performHttpCall();
if (callWasOk) {
// here you cancel
cancellable.whenComplete((scheduledFuture, throwable) -> {
if (throwable == null) {
scheduledFuture.cancel(true);
}
});
}
// call was not ok here, nothing to do since the scheduler will call this again.
}
private static boolean performHttpCall() {
// do whatever you need to do here...
return true; // placeholder: return whether the call produced the expected result
}
In this approach, CompletableFuture is not the only mechanism that can pass the value between threads; a SynchronousQueue also works.
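For comparison, the SynchronousQueue variant might look roughly like the sketch below. The class name, the attempt counter that simulates the HTTP check, and the 20 ms period are assumptions for illustration. One difference from CompletableFuture is worth knowing: put() blocks the scheduling thread until the task actually takes the future, that is, until the call has succeeded.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

final class SynchronousQueueHandoff {
    /** Polls until the simulated call succeeds, then the task cancels itself; returns the attempt count. */
    static int runUntilSuccess(int succeedOnAttempt) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        SynchronousQueue<ScheduledFuture<?>> handoff = new SynchronousQueue<>();
        AtomicInteger attempts = new AtomicInteger();
        try {
            ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
                // Hypothetical stand-in for performHttpCall().
                boolean callWasOk = attempts.incrementAndGet() >= succeedOnAttempt;
                if (callWasOk) {
                    try {
                        // Receive our own future from the scheduling thread and cancel it.
                        handoff.take().cancel(false);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, 0, 20, TimeUnit.MILLISECONDS);
            handoff.put(future); // blocks until the task takes the future
            try {
                future.get(5, TimeUnit.SECONDS);
            } catch (CancellationException expected) {
                // The task cancelled itself, so no further runs are scheduled.
            }
            return attempts.get();
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Attempts: " + runUntilSuccess(3)); // prints "Attempts: 3"
    }
}
```

Because of the blocking put(), CompletableFuture is usually the more convenient container when the scheduling thread has other work to do.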
Upvotes: 1
Reputation: 338496
One approach is to use a pair of executor services. One is the scheduling service, running your task repeatedly until some condition is met. When the condition is met, the last execution of the task submits a cancellation task to the other executor service. That second executor service then shuts down the scheduled executor service.
Make a non-scheduled executor service.
Make a scheduled executor service.
Instantiate your repeating task as a Runnable or Callable. Pass to its constructor a reference to both executor services.
Schedule your task on the scheduled executor service.
Every time the task runs, check for your quit condition.
When that condition is false, do nothing more; let the run/call method complete.
When that condition is true, submit a new task to the non-scheduled executor service. The run/call method of that task cancels the passed scheduled executor service.
To do the canceling, the task calls ScheduledExecutorService#shutdown and #awaitTermination.
package work.basil.tasking;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class App
{
public static void main ( String[] args )
{
App app = new App();
app.demo();
}
private void demo ( )
{
ScheduledExecutorService coreScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
ExecutorService cancellationExecutorService = Executors.newSingleThreadExecutor();
Duration expires = Duration.ofMinutes( 2 );
Runnable coreTask = new CoreTask( expires , coreScheduledExecutorService , cancellationExecutorService );
coreScheduledExecutorService.scheduleAtFixedRate( Objects.requireNonNull( coreTask ) , 0 , 20 , TimeUnit.SECONDS );
try { Thread.sleep( expires.plus( Duration.ofMinutes( 1 ) ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
if ( Objects.nonNull( coreScheduledExecutorService ) )
{
if ( ! coreScheduledExecutorService.isShutdown() )
{
coreScheduledExecutorService.shutdown();
try { coreScheduledExecutorService.awaitTermination( 1 , TimeUnit.MINUTES ); } catch ( InterruptedException e ) { e.printStackTrace(); }
}
}
if ( Objects.nonNull( cancellationExecutorService ) )
{
if ( ! cancellationExecutorService.isShutdown() )
{
cancellationExecutorService.shutdown();
try { cancellationExecutorService.awaitTermination( 1 , TimeUnit.MINUTES ); } catch ( InterruptedException e ) { e.printStackTrace(); }
}
}
}
class CoreTask implements Runnable
{
private ScheduledExecutorService scheduledExecutorServiceRunningThisTask;
private ExecutorService cancellationExecutorService;
private Duration expiration;
Instant whenCreated;
public CoreTask ( final Duration expiration , final ScheduledExecutorService scheduledExecutorServiceRunningThisTask , final ExecutorService cancellationExecutorService )
{
this.expiration = Objects.requireNonNull( expiration );
this.scheduledExecutorServiceRunningThisTask = Objects.requireNonNull( scheduledExecutorServiceRunningThisTask );
this.cancellationExecutorService = Objects.requireNonNull( cancellationExecutorService );
this.whenCreated = Instant.now();
}
@Override
public void run ( )
{
Duration elapsed = Duration.between( this.whenCreated , Instant.now() );
System.out.print( "Core task running. " + Instant.now() + " | Elapsed: " + elapsed + " | " );
if ( elapsed.toSeconds() > this.expiration.toSeconds() )
{
System.out.println( "Core task is asking for cancellation. " + Instant.now() );
this.cancellationExecutorService.submit( ( ) -> this.scheduledExecutorServiceRunningThisTask.shutdown() );
} else
{
System.out.println( "Core task is completing another `run` execution. " + Instant.now() );
}
}
}
}
When run:
Core task running. 2021-12-05T04:20:41.659240Z | Elapsed: PT0.000857S | Core task is completing another `run` execution. 2021-12-05T04:20:41.672656Z
Core task running. 2021-12-05T04:21:01.663990Z | Elapsed: PT20.00593S | Core task is completing another `run` execution. 2021-12-05T04:21:01.664514Z
Core task running. 2021-12-05T04:21:21.659970Z | Elapsed: PT40.001914S | Core task is completing another `run` execution. 2021-12-05T04:21:21.660327Z
Core task running. 2021-12-05T04:21:41.663228Z | Elapsed: PT1M0.005188S | Core task is completing another `run` execution. 2021-12-05T04:21:41.663420Z
Core task running. 2021-12-05T04:22:01.662737Z | Elapsed: PT1M20.004684S | Core task is completing another `run` execution. 2021-12-05T04:22:01.663140Z
Core task running. 2021-12-05T04:22:21.663495Z | Elapsed: PT1M40.005431S | Core task is completing another `run` execution. 2021-12-05T04:22:21.664237Z
Core task running. 2021-12-05T04:22:41.663013Z | Elapsed: PT2M0.004967S | Core task is completing another `run` execution. 2021-12-05T04:22:41.663248Z
Core task running. 2021-12-05T04:23:01.662875Z | Elapsed: PT2M20.004835S | Core task is asking for cancellation. 2021-12-05T04:23:01.663117Z
By the way, keep in mind that console output from System.out does not necessarily appear in chronological order. When you care about sequence, study the Instant.now() values to verify order of execution.
Upvotes: 0