circler
circler

Reputation: 369

Wait for next element in shared List in a C# concurrent producer / consumer pattern

I have a

List<object> listOfData;

in C# that gets data every X minute from a getDataBackgroundWorker.

I have another processDataBackgroundWorker that waits for the data from listOfData and processes it.

How can I make sure that I only get unique data from listOfData ( new data is added always ) and how can I ask the processDataBackgroundWorker to pause when there is no new data in listOfData?

Upvotes: 1

Views: 1481

Answers (1)

StuartLC
StuartLC

Reputation: 107277

List<> isn't a great choice in concurrency - there are out of the box alternatives, like ConcurrentBag, ConcurrentQueue which already have a lot of hard work done for you.

Here's an implementation of a producer-consumer pattern, using a BlockingCollection implementation as per MSDN,

  • The BlockingCollection is backed with a ConcurrentQueue, assuming that we are serially pull data in sequence on the consumer.
  • Methods which iterate over BlockingCollection block (with little overhead) until an item is available (i.e. your 'pause' is inherent - no need for a looped check with Thread.Sleeps on the consumer).
  • Termination is inherently built in, when the producer calls CompletedAdding
  • If you have more than one concurrent (competing) consumer, only one consumer will get an item, i.e. the duplication condition shouldn't be a concern (unless you mean the producer actually adds duplicates in the first place).
var queue = new BlockingCollection<string>(new ConcurrentQueue<string>());
var producer = Task.Run(() =>
{
    // Produce some random messages, with delays in between
    queue.Add("Hello!");
    Thread.Sleep(1000);
    queue.Add("World!");
    Thread.Sleep(2000);
    Enumerable.Range(0, 100)
        .ToList()
        .ForEach(x => queue.Add(x.ToString()));
    queue.CompleteAdding();
});

var consumer = Task.Run(() =>
    {
        while (!queue.IsCompleted)
        {
            try
            {
                Debug.WriteLine(queue.Take());
            }
            catch (InvalidOperationException)
            {
            }
        }
    });
Task.WaitAll(producer, consumer);

Upvotes: 4

Related Questions