Issues-Java

Reputation: 119

ThreadPoolExecutor not consuming data

I have set up a ThreadPoolExecutor and start its threads to consume data from a blocking queue. On startup (when I call startThreads below), the blocking queue is empty. I've set the keep-alive timeout for the threads to be very large so that they don't die. The blocking queue is created outside the scope of WorkerThreadPoolExecutor, and Runnable items are put on it.

public class WorkerThreadPoolExecutor extends ThreadPoolExecutor {

    private final MyBlockingQueue<MyRunnable> blockingQueue;
    private ScheduledExecutorService statsExecutor = null;

    public WorkerThreadPoolExecutor(MyBlockingQueue myBlockingQueue) {

        super(5, 10, 5, TimeUnit.MINUTES, myBlockingQueue);
        this.blockingQueue = myBlockingQueue;

    }

    @Override
    public void shutdown() {
        logger.info("Shutting down the stats emitter!");
        super.shutdown();
        if (statsExecutor != null) {
            statsExecutor.shutdown();
        }
    }

    public void startThreads() {
        logger.info("Starting the WorkerThreadPoolExecutor!!!");
        this.prestartCoreThread();
        emitStats();
    }

    public void numThds() {
        System.err.println("\t\t active: " + this.getActiveCount());
        System.err.println("\t\t completed taskCount: " + this.getCompletedTaskCount());
        System.err.println("\t\t core: " + this.getCorePoolSize());
        System.err.println("\t\t poolsize: " + this.getPoolSize());
        System.err.println("\t\t taskCount: " + this.getTaskCount());
        System.err.println("\t\t Q-Size: " + this.getQueue().size());

        //System.err.println("X Size is: -------------> " + blockingQueue.currentSize());
        System.err.println("X Size is: -------------> " + blockingQueue.getBlockingQueue().size());
        System.err.println("X Size is: -------------> " + this.getQueue().size());
    }

    public void emitStats() {

        this.statsExecutor = Executors.newScheduledThreadPool(1);

        final Runnable emitStats = new Runnable() {
            public void run() {
                System.err.println("Stats id: " + blockingQueue.id);
                //System.err.println("Size is: -------------> " + blockingQueue.currentSize());
                System.err.println("Stats size is: -------------> " + blockingQueue.getBlockingQueue().size());

                numThds();
            }
        };
        statsExecutor.scheduleAtFixedRate(emitStats, 2, 2, TimeUnit.SECONDS);
    }

}

The blocking queue is created outside the scope above and items put on it:

BlockingQueue<MyRunnable> blockingQueue = new LinkedBlockingQueue<>();

Items are added to the queue to be processed, but they are never dequeued. I added periodic stats output, which produced the following result:

Stats size is: -------------> 2
     active: 0
     completed taskCount: 0
     core: 5
     poolsize: 0
     taskCount: 2
     Q-Size: 2
X Size is: -------------> 2
X Size is: -------------> 2

How can I force the items to be taken off of the blocking queue and executed?

The code for MyRunnable is:

public class MyRunnable implements Runnable {
    private int x;
    public MyRunnable(int x) {
        this.x = x;
    }
    public void run() {
        System.out.println("----> " + x);
    }
}

I create an instance of it by calling:

 MyRunnable mr = new MyRunnable(3);

and enqueue it by calling:

blockingQueue.add(mr);

Upvotes: 2

Views: 50

Answers (1)

GhostCat

Reputation: 140613

Two things:

  • There is absolutely no need to create a new executor service during each call to emitStats(). On the contrary, that contradicts the whole idea of such a service.
  • There is no code in your example that would actually consume elements from that queue. Printing its size doesn't change the queue!
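
To illustrate the second point: the stats in the question show poolsize: 0, so no worker thread exists that could take anything off the queue. Below is a minimal sketch of two ways to get tasks consumed. It assumes a plain LinkedBlockingQueue<Runnable> instead of the question's MyBlockingQueue (whose implementation is not shown), and the class name QueueConsumptionSketch is made up for illustration. Handing tasks to execute() is the usual route; adding to the queue directly only works once worker threads are already running, for example after prestartAllCoreThreads().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the question's code.
public class QueueConsumptionSketch {

    // Returns true if both tasks ran within five seconds.
    static boolean runSketch() {
        // Assumption: a standard queue stands in for MyBlockingQueue.
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(5, 10, 5, TimeUnit.MINUTES, queue);
        CountDownLatch done = new CountDownLatch(2);
        try {
            // Option 1: start all core threads up front. They block on the
            // queue, so an item added directly to it is picked up.
            pool.prestartAllCoreThreads();
            queue.add(() -> {
                System.out.println("----> 1 (added to the queue directly)");
                done.countDown();
            });

            // Option 2 (the usual way): let the executor accept the task.
            pool.execute(() -> {
                System.out.println("----> 2 (submitted via execute)");
                done.countDown();
            });

            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Let the worker threads exit so the JVM can terminate.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("both tasks ran: " + runSketch());
    }
}
```

If the same approach does not work with a custom queue class, a likely place to look is whether that class really implements take() and poll(timeout, unit) against the same underlying queue that add() writes to, since those are the methods the worker threads call.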

Upvotes: 1
