zig

Reputation: 4624

Parallel ForEach using a ConcurrentBag not working as expected

I have this code that processes items in a list:

    static readonly object _Lock = new object();

    public class Item
    {
        public string Name;
        public string ID;
    }

    static void Main(string[] args)
    {
        var items = new List<Item>
        {
            new Item { Name = "One", ID = "123" },
            new Item { Name = "Two", ID = "234" },
            new Item { Name = "Three", ID = "123" }
        };

        var itemsProcess = new ConcurrentBag<Item>();
        Parallel.ForEach(items, (item) =>
        {
            Item itemProcess = null;
            // lock (_Lock)
            {
                itemProcess = itemsProcess.FirstOrDefault(a => a.ID == item.ID);
            }
            if (itemProcess != null)
            {
                Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
            }
            else
            {
                itemsProcess.Add(item);
                Console.WriteLine($"Processing item [{item.Name}]");
                Thread.Sleep(1000); // do some work...
            }
        });

        Console.ReadKey();
    }

I am basically using the ConcurrentBag to check for the existence of an object based on several conditions.
I am expecting to always get an output like (order may vary):

Processing item [One]
Item [Three] was already processed as [One]
Processing item [Two]

But I sometimes get output like the following, which suggests my code is not thread-safe:

Processing item [Three]
Processing item [One]
Processing item [Two]

So my assumption that itemsProcess.FirstOrDefault() would block was wrong.
Using the lock does not change anything either. Obviously something is wrong here, and I really can't understand why.

I know I can "solve" this in other ways (one is to de-duplicate the list before entering Parallel.ForEach()), but I would really like to understand why this behavior occurs.

Upvotes: 2

Views: 1961

Answers (3)

Tanveer Badar

Reputation: 5523

Without resorting to a lock, you can "abuse" a ConcurrentDictionary to ensure uniqueness and avoid locking entirely.

Add items to your dictionary keyed by their ID; the data structure will stay consistent, and once that's done you can use the dictionary's Values property to get the unique items.
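A minimal sketch of that approach, reusing the Item class from the question (the variable name seen is illustrative):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

var items = new List<Item>
{
    new Item { Name = "One", ID = "123" },
    new Item { Name = "Two", ID = "234" },
    new Item { Name = "Three", ID = "123" }
};

var seen = new ConcurrentDictionary<string, Item>();
Parallel.ForEach(items, item =>
{
    // TryAdd is atomic: for each ID, exactly one thread wins.
    if (seen.TryAdd(item.ID, item))
    {
        Console.WriteLine($"Processing item [{item.Name}]");
        // do some work...
    }
    else
    {
        Console.WriteLine($"Item [{item.Name}] was already processed as [{seen[item.ID].Name}]");
    }
});

// Afterwards, the unique items are available here:
ICollection<Item> unique = seen.Values;
```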

P.S.: your example feels more involved than it needs to be, because it essentially amounts to a Distinct() implemented with Parallel.ForEach().

Finally, to address why this is happening: in concurrent code, the following check-then-act pattern is almost always an anti-pattern and does not do what the author intended:

if(!collection.Contains(item))
      collection.Add(item);

By the time Contains() has executed and returned false, another thread may have run the same check, raced ahead, and added the same item.

This race condition is why nearly all modifying operations on the concurrent collections come in atomic flavors: either a TryAdd() that atomically attempts the insert and returns true/false to tell you the outcome, or methods like GetOrAdd() and AddOrUpdate() that atomically insert an item or retrieve/update the existing one.

Upvotes: 1

Johnathan Barclay

Reputation: 20363

You have 2 independent operations within your parallel loop: FirstOrDefault and Add.

ConcurrentBag cannot ensure thread-safety between these 2 operations.

An alternative would be ConcurrentDictionary, which has a GetOrAdd method that will only add an item when the key is not present:

var itemsProcess = new ConcurrentDictionary<string, Item>();
Parallel.ForEach(items, item =>
{
    // Returns existing item with same ID or adds this item
    var itemProcess = itemsProcess.GetOrAdd(item.ID, item);
    if (!object.ReferenceEquals(item, itemProcess))
    {
        Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
    }
    else
    {
        Console.WriteLine($"Processing item [{item.Name}]");
        // do some work...
    }
});

If you then need the processed Items as an ICollection, they can be accessed via itemsProcess.Values.

Upvotes: 5

TheGeneral

Reputation: 81503

The reason is that there is still a data race: two threads can read from and add to the ConcurrentBag in an unsynchronized check-then-act sequence. Using any of the concurrent collections only guarantees that the structure itself stays self-consistent; it does not protect you from writing other non-thread-safe code around it.

You had the right idea with a lock:

var itemsProcess = new Dictionary<string, Item>();
Parallel.ForEach(items, (item) =>
{

   lock (_Lock)
   {
      if (itemsProcess.TryGetValue(item.ID, out var val))
      {
         Console.WriteLine($"Item [{item.Name}] was already processed as [{val.Name}]");
         return;
      }

      itemsProcess.TryAdd(item.ID, item);
   }

   Console.WriteLine($"Processing item [{item.Name}]");
   Thread.Sleep(1000); // do some work...

});

Note: you could also filter the list for duplicates before processing it in parallel, which removes the need for a lock or a concurrent collection altogether.
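As a sketch of that pre-filtering, reusing the Item class from the question (GroupBy keeps the first item per ID; on .NET 6+, DistinctBy(i => i.ID) does the same):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var items = new List<Item>
{
    new Item { Name = "One", ID = "123" },
    new Item { Name = "Two", ID = "234" },
    new Item { Name = "Three", ID = "123" }
};

// De-duplicate by ID before going parallel: keep the first item per ID.
var uniqueItems = items
    .GroupBy(i => i.ID)
    .Select(g => g.First())
    .ToList();

Parallel.ForEach(uniqueItems, item =>
{
    Console.WriteLine($"Processing item [{item.Name}]");
    Thread.Sleep(1000); // do some work...
});
```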

Upvotes: 3
