Guerino Rodella

Reputation: 370

ScheduledExecutorService creating multiple threads

I'm seeing incorrect behavior with ScheduledExecutorService, caused by the restart mechanism I implemented.

Problem - Short

Each restart attempt creates a new scheduled task and also restarts the old ones.

Problem - Long

The goal of the process is to publish a message to RabbitMQ at regular intervals (it's a keep-alive). When an exception occurs with RabbitMQ, the ExceptionObserver is notified. The ExceptionObserver implementation stops the service and starts it again. It attempts to restart up to 3 times. If a restart succeeds, the attempt count is reset to zero. If a restart fails, the attempt count is incremented, and once it reaches the attempt limit the process is shut down.

Each time the service is restarted, it creates a new "KeepAliveService" and also restarts the previous one. So each time an exception occurs, a new service is created and the old service is restarted. After 1 exception there are 2 services running, after 2 exceptions there are 3 services running, and so on.

The service class that handles the keep-alive service (it starts and stops the ScheduledExecutorService)

private KeepaliveExecutor keepaliveExecutor; // The Runnable executed by the scheduled service
private ScheduledFuture<?> futureTask; // The scheduled task
private ScheduledExecutorService scheduledService; // The scheduled service
private ExceptionObserver exceptionObserver; // The exception handler, which handles the exceptions

public void startService( final int keepaliveTime ) throws IllegalArgumentException, FInfraException {
    keepaliveExecutor = new KeepaliveExecutor( new RabbitMQService( settings ), settings );
    keepaliveExecutor.setExceptionObserver( exceptionObserver );
    scheduledService = Executors.newSingleThreadScheduledExecutor();
    futureTask = scheduledService.scheduleAtFixedRate( keepaliveExecutor, 0, keepaliveTime, TimeUnit.MINUTES );   
}

public void stopService() {
    futureTask.cancel(true);
    scheduledService.shutdown();
}

The KeepaliveExecutor class

class KeepaliveExecutor implements Runnable {

private FInfraExceptionObserver exceptionObserver;

@Override
public void run() {
    try {
        final String keepAlive = JsonMapper.toJsonString( keepaliveMessage );
        rabbitService.publishMessage( keepAlive );
        keepaliveMessage.setFirtsPackage( false );
    } catch( FInfraException ex ) {
        if( exceptionObserver != null ) {
            exceptionObserver.notifyExpcetion(ex);
        }
    }
}
}

The ExceptionObserver implementation class

public class FInfraExceptionHandler implements FInfraExceptionObserver {

private final FInfraServiceHandler finfraHandler;

public FInfraExceptionHandler(FInfraServiceHandler finfraHandler) {
    this.finfraHandler = finfraHandler;
}

@Override
public void notifyExpcetion(Throwable ex) {
    Util.logger.log( Level.INFO, "F-Infra Exception occurred", ex);
    finfraHandler.stopService();
    Util.logger.log( Level.INFO, "Waiting 30s for restarting..." );
    Util.wait( 30, TimeUnit.SECONDS );
    finfraHandler.startService();
}
}

The FInfraServiceHandler class

public class FInfraServiceHandler {

private static final int ATTEMPT_LIMIT = 3;

private FInfraService finfraService;
private int keepaliveTime;
private int attempt;

public FInfraServiceHandler() {
    this.finfraService = new FInfraService();
    this.finfraService.setExceptionObserver(new FInfraExceptionHandler( this ));
    this.attempt = 0;
}

void startService(){
    if( attempt <= ATTEMPT_LIMIT ) {
        try {
            attempt++;
            Util.logger.log(Level.INFO, "Starting F-Infra Service. Attemp[{0} of {1}]", new String[]{String.valueOf(attempt), String.valueOf(ATTEMPT_LIMIT)});
            finfraService.startService( keepaliveTime );
        } catch( FInfraException | RuntimeException ex ){
            Util.logger.log(Level.INFO, "F-INFRA EXCEPTION", ex);
            startService();
        }
        Util.logger.log( Level.INFO, "F-Infra started!");
        attempt = 0;
        return;
    }
    Util.logger.log( Level.INFO, "Restart attemp limit reached." );
    Main.closeAll(new ShutdownException("It's not possible stablish a connection with F-Infra Service."));
}

public void stopService() {
    if( attempt > 0 ){
        Util.logger.log(Level.INFO, "Stpoping F-Infra...");
        finfraService.stopService();
    }
}
}

Here is the log, which shows that more than one service is running

jul 16, 2017 2:58:03 PM domain.FInfraServiceHandler startService
INFO: Starting F-Infra Service. Attemp[1 of 3]
jul 16, 2017 2:58:03 PM domain.FInfraServiceHandler startService
INFO: F-Infra started!
jul 16, 2017 5:01:15 PM domain.FInfraExceptionHandler notifyExpcetion
INFO: F-Infra Exception occurred 
domain.FInfraException: java.net.UnknownHostException: rabbit.domain
        at domain.RabbitMQService.openConnection(RabbitMQService.java:48)
        at domain.RabbitMQService.publishMessage(RabbitMQService.java:66)
        at domain.KeepaliveExecutor.run(KeepaliveExecutor.java:38)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.net.UnknownHostException: rabbit.domain
        at java.net.AbstractPlainSocketImpl.connect(Unknown Source)
        at java.net.PlainSocketImpl.connect(Unknown Source)
        at java.net.SocksSocketImpl.connect(Unknown Source)
        at java.net.Socket.connect(Unknown Source)
        at com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:32)
        at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:34)
        at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:91)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:670)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:722)
        at domain.RabbitMQService.openConnection(RabbitMQService.java:45)
        ... 9 more

jul 16, 2017 5:01:15 PM domain.FInfraExceptionHandler notifyExpcetion
INFO: Waiting 30s for restarting...
jul 16, 2017 5:01:45 PM domain.FInfraServiceHandler startService
INFO: Starting F-Infra Service. Attemp[1 of 3]
jul 16, 2017 5:01:45 PM domain.FInfraServiceHandler startService
INFO: F-Infra started!
jul 16, 2017 6:01:58 PM domain.FInfraExceptionHandler notifyExpcetion
INFO: F-Infra Exception occurred
domain.FInfraException: java.net.UnknownHostException: rabbit.domain
        at domain.RabbitMQService.openConnection(RabbitMQService.java:48)
        at domain.RabbitMQService.publishMessage(RabbitMQService.java:66)
        at domain.KeepaliveExecutor.run(KeepaliveExecutor.java:38)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.net.UnknownHostException: rabbit.domain
        at java.net.AbstractPlainSocketImpl.connect(Unknown Source)
        at java.net.PlainSocketImpl.connect(Unknown Source)
        at java.net.SocksSocketImpl.connect(Unknown Source)
        at java.net.Socket.connect(Unknown Source)
        at com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:32)
        at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:34)
        at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:91)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:670)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:722)
        at domain.RabbitMQService.openConnection(RabbitMQService.java:45)
        ... 9 more

jul 16, 2017 6:01:58 PM domain.FInfraExceptionHandler notifyExpcetion
INFO: Waiting 30s for restarting...
jul 16, 2017 6:02:03 PM domain.FInfraExceptionHandler notifyExpcetion
INFO: F-Infra Exception occurred
domain.FInfraException: java.net.UnknownHostException: rabbit.domain
        at domain.RabbitMQService.openConnection(RabbitMQService.java:48)
        at domain.RabbitMQService.publishMessage(RabbitMQService.java:66)
        at domain.KeepaliveExecutor.run(KeepaliveExecutor.java:38)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.net.UnknownHostException: rabbit.domain
        at java.net.AbstractPlainSocketImpl.connect(Unknown Source)
        at java.net.PlainSocketImpl.connect(Unknown Source)
        at java.net.SocksSocketImpl.connect(Unknown Source)
        at java.net.Socket.connect(Unknown Source)
        at com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:32)
        at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:34)
        at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:91)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:670)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:722)
        at domain.RabbitMQService.openConnection(RabbitMQService.java:45)
        ... 9 more

jul 16, 2017 6:02:03 PM domain.FInfraExceptionHandler notifyExpcetion
INFO: Waiting 30s for restarting...
jul 16, 2017 6:02:28 PM domain.FInfraServiceHandler startService
INFO: Starting F-Infra Service. Attemp[1 of 3]
jul 16, 2017 6:02:28 PM domain.FInfraServiceHandler startService
INFO: F-Infra started!
jul 16, 2017 6:02:33 PM domain.FInfraServiceHandler startService
INFO: Starting F-Infra Service. Attemp[1 of 3]
jul 16, 2017 6:02:33 PM domain.FInfraServiceHandler startService
INFO: F-Infra started!

I don't know how to close the old thread or how to reuse the current one for the restart. I tried calling Thread.currentThread().interrupt(); in the ExceptionObserver class before calling the start method, but that doesn't work.

I have no idea what to do.

Upvotes: 0

Views: 463

Answers (1)

bowmore

Reputation: 11308

In the FInfraServiceHandler class, your stopService method does nothing when attempt is zero.

public void stopService() {
    if( attempt > 0 ){
        Util.logger.log(Level.INFO, "Stpoping F-Infra...");
        finfraService.stopService();
    }
}

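Because startService resets attempt to zero after every successful start, the condition is false by the time the exception handler calls stopService, so the original ScheduledExecutorService is never shut down and keeps going. When I removed the condition, the code behaved fine.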
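
To illustrate the effect of an unconditional stop, here is a minimal sketch. It is not your code: RestartDemo, liveSchedulers and the daemon thread factory are hypothetical names and choices made only for this demo. It assumes the service simply owns one scheduler at a time, and it keeps a list of every scheduler it created so the number of live ones can be counted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for FInfraServiceHandler plus FInfraService (demo only).
class RestartDemo {
    // Every scheduler ever created, kept only so the demo can count live ones.
    private final List<ScheduledExecutorService> created = new ArrayList<>();
    private ScheduledExecutorService scheduledService;
    private ScheduledFuture<?> futureTask;

    void startService(Runnable task, long periodMillis) {
        // Daemon threads (a demo choice) so a forgotten scheduler cannot keep the JVM alive.
        scheduledService = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "keepalive-demo");
            t.setDaemon(true);
            return t;
        });
        created.add(scheduledService);
        futureTask = scheduledService.scheduleAtFixedRate(
                task, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    // No "attempt > 0" guard: the current scheduler is always cancelled and shut down.
    void stopService() {
        if (futureTask != null) {
            futureTask.cancel(true);
        }
        if (scheduledService != null) {
            scheduledService.shutdown();
        }
    }

    // Number of schedulers that have not been shut down yet.
    long liveSchedulers() {
        return created.stream().filter(s -> !s.isShutdown()).count();
    }
}
```

With this shape, any number of stop/start cycles should leave exactly one scheduler alive, whereas skipping stopService (as the guarded version effectively does) leaves one more live scheduler per restart.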

Note, by the way, that startService and stopService are called on the same instance from different threads. I think you'll need some sort of synchronization on the mutable field attempt.
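
One possible way to do that, sketched under assumptions: guard the counter with the instance's monitor by making every method that touches it synchronized. GuardedHandler, tryStart and the startSucceeded flag are hypothetical and only stand in for the real start logic. The counting is simplified and is not meant to reproduce your exact attempt handling.

```java
// Hypothetical handler (demo only): all shared state is guarded by "this".
class GuardedHandler {
    private static final int ATTEMPT_LIMIT = 3;

    private int attempt;     // guarded by the monitor of "this"
    private boolean running; // guarded by the monitor of "this"

    // Records one start attempt; returns false once the limit is exhausted.
    // startSucceeded stands in for the outcome of the real start call.
    synchronized boolean tryStart(boolean startSucceeded) {
        if (attempt >= ATTEMPT_LIMIT) {
            return false;
        }
        attempt++;
        if (startSucceeded) {
            running = true;
            attempt = 0; // a successful start resets the counter
        }
        return true;
    }

    // Unconditional stop, independent of the attempt counter.
    synchronized void stop() {
        running = false;
    }

    synchronized int attempts() {
        return attempt;
    }

    synchronized boolean isRunning() {
        return running;
    }
}
```

If you go this way, it is probably better not to hold the lock during the 30-second wait, otherwise any other thread calling into the handler would block for that long.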

Upvotes: 2
