Reputation: 119
I have setup a ThreadPoolExecutor and start the threads to consume data from the blocking queue. On startup (when I call startThread below), the blocking queue is empty. I've set the timeout for the threads to be very large so that they don't die. The blocking queue is created outside the scope of WorkerThreadPoolExecutor, and Runnable items are put on it.
public class WorkerThreadPoolExecutor extends ThreadPoolExecutor {
private final MyBlockingQueue<MyRunnable> blockingQueue;
private ScheduledExecutorService statsExecutor = null;
public WorkerThreadPoolExecutor(MyBlockingQueue myBlockingQueue) {
super(5, 10, 5, TimeUnit.MINUTES, myBlockingQueue);
this.blockingQueue = myBlockingQueue;
}
@Override
public void shutdown() {
logger.info("Shutting down the stats emitter!");
super.shutdown();
if (statsExecutor != null) {
statsExecutor.shutdown();
}
}
public void startThreads() {
logger.info("Starting the WorkerThreadPoolExecutor!!!");
this.prestartCoreThread();
emitStats();
}
public void numThds() {
System.err.println("\t\t active: " + this.getActiveCount());
System.err.println("\t\t completed taskCount: " + this.getCompletedTaskCount());
System.err.println("\t\t core: " + this.getCorePoolSize());
System.err.println("\t\t poolsize: " + this.getPoolSize());
System.err.println("\t\t taskCount: " + this.getTaskCount());
System.err.println("\t\t Q-Size: " + this.getQueue().size());
//System.err.println("X Size is: -------------> " + blockingQueue.currentSize());
System.err.println("X Size is: -------------> " + blockingQueue.getBlockingQueue().size());
System.err.println("X Size is: -------------> " + this.getQueue().size());
}
public void emitStats() {
this.statsExecutor = Executors.newScheduledThreadPool(1);
final Runnable emitStats = new Runnable() {
public void run() {
System.err.println("Stats id: " + blockingQueue.id);
//System.err.println("Size is: -------------> " + blockingQueue.currentSize());
System.err.println("Stats size is: -------------> " + blockingQueue.getBlockingQueue().size());
numThds();
}
};
statsExecutor.scheduleAtFixedRate(emitStats, 2, 2, TimeUnit.SECONDS);
}
}
The blocking queue is created outside the scope above and items put on it:
BlockingQueue<MyRunnable> blockingQueue = new LinkedBlockingQueue()
Items are added to the queue to be processed, but they are never dequeued. I added metrics for stats that produced the following result:
Stats size is: -------------> 2
active: 0
completed taskCount: 0
core: 5
poolsize: 0
taskCount: 2
Q-Size: 2
X Size is: -------------> 2
X Size is: -------------> 2
How can I force the items to be taken off of the blocking queue and executed?
The code for MyRunnalbe is:
public class MyRunnable implements Runnable {
private int x;
public MyRunnable(int x) {
this.x = x;
}
public void run() {
System.out.println("----> " + x);
}
}
I create an instance of it by calling:
MyRunnable mr = new MyRunnable(3);
and is enqueued by calling:
blockingQueue.add(mr);
Upvotes: 2
Views: 50
Reputation: 140613
Two things:
Upvotes: 1