Reputation: 2655
I have a BlockingCollection which I write to from one thread and I read from another. The producer thread takes items received from a server and adds them to the BlockingCollection, while the reading thread attempts to empty the BlockingCollection and process them.
The problem is that I am trying to empty the queue in batches, because processing the items one by one would be too slow. But when the collection is being constantly written to (thousands of items), the consumer thread keeps reading until it is emptied, which means that the processing will not even start until the writing is done.
Now, the processing in the consumer can be done in parallel, so I have been wondering how to go about that.
Currently I have 2 ideas:
1. After a certain number of items are read from the BlockingCollection in the consumer, start a new parallel job that processes them, instead of waiting to completely empty the queue and THEN start processing.
2. Use multiple consumers and hope that they will run in parallel, instead of just constantly blocking each other while trying to read the BlockingCollection at the same time.
So my question is about option number 2 - is the BlockingCollection internally optimized for such a case? Will it partition the areas that are read from, or will the consumers fight over each item? If it's the latter, is option 1 superior?
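For reference, option 1 would look roughly like this (ProcessBatch, batchSize, and the Item type are placeholders for my actual code):

    var batch = new List<Item>(batchSize);
    foreach (var item in collection.GetConsumingEnumerable())
    {
        batch.Add(item);
        // TryTake drains whatever is already buffered, without blocking,
        // so a batch is dispatched as soon as batchSize items are available
        // (or the buffer runs dry), instead of waiting for the queue to empty.
        while (batch.Count < batchSize && collection.TryTake(out var extra))
            batch.Add(extra);

        var toProcess = batch.ToArray();
        batch.Clear();
        Task.Run(() => ProcessBatch(toProcess)); // process the batch in parallel
    }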
Upvotes: 0
Views: 578
Reputation: 16049
To add just another alternative (in no way production-ready!): this makes use of TPL Dataflow, where BatchBlock<T> abstracts the batching away for us.
    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public class HoneyBatcher
    {
        private const int BATCHSIZE = 10; // Find the size that works best for you.
        private readonly BatchBlock<Honey> batchBlock;
        private readonly ExecutionDataflowBlockOptions _options =
            new ExecutionDataflowBlockOptions()
            {
                // I'd start with 1, then benchmark whether a higher number actually benefits.
                MaxDegreeOfParallelism = 1,
                SingleProducerConstrained = true // if so, may micro-optimize throughput
            };

        // vv Whatever process you want done on a batch
        public HoneyBatcher(Action<Honey[]> batchProcessor)
        {
            // BatchBlock does the batching
            // and is the entry point to the pipeline.
            batchBlock = new BatchBlock<Honey>(BATCHSIZE);
            // processorBlock processes each batch that batchBlock will produce.
            // Parallel execution as well as other tweaks can be configured through options.
            ActionBlock<Honey[]> processorBlock =
                new ActionBlock<Honey[]>(batchProcessor, _options);
            // Build the pipeline:
            batchBlock.LinkTo(processorBlock);
            // item => batchBlock => item[BATCHSIZE] => batchProcessor(item[])
        }

        // Add items individually and have them batched up
        // and processed in a pipeline.
        public Task<bool> ProcessAsync(Honey item)
        {
            return batchBlock.SendAsync(item);
            // Can also be done with the sync API (Post).
        }
    }

    public class Honey
    {
        // Just a dummy
    }
Be advised that the above snippet is just a rough layout of the idea. In production, you would of course address error handling, completion, etc.
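A hypothetical usage sketch (the lambda and the loop are illustrative, not part of the class):

    // The producer thread feeds items one at a time; batches of BATCHSIZE
    // are handed to the lambda on a thread-pool thread.
    var batcher = new HoneyBatcher(batch =>
        Console.WriteLine($"Processing a batch of {batch.Length} items"));

    for (int i = 0; i < 100; i++)
        await batcher.ProcessAsync(new Honey()); // awaits only if the block's buffer is full

Note that a trailing partial batch stays buffered until the BatchBlock is completed (Complete() followed by awaiting Completion), which the snippet deliberately leaves out.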
Upvotes: 2
Reputation: 43384
A natural way to process the items in batches would be to insert them into the BlockingCollection in batches, instead of trying to remove them in batches later. In other words, you could use a BlockingCollection<T[]> instead of a BlockingCollection<T>. The producer thread could do the batching easily by using a Queue<T>:
    var queue = new Queue<T>();
    while (someCondition)
    {
        var item = ProduceItem();
        queue.Enqueue(item);
        if (queue.Count == batchSize)
        {
            blockingCollection.Add(queue.ToArray());
            queue.Clear();
        }
    }
    // Flush the final, possibly incomplete, batch.
    if (queue.Count > 0)
    {
        blockingCollection.Add(queue.ToArray());
        queue.Clear();
    }
    blockingCollection.CompleteAdding();
Depending on the situation you could also use a LINQ-style operator like Batch from the MoreLinq library.
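On the consuming side, each consumer then takes a whole batch at a time, so several consumers can process batches in parallel. A rough sketch (ProcessBatch and consumerCount are placeholders):

    var consumers = Enumerable.Range(0, consumerCount)
        .Select(_ => Task.Run(() =>
        {
            // GetConsumingEnumerable blocks while the collection is empty
            // and ends when CompleteAdding has been called and the
            // collection is drained.
            foreach (T[] batch in blockingCollection.GetConsumingEnumerable())
                ProcessBatch(batch);
        }))
        .ToArray();
    Task.WaitAll(consumers);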
Finally, to answer your main question: yes, the BlockingCollection class handles multiple consumers (as well as multiple producers) excellently. When the collection is empty, all consumers are blocked, and when an item arrives it is given to one of the waiting consumers.
Upvotes: 0