Reputation: 403
The goal is to run a method with a ScheduledExecutorService and make the main thread wait until all of its tasks have completed before it resumes.
I've created a utility method in a custom Scheduler class that accepts any Runnable:
public void scheduleFunction(Runnable function) {
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    final ScheduledFuture<?> producerHandle = scheduler.scheduleAtFixedRate(function, initialDelay, interval, MILLISECONDS);
    scheduler.schedule(() -> { producerHandle.cancel(true); }, timeout, MILLISECONDS);
}
I use it like this in another class when I need to execute one of its methods on a schedule:
public void sendToKafka() {
    Scheduler.scheduleFunction(this::produce);
}
This works fine, except for one thing. When the main thread reaches sendToKafka(), it calls Scheduler to schedule the function, but the main thread keeps running while the scheduled function starts to work.
Actual result: two threads run at the same time.
Expected result: when the scheduler thread starts, the main thread stops and waits until the scheduler has finished executing.
How can I achieve this?
Upvotes: 1
Views: 1479
Reputation: 298103
Since you are creating and abandoning a ScheduledExecutorService in this method, you should call shutdown() to support timely release of the resources. If you do that, you can call awaitTermination to wait for the completion of all pending jobs.
public void scheduleFunction(Runnable function) {
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    final ScheduledFuture<?> producerHandle
        = scheduler.scheduleAtFixedRate(function, initialDelay, interval, MILLISECONDS);
    // After the timeout, interrupt and cancel the periodic job, then shut the pool down
    scheduler.schedule(() -> {
        producerHandle.cancel(true);
        scheduler.shutdown();
    }, timeout, MILLISECONDS);
    try {
        // Block the calling thread until the pool has terminated
        scheduler.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
    } catch (InterruptedException ex) {
        // Restore the interrupt flag so callers can react to it
        Thread.currentThread().interrupt();
    }
}
Note that when you don’t need interruption, you can simply use
public void scheduleFunction(Runnable function) {
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    scheduler.scheduleAtFixedRate(function, initialDelay, interval, MILLISECONDS);
    scheduler.schedule(() -> scheduler.shutdown(), timeout, MILLISECONDS);
    try {
        scheduler.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}
since shutting down the ScheduledExecutorService also stops the periodic job from being rescheduled by default. An execution that is already in progress at that moment is not interrupted; it runs to completion, and awaitTermination waits for it.
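To see the blocking behaviour end to end, here is a minimal, self-contained sketch of the second variant. The class name BlockingSchedulerDemo, the constant values, and the counter task are assumptions made for illustration; the question does not show how initialDelay, interval, and timeout are defined.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingSchedulerDemo {

    // Hypothetical settings; the original post does not show these values.
    static final long INITIAL_DELAY_MS = 0;
    static final long INTERVAL_MS = 50;
    static final long TIMEOUT_MS = 300;

    /** Runs the task periodically and blocks the caller until the pool has terminated. */
    static void scheduleFunction(Runnable function) {
        final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(function, INITIAL_DELAY_MS, INTERVAL_MS, TimeUnit.MILLISECONDS);
        // Shutting down after the timeout stops further periodic runs (default policy).
        scheduler.schedule(() -> scheduler.shutdown(), TIMEOUT_MS, TimeUnit.MILLISECONDS);
        try {
            // The calling thread waits here until the pool has terminated.
            scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        long start = System.nanoTime();
        scheduleFunction(() -> runs.incrementAndGet());
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        // This line is reached only after the scheduler has terminated.
        System.out.println("main resumed after " + elapsedMs + " ms, task ran " + runs.get() + " times");
    }
}
```

The print statement in main executes only once scheduleFunction has returned, which is the "main thread waits" behaviour asked for in the question.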
Upvotes: 3