Reputation: 4593
I'm trying to take all items in one fell swoop from a ConcurrentBag. Since there's nothing like TryEmpty on the collection, I've resorted to using Interlocked.Exchange in the same fashion as described here: "How to remove all Items from ConcurrentBag?"
My code looks like this:
    private ConcurrentBag<Foo> _allFoos; // Initialized in constructor.

    public bool LotsOfThreadsAccessingThisMethod(Foo toInsert)
    {
        this._allFoos.Add(toInsert);
        return true;
    }

    public void SingleThreadProcessingLoopAsALongRunningTask(object state)
    {
        var token = (CancellationToken) state;
        var workingSet = new List<Foo>();
        while (!token.IsCancellationRequested)
        {
            if (!workingSet.Any())
            {
                // Swap in a fresh bag and snapshot whatever the old one held.
                workingSet = Interlocked.Exchange(ref this._allFoos, new ConcurrentBag<Foo>()).ToList();
            }
            var processingCount = (int)Math.Min(workingSet.Count, TRANSACTION_LIMIT);
            if (processingCount > 0)
            {
                using (var ctx = new MyEntityFrameworkContext())
                {
                    ctx.BulkInsert(workingSet.Take(processingCount));
                }
                workingSet.RemoveRange(0, processingCount);
            }
        }
    }
The problem is that this sometimes misses items that are added to the bag. I've written a test application that feeds data to the method that calls ConcurrentBag.Add and verified that it is sending all of the data. When I set a breakpoint on the Add call and check the Count of the ConcurrentBag afterwards, it's zero. The item just isn't being added.
I'm fairly positive that it's because the Interlocked.Exchange call doesn't use the internal locking mechanism of the ConcurrentBag, so it's losing data somewhere in the swap, but I have no knowledge of what's actually happening.
How can I just grab all the items out of the ConcurrentBag at one time without resorting to my own locking mechanism? And why does Add ignore the item?
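For illustration, here is one interleaving that could produce the symptom, replayed step by step on a single thread so it is deterministic. This is only a sketch of a plausible cause, not a confirmed diagnosis of the code above. The class name SwapRaceSketch and the variable seenByProducer are made up for the example. The idea is that a producer reads the field to get a bag reference, the consumer then swaps and snapshots that bag, and the producer's Add lands in the old bag after the snapshot was taken.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class SwapRaceSketch
{
    private static ConcurrentBag<int> _bag = new ConcurrentBag<int>();

    // Replays one possible interleaving of producer and consumer steps
    // on a single thread (hypothetical helper, for illustration only).
    public static (int inSnapshot, int inNewBag, int inOldBag) Replay()
    {
        _bag = new ConcurrentBag<int>();

        // Producer, step 1: `_bag.Add(x)` first reads the field to get a reference.
        ConcurrentBag<int> seenByProducer = _bag;

        // Consumer: swaps in a fresh bag and snapshots the old one.
        List<int> snapshot = Interlocked.Exchange(ref _bag, new ConcurrentBag<int>()).ToList();

        // Producer, step 2: Add runs against the old, already-snapshotted bag.
        seenByProducer.Add(42);

        return (snapshot.Count, _bag.Count, seenByProducer.Count);
    }

    public static void Main()
    {
        var (inSnapshot, inNewBag, inOldBag) = Replay();
        // prints "snapshot=0, new bag=0, old bag=1"
        Console.WriteLine($"snapshot={inSnapshot}, new bag={inNewBag}, old bag={inOldBag}");
    }
}
```

In this replay the item is in neither the snapshot nor the new bag. It sits in the old bag, which nothing references any more.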
Upvotes: 2
Views: 1867
Reputation: 205629
I don't think you need to take all the items from the ConcurrentBag at once. You can get the same behavior you are trying to implement simply by changing the processing logic as follows (no need for your own synchronization or interlocked swaps):
    public void SingleThreadProcessingLoopAsALongRunningTask(object state)
    {
        var token = (CancellationToken)state;
        var buffer = new List<Foo>(TRANSACTION_LIMIT);
        while (!token.IsCancellationRequested)
        {
            Foo item;
            if (!this._allFoos.TryTake(out item))
            {
                // Bag is empty: keep polling unless a partial batch is waiting.
                if (buffer.Count == 0) continue;
            }
            else
            {
                // Got an item: keep collecting until the batch is full.
                buffer.Add(item);
                if (buffer.Count < TRANSACTION_LIMIT) continue;
            }
            // Flush a full batch, or a partial one once the bag has run dry.
            using (var ctx = new MyEntityFrameworkContext())
            {
                ctx.BulkInsert(buffer);
            }
            buffer.Clear();
        }
    }
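To check that draining with TryTake does not drop items while other threads keep adding, a small self-contained sketch like the following can help. It is not the code from the question: BatchDrainSketch, TakeBatch and Run are hypothetical names, the items are plain ints, and counting the batch stands in for ctx.BulkInsert. Because the bag instance is never replaced, every Add ends up in the same bag the consumer is draining.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchDrainSketch
{
    // Hypothetical helper: removes up to `limit` items from the bag.
    public static List<T> TakeBatch<T>(ConcurrentBag<T> bag, int limit)
    {
        var batch = new List<T>(limit);
        while (batch.Count < limit && bag.TryTake(out T item))
        {
            batch.Add(item);
        }
        return batch;
    }

    // Runs several producers against one consumer and returns how many
    // items the consumer saw in total.
    public static int Run(int producers, int itemsPerProducer, int limit)
    {
        var bag = new ConcurrentBag<int>();
        Task allProducers = Task.WhenAll(
            Enumerable.Range(0, producers).Select(p => Task.Run(() =>
            {
                for (int i = 0; i < itemsPerProducer; i++)
                {
                    bag.Add(p * itemsPerProducer + i);
                }
            })));

        int consumed = 0;
        while (true)
        {
            // Sample completion BEFORE draining, so an empty batch seen
            // after the producers finished really means the bag is empty.
            bool producersDone = allProducers.IsCompleted;
            List<int> batch = TakeBatch(bag, limit);
            consumed += batch.Count; // stand-in for ctx.BulkInsert(batch)
            if (batch.Count == 0 && producersDone)
            {
                break;
            }
        }
        return consumed;
    }

    public static void Main()
    {
        Console.WriteLine(Run(4, 10000, 100));
    }
}
```

Like the loop above, this polls in a tight loop while the bag is empty. If that turns out to burn too much CPU, a short wait between empty polls is one option.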
Upvotes: 2