CodeShaman

Reputation: 2189

How to interrupt Callable threads that are blocked inside a loop?

This is the first time I've created a multi-threaded application in Java that will run continuously until canceled, and I'm having trouble shutting down and interrupting my threads.

I have some threads that communicate with a Mediator which encapsulates a TransferQueue, an ExecutorService, and facilitates communication between producing and consuming threads.

I'm using this Mediator instead of Future because TransferQueue is a lot more block-friendly when it comes to a single consumer handling multiple producers (producer threads can mediator.put(E e) any time they want, and consumer threads can just wait on E e = mediator.take() for something to be available) and I do not want to waste CPU cycles polling.

The design is very clean, fast, and effective, but I'm having trouble interrupting blocking on queue.take(), serverSocket.accept(), and interrupting the threads as a whole.


The producers:

public class SocketProducer implements Colleague<Socket> {
    private Mediator<Socket> mediator;
    private ServerSocket serverSocket;
    private Integer listeningPort;

    private volatile boolean runnable = true;

    public SocketProducer(Mediator<Socket> mediator) {
        this.mediator = mediator;
    }

    public Colleague<Socket> setListeningPort(Integer listeningPort) {
        this.listeningPort = listeningPort;
        return this;
    }

    public Void call() throws Exception {
        serverSocket = new ServerSocket(listeningPort, 10);

        while (runnable) {
            Socket socket = serverSocket.accept(); // blocks until connection

            mediator.putIntoQueue(socket);
        }

        return null;
    }

    public void interrupt() {
        // ?
        runnable = false;
        serverSocket.close();
        // ?
    }
}

and the consumer:

private class SocketConsumer implements Colleague<Socket> {
    private Mediator<Socket> mediator;
    private volatile boolean runnable = true;

    public SocketConsumer(Mediator<Socket> mediator) {
        this.mediator = mediator;
    }

    public Void call() throws Exception {
        while (runnable) {
            Socket socket = mediator.takeFromQueue(); // blocks until element is in queue
        }

        return null;
    }

    public void interrupt() {
        // ?
        runnable = false;
        // ?
    }
}

The Colleague interface just extends Callable and gives the Mediator some additional capability for managing its producer/consumer colleagues (i.e., calling colleague.interrupt() on each colleague).

I've tried a lot of methods, throwing InterruptedException in various places, catching InterruptedException in various places, letting threads return an instance of their Thread to the mediator for interruption. Everything I've tried has been so ineffective that it feels like I'm missing some crucial piece to this puzzle.

So far the most effective method I've seen is the poison pill (which would be great if the queues didn't throw NPE on a null insertion), and all the methods I've tried of introducing a poison generic have failed because of ClassCastException (trying to cast Object to Socket, trying to instantiate a generic Socket, etc.).

I'm really not sure where to go from here. I'd really like to be able to cleanly terminate these threads on demand.


Completed solutions:

public class SocketProducer implements Colleague<Socket> {
    private static final Logger logger = LogManager.getLogger(SocketProducer.class.getName());
    private Mediator<Socket> mediator;
    private ServerSocket serverSocket;
    private Integer listeningPort;

    private volatile boolean runnable = true;

    public SocketProducer(Mediator<Socket> mediator) {
        this.mediator = mediator;
    }

    public Colleague<Socket> setListeningPort(Integer listeningPort) {
        this.listeningPort = listeningPort;
        return this;
    }

    public Void call() throws Exception {
        serverSocket = new ServerSocket(listeningPort, 10);
        logger.info("Listening on port " + listeningPort);

        while (runnable) {
            try {
                Socket socket = serverSocket.accept();
                logger.info("Connected on port " + socket.getLocalPort());
                mediator.putIntoQueue(socket);
            } catch (SocketException e) {
                logger.info("Stopped listening on port " + listeningPort);
            }
        }

        return null;
    }

    public void interrupt() {
        try {
            runnable = false;
            serverSocket.close();
        } catch (IOException e) {
            logger.error(e);
        }
    }

}

public class SocketConsumer implements Colleague<Socket> {
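    private static final Logger logger = LogManager.getLogger(SocketConsumer.class.getName());
    private Mediator<Socket> socketMediator;
    // Thread that is running call(); recorded so interrupt() can target it from another thread
    private volatile Thread worker;

    public SocketConsumer(Mediator<Socket> mediator) {
        this.socketMediator = mediator;
    }

    public Void call() throws Exception {
        worker = Thread.currentThread();
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Socket socket = socketMediator.takeFromQueue();
                logger.info("Received socket on port: " + socket.getLocalPort());
            } catch (InterruptedException e) {
                logger.info("Interrupted.");
                Thread.currentThread().interrupt(); // restore the flag so the loop exits
            }
        }
        return null;
    }

    public void interrupt() {
        // Thread.currentThread() here would be the caller's thread, not the consumer's,
        // so interrupt the thread recorded in call() instead
        Thread t = worker;
        if (t != null) {
            t.interrupt();
        }
    }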

}

Upvotes: 4

Views: 1274

Answers (2)

Andrew Logvinov

Reputation: 21851

I think poison pills will only make things more complicated, so I'd keep it simple.

As for the ServerSocket, this answer suggests that calling close() should be enough to interrupt it.
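
To illustrate the close() approach, here is a minimal, self-contained sketch. The class and method names (AcceptCloseDemo, closeUnblocksAccept) are made up for this example, and port 0 is used only so the OS picks a free port. It starts a thread that blocks in accept(), closes the ServerSocket from another thread, and checks that the blocked thread leaves accept() with a SocketException:

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AcceptCloseDemo {

    // Returns true if the accepting thread left accept() with a SocketException
    // after another thread closed the ServerSocket.
    public static boolean closeUnblocksAccept() throws Exception {
        ServerSocket serverSocket = new ServerSocket(0); // port 0: any free port
        CountDownLatch unblocked = new CountDownLatch(1);

        Thread acceptor = new Thread(() -> {
            try {
                serverSocket.accept(); // blocks; no client ever connects
            } catch (SocketException e) {
                unblocked.countDown(); // thrown because the socket was closed
            } catch (IOException e) {
                // some other I/O failure; not the case shown here
            }
        });
        acceptor.start();

        Thread.sleep(200); // crude: give the acceptor time to reach accept()
        serverSocket.close(); // called from a different thread

        return unblocked.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("accept() unblocked by close(): " + closeUnblocksAccept());
    }
}
```

In a producer like yours, that means catching SocketException around accept() and letting the loop condition end the thread.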

As for the BlockingQueue, the consumer can look like this:

// you can use volatile flag instead if you like
while (!Thread.currentThread().isInterrupted()) {
    try {
        Object item = queue.take();
        // do something with item 
    } catch (InterruptedException e) {
        log.error("Consumer interrupted", e);
        Thread.currentThread().interrupt(); // restore flag
    }
}

Then in your Mediator you can just call interrupt() on a consumer thread.
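
If the consumers are submitted to an ExecutorService, one way to deliver that interrupt is shutdownNow(), which interrupts the worker threads. The sketch below is only an illustration under that assumption: InterruptConsumerDemo and shutdownNowStopsConsumer are hypothetical names, and a plain LinkedBlockingQueue<String> stands in for your Mediator's queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class InterruptConsumerDemo {

    // Returns true if the consumer blocked in take() terminated after shutdownNow().
    public static boolean shutdownNowStopsConsumer() throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Callable<Void> consumer = () -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String item = queue.take(); // blocks: the queue stays empty
                    // do something with item
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore flag, loop exits
                }
            }
            return null;
        };

        executor.submit(consumer);
        Thread.sleep(200); // crude: let the consumer reach take()

        executor.shutdownNow(); // interrupts the worker thread
        return executor.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("consumer stopped: " + shutdownNowStopsConsumer());
    }
}
```

If you need to stop a single consumer rather than the whole pool, keeping the Future returned by submit() and calling cancel(true) on it interrupts just that task's thread.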

Upvotes: 2

Will Hartung

Reputation: 118804

A poison pill is straightforward.

// unconnected Socket used only as a sentinel; the producer side must put this same instance into the queue
private static final Socket poisonPill = new Socket();

public Void call() throws Exception {
    while (runnable) {
        Socket socket = mediator.takeFromQueue(); // blocks until element is in queue
        if (socket == poisonPill) {
            break; // quit the thread
        }
    }

    return null;
}

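Note the socket == poisonPill. This is an identity check: it is true only when the element taken from the queue is the exact same instance as poisonPill. Because the pill is itself a Socket, it goes through the queue like any other element, so the approach stays type safe and needs no casts.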
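
The same idea works for any element type if the sentinel instance is passed in alongside the queue. The following is just a sketch: PoisonPillDemo, consumeUntilPoison, and run are hypothetical names, and String is used in place of Socket to keep it self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PoisonPillDemo {

    // Takes elements until the sentinel instance shows up, then stops.
    // The comparison is by identity (==), not equals().
    static <E> List<E> consumeUntilPoison(BlockingQueue<E> queue, E poisonPill)
            throws InterruptedException {
        List<E> processed = new ArrayList<>();
        while (true) {
            E item = queue.take(); // blocks until an element is available
            if (item == poisonPill) {
                break; // sentinel seen: stop consuming
            }
            processed.add(item);
        }
        return processed;
    }

    public static List<String> run() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        String poisonPill = new String("POISON"); // a distinct instance of the element type

        queue.put("a");
        queue.put("b");
        queue.put(poisonPill); // producer signals shutdown
        queue.put("c");        // never consumed

        return consumeUntilPoison(queue, poisonPill);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

With several consumers on one queue, each consumer needs to see a pill, so either put one pill per consumer or have each consumer put the pill back before it exits.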

Upvotes: 1
