Toto
Toto

Reputation: 7719

Delayed Producer Consumer pattern

I have a producer that produces integers by burst (1 to 50 in a few seconds). I have a consumer that consumes those integers by block.

I want the consumer to start consuming when the producer has finished his burst (I don't have the lead on the producer, I would just know that it has finished producing when there is nothing produced for 5 seconds).

I thought about thoses 2 differents way :

First : using kind of one consumer notfying the other :

private readonly List<int> _ids = new List<int>();
private readonly ManualResetEvent _mainWaiter = new ManualResetEvent(false);
private readonly ManualResetEvent _secondaryWaiter = new ManualResetEvent(false);

//This methods consumes the id from the producer
public void OnConsumeId(int newId)
{
    lock(_ids)
    {
        _ids.Add(newId);
        _mainWaiter.Set();
        _secondaryWaiter.Set();
    }
}

//This methods runs on the dedicated thread :
public void ConsumerIdByBlock()
{
    while(true)
    {
        _mainWaiter.Wait();
        while(_secondaryWaiter.Wait(5000));

        List<int> localIds;
        lock(_ids)
        {
            localIds = new List<int>(_ids);
            _ids.Clear();
        }
        //Do the job with localIds
    }
}

Second : have a kind of token for the last update

//This methods consumes the id from the producer
private int _lastToken;
public void OnConsumeId(int newId)
{
    lock(_ids)
    {
        _ids.Add(newId);
        ThreadPool.Queue(()=>ConsumerIdByBlock(++_lastToken));
    }
}

//This methods runs on the dedicated thread :
public void ConsumerIdByBlock(int myToken)
{       
    Thread.Sleep(5000);

    List<int> localIds;
    lock(_ids)
    {
        if(myToken !=_lastToken)
            return;     

        localIds = new List<int>(_ids);
        _ids.Clear();
    }

    //Do the job with localIds  
}

But I find these approaches a bit too complicated for doing this. Does a native/simpler solution exists ? How would you do ?

Upvotes: 5

Views: 544

Answers (4)

LVBen
LVBen

Reputation: 2061

I would use a System.Timers.Timer. Set a 5000ms interval, and every time a new id is produced, restart the Timer:

   class Consumer
   {
      List<int> _ids = new List<int>();
      Timer producer_timer = new Timer();

      public Consumer()
      {
         producer_timer.Elapsed += ProducerStopped;
         producer_timer.AutoReset = false;
      }

      public void OnConsumeId(int newId)
      {
         lock (_ids)
         {
            _ids.Add(newId);
            producer_timer.Interval = 5000;
            producer_timer.Start();
         }
      }

      public void ProducerStopped(object o, ElapsedEventArgs e)
      {
         // Do job here.
      }
   }

Upvotes: 1

Peter Ritchie
Peter Ritchie

Reputation: 35870

A queue seems the best way of handling what you've described.

thread-safe collections are, unfortunately, a bit of a misnomer. There are "blocking" collections in .NET, but the basic principle of newer classes is to not try to make instance-based classes "thread safe" (static classes is a different story. Class-level "thread safety" is a do-everything-for-everyone proposition. And the second side of that axiomatic coin is "nothing to no one". It can't possibly optimize for a particular application's needs or usages so it ends up taking the worst-case scenario and can't possibly account for all scenarios for all applications to they sometimes miss things. The miss things need to be eventually covered by another means and the interaction of those two means needs to be managed independently.

Queues are a basic reader/writer pattern that can be encapsulated with a locking primitive called reader-writer lock. For which there is a class called ReaderWriterLockSlim that can be used to ensure application-level thread-safety of the use of a queue collection.

Upvotes: 1

Jim Mischel
Jim Mischel

Reputation: 134005

This becomes a lot easier if you use a thread-safe queue that already has notification and such. The BlockingCollection makes writing producer-consumer stuff really easy.

I like your "linked consumer" idea because you don't have to modify the producer in order to use it. That is, the producer just stuffs things in a queue. How the consumer ultimately uses it is irrelevant. Using BlockingCollection, then, you'd have:

BlockingCollection<ItemType> inputQueue = new BlockingCollection<ItemType>();
BlockingCollection<List<ItemType>> intermediateQueue = new BlockingCollection<List<ItemType>>();

Your producer adds things to the input queue by calling inputQueue.Add. Your intermediate consumer (call it the consolidator) gets things from the queue by calling TryTake with a timeout. For example:

List<ItemType> items = new List<ItemType>();
while (!inputQueue.IsCompleted)
{
    ItemType t;
    while (inputQueue.TryTake(out t, TimeSpan.FromSeconds(5))
    {
        items.Add(t);
    }
    if (items.Count > 0)
    {
        // Add this list of items to the intermediate queue
        intermediateQueue.Add(items);
        items = new List<ItemType>();
    }
}

The second consumer just reads the intermediate queue:

foreach (var itemsList in intermediateQueue.GetConsumingEnumerable))
{
    // do something with the items list
}

No need for ManualResetEvent or lock or any of that; BlockingCollection handles all the messy concurrency stuff for you.

Upvotes: 7

chickenpie
chickenpie

Reputation: 133

To expand on the idea of @Chris, when you consume an id, remember what time it is. If more than 5 seconds has passed since the last one, then start a new list an set an event. Your Block Consumer simply waits on that event and consumes the stored list.

Also note that in your first solution, it is possible for ConsumerIdByBlock to exit the inner while just before OnConsumeId acquires the lock, then ConsumerIdByBlock would consume at least one Id too many.

Upvotes: 1

Related Questions