Serhii Kachan

Reputation: 403

Make other threads wait until ScheduledExecutorService completes all tasks

The main goal is to run a method using ScheduledExecutorService and wait until all its tasks complete before resuming the main thread.

I've created a utility method in a custom Scheduler class that accepts any Runnable:

public void scheduleFunction(Runnable function) {
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    final ScheduledFuture<?> producerHandle = scheduler.scheduleAtFixedRate(function, initialDelay, interval, MILLISECONDS);
    scheduler.schedule(() -> { producerHandle.cancel(true); }, timeout, MILLISECONDS);
}

I use it like this in another class when I need to execute one of its methods on a schedule:

public void sendToKafka() {
  Scheduler.scheduleFunction(this::produce);
}

This works fine, except for one thing. When the main thread reaches sendToKafka(), it calls Scheduler to schedule the function. But the main thread keeps running while the scheduled function starts to work.

Actual result: two threads run at the same time.

Expected result: when the scheduler thread starts, the main thread stops and waits until the scheduler completes execution.

How can I achieve this?

Upvotes: 1

Views: 1479

Answers (1)

Holger

Reputation: 298103

Since you are creating and abandoning a ScheduledExecutorService in this method, you should call shutdown() to support timely release of the resources. If you do that, you can call awaitTermination to wait for the completion of all pending jobs.

public void scheduleFunction(Runnable function) {
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    final ScheduledFuture<?> producerHandle
        = scheduler.scheduleAtFixedRate(function, initialDelay, interval, MILLISECONDS);
    scheduler.schedule(() -> {
        producerHandle.cancel(true);
        scheduler.shutdown();
    }, timeout, MILLISECONDS);
    try {
        scheduler.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}

Note that when you don’t need interruption, you can simply use

public void scheduleFunction(Runnable function) {
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    scheduler.scheduleAtFixedRate(function, initialDelay, interval, MILLISECONDS);
    scheduler.schedule(() -> scheduler.shutdown(), timeout, MILLISECONDS);
    try {
        scheduler.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}

as shutting down the ScheduledExecutorService implies that the job is no longer rescheduled; only an execution that is already in progress will be completed, and awaitTermination will wait for it.
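To see the blocking behavior in isolation, here is a minimal, self-contained sketch of the second variant. The class name BlockingSchedulerDemo, the method name runForAndWait, and the concrete delay values are made up for illustration; the question's initialDelay, interval, and timeout fields are passed as parameters instead.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingSchedulerDemo {

    // Hypothetical helper: runs the task at a fixed rate and blocks the
    // calling thread until the timeout has elapsed and the pool has terminated.
    public static void runForAndWait(Runnable task, long initialDelayMs,
                                     long intervalMs, long timeoutMs) {
        final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(task, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);
        // After the timeout, shut the pool down; by default this also stops
        // the periodic task from being rescheduled.
        scheduler.schedule(() -> scheduler.shutdown(), timeoutMs, TimeUnit.MILLISECONDS);
        try {
            // Blocks the caller until the pool has terminated.
            scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so callers can react to it.
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        long start = System.nanoTime();
        // Assumed values: start immediately, repeat every 50 ms, stop after 300 ms.
        runForAndWait(runs::incrementAndGet, 0, 50, 300);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        // This line is reached only after the scheduler has finished.
        System.out.println("runs=" + runs.get() + ", elapsedMs=" + elapsedMs);
    }
}
```

The print statement in main executes only after roughly the timeout has passed, which is the "main thread waits" behavior the question asks for. The exact number of runs depends on timing, so it is not fixed here.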

Upvotes: 3
