Reputation: 507
Does Java support any queue object or mechanism to handle batch treatment?
ex: we have a queue(or any desired queue object), some producer push item into the queue one by one, my target is when we have 10 items or more than 10 items in this queue, we can trigger some handler to treat it in one batch.
or it is not triggered automatically, we need to find a way to loop the queue gracefully at the handler side.
do we have typical and high performance object or lib to handle this?
thanks, Emre
Upvotes: 6
Views: 3797
Reputation: 47193
Here's a quick attempt at processing objects in batches, using a background thread to collect and process objects pushed onto a queue by other threads:
public abstract class Batcher<E> implements Runnable {
public static interface BatchProcessor<E> {
public void processBatch(List<E> batch);
}
private final BlockingQueue<E> queue;
private final BatchProcessor<E> processor;
private Batcher(BlockingQueue<E> queue, BatchProcessor<E> processor) {
this.queue = queue;
this.processor = processor;
}
@Override
public void run() {
try {
while (true) {
List<E> batch = new ArrayList<E>();
for (int i = 0; i < 10; i++) {
batch.add(queue.take());
}
processor.processBatch(batch);
}
} catch (InterruptedException e) {
return;
}
}
}
To use this, you create a BlockingQueue
and put objects on it, create an instance of an implementation of BatchProcessor
to process the batches, then create an instance of Batcher
to pump objects from the former to the latter.
Upvotes: 1
Reputation: 533530
You can use BlockingQueue.drainTo()
to automatically obtain batches of tasks to be performed. This is suitable to over 100K task per second.
If you need higher performance queuing you can use the more complex Disruptor or Java Chronicle which can queue into the millions of tasks per second, both supporting auto-batching.
Upvotes: 1
Reputation: 41200
Batch processing in Queue could be achievable with wait/notify, something like you would block thread call against the resource upto it is available or not.
public class MyQueue implements Queue<Object>{
public synchronized List<Object> peek() {
if(this.list.size()>=10)
this.list.wait();
return Collections.subList(0,10);
}
@Override
public boolean add(Object e) {
this.list.add(e);
if(this.list.size()>=10)
this.list.notifyAll();
return false;
}
}
it is not triggered automatically
In that case you can call wait with specified time out.
Upvotes: 2
Reputation: 2447
I think CountDownLatch is what you need, or possibly CyclicBarrier. That would allow you to setup a synchronization point that will trigger consumers after a certain number of operations have occurred, and you can use a standard queue as the container object.
Upvotes: 0
Reputation: 206846
Have a look at the API documentation of interface java.util.Queue
, which has several implementations.
There's also a standard API, Java Message Service (JMS) to deal with queueing systems for exchanging messages between different processes.
Upvotes: 0