sll

Reputation: 62504

Analogue of Queue.Peek() for BlockingCollection when listening to consuming IEnumerable<T>

I'm using a Pipelines pattern implementation to decouple the message consumer from the producer and avoid the slow-consumer issue.

If any exception occurs at the message processing stage [1], the message is lost and is never dispatched to the other service/layer [2]. How can I handle this in [3] so that the message is not lost and, importantly, the order of messages is preserved, so the upper service/layer receives messages in the order they came in? I have an idea that involves another intermediate Queue, but it seems complex. Unfortunately, BlockingCollection<T> does not expose any analogue of the Queue.Peek() method, which would let me just read the next available message and call Dequeue() only after successful processing.

private BlockingCollection<IMessage> messagesQueue;    

// TPL Task does following:
// Listen to new messages and as soon as any comes in - process it
foreach (var cachedMessage in 
             messagesQueue.GetConsumingEnumerable(cancellation))
{    
    const int maxRetries = 3;
    int retriesCounter = 0;
    bool isSent = false;

    // At this point the message has already been removed from messagesQueue
    // Post-increment with '<' gives exactly maxRetries attempts
    while (!isSent && retriesCounter++ < maxRetries)
    {
        try
        {
           // [1] Preprocess a message
           // [2] Dispatch to an other service/layer    
           clientProxyCallback.SendMessage(cachedMessage);
           isSent = true;
        }                                
        catch(Exception exception)
        {
           // [3]   
           // logging
           if (!isSent && retriesCounter < maxRetries)
           {
              Thread.Sleep(NSeconds);
           }
        }            
    
        if (!isSent && retriesCounter == maxRetries)
        {
           // just log, message is lost on this stage!
        }
    }
}

EDIT: I forgot to say that this is an IIS-hosted WCF service that dispatches messages back to a Silverlight client WCF proxy via a client callback contract.

EDIT2: Below is how I would do this using Peek(). Am I missing something?

bool successfullySent = true;
try
{
   var item = queue.Peek();
   PreProcessItem(item);
   SendItem(item);       
}
catch(Exception exception)
{
   successfullySent = false;
}
finally
{
   if (successfullySent)
   {
       // just remove already sent item from the queue
       queue.Dequeue();
   }
}

EDIT3: Surely I can use the old-style approach with a while loop, a bool flag, a Queue and an AutoResetEvent, but I'm wondering whether the same is possible using BlockingCollection and GetConsumingEnumerable(). I think a facility like Peek would be very helpful together with the consuming enumerable. Without it, all the Pipeline pattern examples built on newer types like BlockingCollection and GetConsumingEnumerable() do not look durable, and I would have to go back to the old approach.

Upvotes: 12

Views: 7905

Answers (4)

Evgeniy Berezovsky

Reputation: 19218

BlockingCollection<T> is a wrapper around IProducerConsumerCollection<T>, which is more generic than e.g. ConcurrentQueue and gives the implementer the freedom of not having to implement a (Try)Peek method.

However, you can always call TryPeek on the underlying queue directly:

ConcurrentQueue<T> useOnlyForPeeking = new ConcurrentQueue<T>();
BlockingCollection<T> blockingCollection = new BlockingCollection<T>(useOnlyForPeeking);
...
useOnlyForPeeking.TryPeek(...)

Note, however, that you must not modify your queue via useOnlyForPeeking; otherwise blockingCollection will get confused and may throw InvalidOperationExceptions at you. I'd be surprised, though, if calling the non-modifying TryPeek on this concurrent data structure were an issue.
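
To make the idea concrete, here is a minimal sketch of a peek-then-remove step. It assumes a single consumer, and the names PeekingConsumer, TryProcessHead and send are hypothetical, introduced only for illustration. The item is read with TryPeek on the underlying queue but removed through the BlockingCollection<T> wrapper, so the queue itself is never modified directly.

```csharp
using System;
using System.Collections.Concurrent;

public static class PeekingConsumer
{
    // Hypothetical helper: sends the head item and removes it only on success.
    // Assumes exactly ONE consumer; with several consumers the item that is
    // peeked may not be the item that is taken.
    public static bool TryProcessHead<T>(
        ConcurrentQueue<T> useOnlyForPeeking,
        BlockingCollection<T> blockingCollection,
        Action<T> send)
    {
        if (!useOnlyForPeeking.TryPeek(out T item))
        {
            return false; // nothing queued right now
        }

        try
        {
            send(item);
        }
        catch (Exception)
        {
            return false; // the item stays at the head, so order is preserved
        }

        // Remove via the wrapper, never via the underlying queue.
        blockingCollection.Take();
        return true;
    }
}
```

Note that this sketch does not block while the queue is empty; the caller would have to poll or wait by some other means, which is part of what you give up by not using GetConsumingEnumerable().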

Upvotes: 4

maximpa

Reputation: 1988

You could use ConcurrentQueue<T> instead; it has a TryDequeue() method.

ConcurrentQueue<T>.TryDequeue(out T result) tries to remove and return the object at the beginning of the concurrent queue. It returns true if an element was successfully removed and returned.

So there is no need to Peek first.

TryDequeue() is thread safe:

ConcurrentQueue<T> handles all synchronization internally. If two threads call TryDequeue(T) at precisely the same moment, neither operation is blocked.

As far as I understand, it returns false only if the queue is empty:

If the queue was populated with code such as q.Enqueue("a"); q.Enqueue("b"); q.Enqueue("c"); and two threads concurrently try to dequeue an element, one thread will dequeue a and the other thread will dequeue b. Both calls to TryDequeue(T) will return true, because they were both able to dequeue an element. If each thread goes back to dequeue an additional element, one of the threads will dequeue c and return true, whereas the other thread will find the queue empty and will return false.

http://msdn.microsoft.com/en-us/library/dd287208%28v=vs.100%29.aspx
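
A single-threaded sketch of the same behaviour, under the assumption that nothing else touches the queue while it runs (TryDequeueDemo and DrainAll are hypothetical names): TryDequeue keeps returning true until the queue is empty, then returns false.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class TryDequeueDemo
{
    // Removes items in FIFO order until TryDequeue reports an empty queue.
    public static List<string> DrainAll(ConcurrentQueue<string> q)
    {
        var result = new List<string>();
        while (q.TryDequeue(out string item))
        {
            result.Add(item);
        }
        return result;
    }
}
```

Note that the item is already removed by the time you process it, so a failure during processing still loses it unless you store it somewhere else.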

UPDATE

Perhaps the easiest option would be to use the TaskScheduler class. With it, you can wrap all your processing tasks into the queue's items and simplify the synchronisation.

Upvotes: 0

Jodrell

Reputation: 35716

As Dennis says in his comment, BlockingCollection<T> provides a blocking wrapper to any implementor of the IProducerConsumerCollection<T> interface.

As you can see, IProducerConsumerCollection<T>, by design, does not define a Peek method, or any other members needed to implement one. This means that BlockingCollection<T> cannot, as it stands, offer an analogue to Peek.

If you think about it, this greatly reduces the concurrency problems that a Peek implementation would trade for its utility. How can you consume without consuming? To Peek concurrently, you would have to lock the head of the collection until the Peek operation completed, which I and the designers of BlockingCollection<T> view as sub-optimal. I think it would also be messy and difficult to implement, requiring some sort of disposable peek context.

If you consume a message and its consumption fails, you will have to deal with it. You could add it to a separate failures queue, re-add it to the normal processing queue for a future retry, just log the failure for posterity, or take some other action appropriate to your context.
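
As a rough sketch of the failures-queue option (DeadLetterConsumer, Consume, failures and send are hypothetical names, not from the question): the consumer keeps draining the BlockingCollection<T> and parks any item whose processing throws.

```csharp
using System;
using System.Collections.Concurrent;

public static class DeadLetterConsumer
{
    // Consumes until CompleteAdding() is called and the collection is empty.
    // Items that fail are parked in 'failures' instead of being lost.
    public static void Consume<T>(
        BlockingCollection<T> source,
        ConcurrentQueue<T> failures,
        Action<T> send)
    {
        foreach (T item in source.GetConsumingEnumerable())
        {
            try
            {
                send(item);
            }
            catch (Exception)
            {
                // Logging would go here; the item is kept for a later retry.
                failures.Enqueue(item);
            }
        }
    }
}
```

Bear in mind that later messages are still delivered while a failed one waits in the failures queue, so on its own this does not preserve the delivery order the question asks for.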

If you don't want to consume the messages concurrently, there is no need to use BlockingCollection<T>. You could use ConcurrentQueue<T> directly: you still get synchronized adds, and you can use TryPeek safely since you control the single consumer. If consumption fails, you could halt consumption with an infinite retry loop if you desire, although I suggest this requires some design thought.
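
A minimal sketch of that single-consumer approach, using a bounded number of attempts rather than an infinite loop (OrderedDrain, Drain, send and maxAttempts are hypothetical names): the head is peeked, sent, and dequeued only after the send succeeds, so a failing message blocks the ones behind it and the order is kept.

```csharp
using System;
using System.Collections.Concurrent;

public static class OrderedDrain
{
    // Sends queued items in order and returns how many were sent.
    // Stops at the first item that still fails after maxAttempts tries.
    // Assumes this is the ONLY consumer of the queue.
    public static int Drain<T>(ConcurrentQueue<T> queue, Action<T> send, int maxAttempts)
    {
        int sent = 0;
        while (queue.TryPeek(out T item))
        {
            bool ok = false;
            for (int attempt = 0; attempt < maxAttempts && !ok; attempt++)
            {
                try
                {
                    send(item);
                    ok = true;
                }
                catch (Exception)
                {
                    // Logging and a back-off delay would go here.
                }
            }

            if (!ok)
            {
                break; // the head stays queued; nothing overtakes it
            }

            queue.TryDequeue(out _); // safe: only this consumer removes items
            sent++;
        }
        return sent;
    }
}
```

Unlike GetConsumingEnumerable(), this does not block when the queue is empty, so the caller has to decide when to call Drain again, for example on a timer or after a producer signals.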

Upvotes: 4

Dennis

Reputation: 37770

You should consider an intermediate queue.

BlockingCollection<T> can't "peek" items because of its nature: there can be more than one consumer. One of them can peek an item and another one can take it, so the first one will then try to take an item that has already been taken.

Upvotes: 9
