Reputation: 1242
I am using a completion service and spawning child threads to perform some ETL. As I debug in my IDE and then stop all processes, I notice I still have a bunch of zombie threads killing my CPU. This is due to the fact that I'm not terminating the child threads properly.
Future<Boolean> future = completionService.submit(conversionProcessor);
boolean isCompleted = false;
while (!isCompleted && !closed.get()) {
try {
isCompleted = future.get(CONSUMER_HEARTBEAT_INTERVAL,
TimeUnit.SECONDS); // Wait until heartbeat interval exceeds
if (isCompleted) {
// do some things here
future.cancel(true);
break;
}
} catch (TimeoutException e) {
// Used to keep consumer alive in the cluster
consumer.poll(Duration.ofSeconds(CONSUMER_HEARTBEAT_INTERVAL)); // does heart-beat
} catch (CancellationException e) {
future.cancel(true);
break;
} catch (InterruptedException e) {
future.cancel(true);
break;
} catch (WakeupException we) {
future.cancel(true);
break;
} catch (Exception e) {
future.cancel(true);
break;
}
Essentially, I submit my Callable<Boolean>
to my completion service.
ExecutorCompletionService<Boolean> completionService = new ExecutorCompletionService<>(
Executors.newSingleThreadExecutor());
If I stop the debugger, this thread is presumably still running. I just added this future.cancel(true)
piece, which seems to have stopped continuously uploaded files from my child thread, but I still see these java processes running on my activity monitor.
I'm wondering how I should be thinking about this? I want the callable as it tells me when the underlying ETL has completed or not (true/false)
edit: future.cancel actually seems to be helping quite a bit.. Is this what I want to be using?
Upvotes: 0
Views: 253
Reputation: 8758
Once you are done with your CompletionService
you need to shutdown underlying executor so you need to do the following
ExecutorService es = Executors.newSingleThreadExecutor();
ExecutorCompletionService<Boolean> completionService = new ExecutorCompletionService<>(es);
And a the end call
es.shutdown();
es.awaitTermination(1, TimeUnit.SECONDS);
Upvotes: 2