Anubis
Anubis

Reputation: 1222

Thread interrupt does not work

This is my first attempt at creating my own thread pool. The program works except when I try stopping all the threads. I'm trying to interrupt all the threads but for some reason it nevers reaches this segment:

if (this.currentThread().isInterrupted())

This is the output:

All tasks added
Task 5 recieved by thread 0
This is task #5
Task 4 recieved by thread 2
This is task #4
Task 3 recieved by thread 1
This is task #3
All done, threadpool closed
Task 5 now being executed by thread 0
Task 2 recieved by thread 0
This is task #2
Task 3 now being executed by thread 1
Task 1 recieved by thread 1
This is task #1
Task 4 now being executed by thread 2
No available tasks...
Task 1 now being executed by thread 1
No available tasks...
Task 2 now being executed by thread 0
No available tasks...

The program is supposed to stop generating output after the stopPool() method is called.

Can anyone tell me what is going wrong here?

public class CustomThread extends Thread
{
            private int id;
            private Task task;
            private ThreadPool threadpool;

            public CustomThread(int id, Task task, ThreadPool threadpool)
            {
                this.id = id;
                this.task = task;
                this.threadpool = threadpool;
            }

            public synchronized void run()
            {
                while(true)
                {
                    if (this.currentThread().isInterrupted())
                    {
                        System.out.println("Thread " + id + " is halted");
                    }
                    else
                    {
                        if (threadpool.getTaskQueue().isEmpty())
                        {
                            System.out.println("No available tasks...");
                            try 
                            {
                                this.wait();
                            } 
                            catch (InterruptedException e) 
                            {
                                e.printStackTrace();
                            }
                        }
                        else
                        {   
                            task = threadpool.retrieveTask();
                            System.out.println("Task " + task.getID() + " recieved by thread " + id);
                            task.run();
                            task.setState(true);
                            System.out.println("Task " + task.getID() + " now being executed by thread " + id);
                        }
                    }
                }
            }

            public synchronized void wakeUp()
            {
                this.notify();
            }

            public int getThreadID()
            {
                return id;
            }

        }

    import java.util.List;
    import java.util.ArrayList;
    import java.util.Stack;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;


    public class ThreadPool
    {
        private List<CustomThread> thread_list;
        private Stack<Task> task_queue;

        public ThreadPool(int threads)
        {
            thread_list = new ArrayList<CustomThread>();
            task_queue = new Stack<Task>();
            for(int i=0; i<threads; i++)
            {
                CustomThread thread = new CustomThread(i, null, this);
                thread_list.add(thread);
                thread.start();
            }
        }

        //Must use notify() to wake up an idle thread
        public synchronized void add(Task task)
        {
            task_queue.push(task);
            try
            {
                for(CustomThread t : thread_list)
                {
                    t.wakeUp();
                }

            }
            catch (Exception e)
            {
                e.printStackTrace();
            }

        }

        public synchronized void stopPool()
        {
            for(CustomThread t : thread_list)
            {
                t.currentThread().interrupt();
            }

        }

        public synchronized Stack getTaskQueue()
        {
            return task_queue;
        }

        public synchronized Task retrieveTask()
        {
            return task_queue.pop();
        }
    }

public class Task implements Runnable
{
    private int id;
    private boolean finished = false;
    public Task(int id)
    {
        this.id = id;
    }

    public synchronized void run()
    {
            System.out.println("This is task #" + id);

            try
            {
                Thread.sleep(1000);
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
    }

    public int getID()
    {
        return id;
    }

    public void setState(Boolean finished)
    {
        this.finished = finished;
    }

    public boolean getState()
    {
        return finished;
    }
}


public class Init 
{
    public static void main(String[] args)
    {
        ThreadPool threadpool = new ThreadPool(3);
        threadpool.add(new Task(1));
        threadpool.add(new Task(2));
        threadpool.add(new Task(3));
        threadpool.add(new Task(4));
        threadpool.add(new Task(5));
        System.out.println("All tasks added");
        {
            try
            {
                Thread.sleep(1000);
            }
            catch(Exception e)
            {
                e.printStackTrace();
            }
        }
        threadpool.stopPool();
        System.out.println("All done, threadpool closed");
    }

}

Upvotes: 1

Views: 2412

Answers (1)

Peter Lawrey
Peter Lawrey

Reputation: 533432

Your code

t.currentThread().interrupt();

will interrupt the current thread repeatedly, not the thread t try instead

t.interrupt();

BTW: I would expect the wait() to throw an exception which you log and continue as if it didn't happen.


For your interest, this is how you might write it using an ExecutorService.

ExecutorService service = Executors.newFixedThreadPool(3);
for (int i = 1; i <= 5; i++) {
    final int id = i;
    service.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            System.out.println("This is task #" + id);
            Thread.sleep(1000);
            return null;
        }
    });
}
System.out.println("All tasks added");
service.shutdown();
service.awaitTermination(1, TimeUnit.HOURS);
System.out.println("All done, threadpool closed");

prints

This is task #1
This is task #3
This is task #2
All tasks added
This is task #4
This is task #5
All done, threadpool closed

Upvotes: 3

Related Questions