Brandon
Brandon

Reputation: 4593

Take all items from ConcurrentBag using a swap

I'm trying to take all items in one fell swoop from a ConcurrentBag. Since there's nothing like TryEmpty on the collection, I've resorted to using Interlocked.Exchange in the same fashion as described here: How to remove all Items from ConcurrentBag?

My code looks like this:

private ConcurrentBag<Foo> _allFoos; //Initialized in constructor.

public bool LotsOfThreadsAccessingThisMethod(Foo toInsert)
{
    this._allFoos.Add(toInsert);
    return true;
}

public void SingleThreadProcessingLoopAsALongRunningTask(object state)
{
    var token = (CancellationToken) state;
    var workingSet = new List<Foo>();

    while (!token.IsCancellationRequested)
    {
        if (!workingSet.Any())
        {
            workingSet = Interlocked.Exchange(ref this._allFoos, new ConcurrentBag<Foo>).ToList();
        }

        var processingCount = (int)Math.Min(workingSet.Count, TRANSACTION_LIMIT);

        if (processingCount > 0)
        {
            using (var ctx = new MyEntityFrameworkContext())
            {
                ctx.BulkInsert(workingSet.Take(processingCount));
            }
            workingSet.RemoveRange(0, processingCount);
        }
    }
}

The problem is that this sometimes misses items that are added to the list. I've written a test application that feeds data to my ConcurrentBag.Add method and verified that it is sending all of the data. When I set a breakpoint on the Add call and check the count of the ConcurrentBag after, it's zero. The item just isn't being added.

I'm fairly positive that it's because the Interlocked.Exchange call doesn't use the internal locking mechanism of the ConcurrentBag so it's losing data somewhere in the swap, but I have no knowledge of what's actually happening.

How can I just grab all the items out of the ConcurrentBag at one time without resorting to my own locking mechanism? And why does Add ignore the item?

Upvotes: 2

Views: 1867

Answers (1)

Ivan Stoev
Ivan Stoev

Reputation: 205629

I think taking all the items from the ConcurentBag is not needed. You can achieve exactly the same behavior you are trying to implement simply by changing the processing logic as follows (no need for own synchronization or interlocked swaps):

public void SingleThreadProcessingLoopAsALongRunningTask(object state)
{
    var token = (CancellationToken)state;
    var buffer = new List<Foo>(TRANSACTION_LIMIT);
    while (!token.IsCancellationRequested)
    {
        Foo item;
        if (!this._allFoos.TryTake(out item))
        {
            if (buffer.Count == 0) continue;
        }
        else
        {
            buffer.Add(item);
            if (buffer.Count < TRANSACTION_LIMIT) continue;
        }
        using (var ctx = new MyEntityFrameworkContext())
        {
            ctx.BulkInsert(buffer);
        }
        buffer.Clear();
    }
}

Upvotes: 2

Related Questions