peter.murray.rust

Reputation: 38063

stopping execution of java thread and the role of join()

My program analyzes a large number of documents and occasionally gets a page that causes an infinite or very long loop. This is not analyzable in advance. I want to kill the particular page and continue with the next one (throwing away any results for the offending page). I have read SO answers such as "How to stop execution after a certain time in Java?" and have written the following code:

// main program 
    private void runThread() throws InterruptedException {
        long timeout = 15000L;
        RunPageAnalyzer runPageAnalyzer = new RunPageAnalyzer(this);
        Thread t = new Thread(runPageAnalyzer); 
        long startTime = System.currentTimeMillis();
        t.start();
        while (t.isAlive()) {
            t.join(1000);
            long delta = System.currentTimeMillis() - startTime;
            LOG.debug("delta: "+delta);
            if (delta > timeout && t.isAlive()) {
                t.interrupt();
                t.join();
                break;
            }           
        }
    }

The method in the same class, called by the thread:

    void runActions() {
        // variable length calculation which should be abandoned if too long
    }

and the Runnable:

    class RunPageAnalyzer implements Runnable {
        private PageAnalyzerAction pageAnalyzerAction;

        public RunPageAnalyzer(PageAnalyzerAction pageAnalyzerAction) {
            this.pageAnalyzerAction = pageAnalyzerAction;
        }

        public void run() {
            try {
                pageAnalyzerAction.runActions();
            } catch (Exception e) {
                LOG.debug("Exception running thread ", e);
            }
        }
    }

The output for a normal termination of runActions() seems OK:

    =========== page 1 =============
13863 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction  - pageActions: 24 on page 0
14863 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction  - delta: 1000
15864 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction  - delta: 2001
16864 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction  - delta: 3001
16975 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction  - delta: 3112
16975 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction  - finished page

but when the time limit is exceeded the process hangs in t.join().

    =========== page 2 =============
16975 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction  - pageActions: 24 on page 0
17976 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction  - delta: 1001
18976 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction  - delta: 2001
// ...
30976 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction  - delta: 14001
31976 [main] DEBUG org.xmlcml.graphics.control.page.PageAnalyzerAction  - delta: 15001

If I omit the t.join() then the process behaves as I would expect but I am worried that this might simply be building up huge numbers of threads that will be a problem later.

UPDATE: The answers so far have suggested that this is non-trivial (and I didn't find the standard Java examples/tutorials very helpful). The key point is that runActions() has to know explicitly that it might be interrupted. join() is not the primary problem because the threads just keep going.

FURTHER QUESTION: Do I have to insert Thread.currentThread().isInterrupted() in all the places in runActions() which I expect to be in unpredictably long loops?

Upvotes: 1

Views: 404

Answers (3)

Sanjay T. Sharma

Reputation: 23228

There is something else that the other answers here do not mention. If you want to cancel I/O done from a thread, you can't just "cancel" the task and expect the actual I/O to be cancelled. You have to handle the interruption in your "task" and clean up accordingly, perhaps even by closing the socket connection. I have a small snippet dedicated to "stopping" tasks run using threads which you might find helpful (apologies if it has typos; it was written a long time ago).

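import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.*;

public class ThreadStopTest {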

    public static void main(String[] args) {
        testSocketReadStop();
    }


    private static void testSocketReadStop() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        SocketTask task = new SocketTask("www.yahoo.com", 80); // host name only; Socket does not accept a URL
        Future<Integer> future = executor.submit(task);
        try {
            Integer result = future.get(1, TimeUnit.SECONDS);
            System.out.println("Computation complete; result: " + result);
        } catch(TimeoutException te) {
            future.cancel(true);
            task.cleanupAfterCancel();
            System.out.println("Computation cancelled");
        } catch(Exception e) {
            e.printStackTrace();
        }
        executor.shutdown();
    }

}


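// A Callable that can also release its resources after it has been cancelled
interface CleanableTask<T> extends Callable<T> {
    void cleanupAfterCancel();
}

class SocketTask implements CleanableTask<Integer> {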

    private final String host;

    private final int port;

    private Socket socket;

    public SocketTask(final String host, final int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Integer call() throws Exception {
        InputStream in = null;
        // TODO: Actually update the count and cleanly handle exceptions
        int bytesRead = 0;
        try {
            this.socket = new Socket(this.host, this.port);
            in = this.socket.getInputStream();
            byte[] bytes = new byte[1000000];
            System.out.println("Started reading bytes");

            // The below behavior of waiting for a forceful close can be avoided
            // if we modify the FutureTask class (the default Future impl)
            // by passing in a CleanupHandler whose cleanup() method would be
            // invoked after invoking the `cancel` method or by making all 
            // your tasks implement a CancelledTask interface which has a 
            // `cleanupAfterCancel` method to do the same. :)
            try {
                in.read(bytes);
            } catch(SocketException se) {
                if(Thread.currentThread().isInterrupted()) {
                    System.out.println("All OK; this socket was forcefully closed");
                } else {
                    se.printStackTrace();   // something was seriously wrong
                }
            }
        } catch(Exception e) {
            e.printStackTrace();
        } finally {
            if(in != null)  in.close();
        }
        return Integer.valueOf(bytesRead);
    }

    @Override
    public void cleanupAfterCancel() {
        try {
            this.socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }        
    }

}

Upvotes: 2

assylias

Reputation: 328735

I assume here that pageAnalyzerAction.runActions(); can be interrupted (i.e. it handles interruptions by exiting fairly quickly).

If you are not comfortable with the low-level thread API, you could use an executor and futures from the java.util.concurrent package to deal with thread management and the timeout policy for you:

  • the executor will handle thread management with a thread pool, reusing threads where possible
  • the future returned on task submission can be queried with a timeout: if the task does not complete within the timeout, future.get() throws a TimeoutException and you can then cancel your task

A contrived example would be:

//declare an executor  somewhere in your code, at a high level to recycle threads
ExecutorService executor = Executors.newFixedThreadPool(10); //number of threads: to be adjusted

private void runThread() throws InterruptedException {
    long timeout = 15000L;
    RunPageAnalyzer runPageAnalyzer = new RunPageAnalyzer(this);
    Future<?> future = executor.submit(runPageAnalyzer);
    try {
        future.get(timeout, TimeUnit.MILLISECONDS);
    } catch (ExecutionException e) {
        //the runnable threw an exception: handle it
    } catch (TimeoutException e) {
        //the task could not complete before the timeout
        future.cancel(true); //interrupt it
    }
}
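
For reference, here is a self-contained sketch of the same pattern that can be compiled and run on its own. The class name FutureTimeoutDemo, the helper runWithTimeout and the two sample tasks are hypothetical and only stand in for RunPageAnalyzer; the endless task is assumed to poll its interrupt flag, which is what allows future.cancel(true) to actually stop it.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTimeoutDemo {

    /** Returns true if the task finished within the timeout, false otherwise. */
    static boolean runWithTimeout(ExecutorService executor, Runnable task, long timeoutMillis) {
        Future<?> future = executor.submit(task);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the worker thread running the task
            return false;
        } catch (ExecutionException e) {
            return false; // the task itself threw an exception
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // Hypothetical tasks: one returns at once, one loops until interrupted.
        Runnable quick = () -> { };
        Runnable endless = () -> {
            while (!Thread.currentThread().isInterrupted()) {
                // placeholder for one unit of page analysis
            }
        };

        System.out.println("quick finished in time: " + runWithTimeout(executor, quick, 500));
        System.out.println("endless finished in time: " + runWithTimeout(executor, endless, 200));
        // The single worker thread is free again because the endless task honoured the interrupt.
        System.out.println("quick finished in time: " + runWithTimeout(executor, quick, 500));

        executor.shutdownNow();
    }
}
```

If the endless task ignored the interrupt, the cancelled future would still report as cancelled, but the worker thread would stay busy and later submissions to a single-thread pool would queue behind it.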

Upvotes: 3

esaj

Reputation: 16035

It sounds like your runActions method does not react to the thread's interrupted status being set. The second join() call, made after interrupt(), has no timeout and will wait indefinitely for the thread t to die. You should check the interrupted status inside runActions and terminate the operation when it is set (Thread.interrupted() returns true, and clears the flag as it does so).
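
A minimal sketch of that idea, assuming the long-running work can be broken into loop iterations. The class InterruptibleLoopDemo and its runActions are hypothetical stand-ins, not the asker's real code; the loop body is a placeholder.

```java
public class InterruptibleLoopDemo {

    /** Hypothetical stand-in for runActions(): loops until it is interrupted. */
    static long runActions() {
        long iterations = 0;
        // Poll the interrupt flag on every pass through the potentially endless loop.
        while (!Thread.currentThread().isInterrupted()) {
            iterations++; // placeholder for one unit of real work
        }
        return iterations; // partial result, to be thrown away by the caller
    }

    /** Runs the worker, interrupts it after timeoutMillis, and reports whether it died. */
    static boolean runWithTimeout(long timeoutMillis) {
        Thread t = new Thread(() -> runActions());
        t.start();
        try {
            t.join(timeoutMillis);      // wait for normal completion, up to the timeout
            if (t.isAlive()) {
                t.interrupt();          // only sets the flag; the worker must notice it
                t.join(5000);           // bounded wait, so a stubborn worker cannot hang us
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
        }
        return !t.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + runWithTimeout(200));
    }
}
```

The check only needs to sit in loops that can run for an unpredictable time; blocking calls such as Thread.sleep() or Object.wait() already respond to interruption by throwing InterruptedException.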

Upvotes: 2
