Reputation: 43886
I have used BlockingCollection many times to implement the producer/consumer pattern, but I have experienced bad performance with extremely granular data because of the associated overhead. This usually forces me to improvise by chunkifying/partitioning my data, in other words using a BlockingCollection<T[]> instead of a BlockingCollection<T>. Here is a recent example. This works, but it's ugly and error-prone. I end up using nested loops at both the producer and the consumer, and I must remember to Add what is left at the end of a producer's workload. So I had the idea of implementing a chunky BlockingCollection that handles all these complications internally and exposes the same simple interface as the existing BlockingCollection. My problem is that I haven't yet managed to match the performance of the complex manual partitioning. My best attempt still pays a performance tax of around +100% for extremely granular data (basically just integer values). So I would like to present here what I have done so far, hoping for advice that will help me close the performance gap.
My best attempt uses a ThreadLocal<List<T>>, so that each thread works on a dedicated chunk, removing any need for locks.
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public class ChunkyBlockingCollection1<T>
{
    private readonly BlockingCollection<T[]> _blockingCollection;
    public readonly int _chunkSize;
    private readonly ThreadLocal<List<T>> _chunk;

    public ChunkyBlockingCollection1(int chunkSize)
    {
        _blockingCollection = new BlockingCollection<T[]>();
        _chunkSize = chunkSize;
        // trackAllValues: true, so that CompleteAdding can reach every thread's chunk
        _chunk = new ThreadLocal<List<T>>(() => new List<T>(chunkSize), true);
    }

    public void Add(T item)
    {
        var chunk = _chunk.Value;
        chunk.Add(item);
        if (chunk.Count >= _chunkSize)
        {
            _blockingCollection.Add(chunk.ToArray());
            chunk.Clear();
        }
    }

    // Call only after all producers have stopped adding;
    // the per-thread lists are not safe to read while their owners write.
    public void CompleteAdding()
    {
        var chunks = _chunk.Values.ToArray();
        foreach (var chunk in chunks)
        {
            _blockingCollection.Add(chunk.ToArray());
            chunk.Clear();
        }
        _blockingCollection.CompleteAdding();
    }

    public IEnumerable<T> GetConsumingEnumerable()
    {
        foreach (var chunk in _blockingCollection.GetConsumingEnumerable())
        {
            for (int i = 0; i < chunk.Length; i++)
            {
                yield return chunk[i];
            }
        }
    }
}
My second-best attempt uses a single List<T> as the chunk, which is accessed by all threads in a thread-safe manner using a lock. Surprisingly, this is only slightly slower than the ThreadLocal<List<T>> solution.
public class ChunkyBlockingCollection2<T>
{
    private readonly BlockingCollection<T[]> _blockingCollection;
    public readonly int _chunkSize;
    private readonly List<T> _chunk;
    private readonly object _locker = new object();

    public ChunkyBlockingCollection2(int chunkSize)
    {
        _blockingCollection = new BlockingCollection<T[]>();
        _chunkSize = chunkSize;
        _chunk = new List<T>(chunkSize);
    }

    public void Add(T item)
    {
        lock (_locker)
        {
            _chunk.Add(item);
            if (_chunk.Count >= _chunkSize)
            {
                _blockingCollection.Add(_chunk.ToArray());
                _chunk.Clear();
            }
        }
    }

    public void CompleteAdding()
    {
        lock (_locker)
        {
            _blockingCollection.Add(_chunk.ToArray());
            _chunk.Clear();
        }
        _blockingCollection.CompleteAdding();
    }

    public IEnumerable<T> GetConsumingEnumerable()
    {
        foreach (var chunk in _blockingCollection.GetConsumingEnumerable())
        {
            for (int i = 0; i < chunk.Length; i++)
            {
                yield return chunk[i];
            }
        }
    }
}
I have also tried using a ConcurrentBag<T> as the chunk, which resulted in bad performance and a correctness issue (because I didn't use a lock). Another attempt was replacing the lock (_locker) with a SpinLock, with even worse performance. The locking is clearly the root of my problems, because if I remove it completely my class achieves optimal performance. Of course, removing the lock fails miserably with more than one producer.
Upvotes: 0
Views: 478
Reputation: 5042
You can try using an array for _chunk instead of a List<T>. Then you can use Interlocked.Increment to claim the next index to populate in Add, and when the count exceeds the size of your chunk, move it all to the blocking collection and reset the index, inside a lock of course.
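A rough sketch of one way this idea could be realized follows. It is a variation rather than a literal transcription: instead of resetting the index under a lock, it swaps in a fresh array, and it uses a second counter so that a chunk is published only after every slot has actually been written. The class name InterlockedChunkyCollection and the nested Chunk type are hypothetical, the code has not been benchmarked against the classes in the question, and CompleteAdding assumes that every producer has already returned from Add.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical class: array chunks filled through Interlocked.Increment.
public class InterlockedChunkyCollection<T>
{
    private sealed class Chunk
    {
        public readonly T[] Items;
        public int Reserved;   // slots handed out so far (may overshoot Items.Length)
        public int Committed;  // slots whose write has completed
        public Chunk(int size) { Items = new T[size]; }
    }

    private readonly BlockingCollection<T[]> _blockingCollection = new BlockingCollection<T[]>();
    private readonly int _chunkSize;
    private Chunk _current;

    public InterlockedChunkyCollection(int chunkSize)
    {
        if (chunkSize < 1) throw new ArgumentOutOfRangeException(nameof(chunkSize));
        _chunkSize = chunkSize;
        _current = new Chunk(chunkSize);
    }

    public void Add(T item)
    {
        while (true)
        {
            Chunk chunk = Volatile.Read(ref _current);
            int index = Interlocked.Increment(ref chunk.Reserved) - 1;
            if (index < _chunkSize)
            {
                // Exactly one thread claims the last slot; it installs the next chunk.
                if (index == _chunkSize - 1)
                    Volatile.Write(ref _current, new Chunk(_chunkSize));
                chunk.Items[index] = item;
                // Whoever completes the final write publishes the full chunk.
                if (Interlocked.Increment(ref chunk.Committed) == _chunkSize)
                    _blockingCollection.Add(chunk.Items);
                return;
            }
            // The chunk was already full: wait for its replacement, then retry.
            var spinner = new SpinWait();
            while (ReferenceEquals(Volatile.Read(ref _current), chunk))
                spinner.SpinOnce();
        }
    }

    // Assumption: no producer is still inside Add when this is called.
    public void CompleteAdding()
    {
        Chunk chunk = Volatile.Read(ref _current);
        int count = Volatile.Read(ref chunk.Committed);
        if (count > 0)
        {
            // Flush the partially filled chunk.
            var rest = new T[count];
            Array.Copy(chunk.Items, rest, count);
            _blockingCollection.Add(rest);
        }
        _blockingCollection.CompleteAdding();
    }

    public IEnumerable<T> GetConsumingEnumerable()
    {
        foreach (var chunk in _blockingCollection.GetConsumingEnumerable())
        {
            for (int i = 0; i < chunk.Length; i++)
                yield return chunk[i];
        }
    }
}
```

Note that chunks can reach the consumer in a different order than they were started, and producers that overshoot a full chunk spin briefly, so whether this actually beats the lock-based version would have to be measured.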
Upvotes: 1