Reputation: 321
I am building a multithreaded C# application in which multiple threads add elements to a queue and a single thread consumes them. I would like the consumer to reduce/merge the incoming elements: ideally it would take all new elements off the queue, reduce them, and then process the reduced entries. A bit like this:
while (true)
{
    Collection<Elem> elements = queue.TakeAll();
    Collection<Elem> reducedElements = Reduce(elements);
    foreach (Elem e in reducedElements)
    {
        process(e);
    }
}
But there obviously isn't any TakeAll() method. From Java experience I am used to the BlockingQueue's drainTo method which offers something like what I'm interested in.
I could implement something myself by calling TryTake until the queue is empty. But the risk is that the producing threads keep adding elements while I drain, so the loop might never terminate and the collection to reduce and process would have no finite end. I'm basically looking for a way to take everything out of the queue, leaving it empty, and get back a collection that I can work on.
Upvotes: 3
Views: 1820
Reputation: 2564
Take a look at ConcurrentQueue<T> in the System.Collections.Concurrent namespace.
This queue is designed for thread-safe operations.
You can easily add an extension method for your purpose:
public static class Extensions
{
    public static List<T> DrainTo<T>(this System.Collections.Concurrent.ConcurrentQueue<T> poConcurrentQueue)
    {
        List<T> loList = new List<T>();
        T loElement;
        // Dequeue until the queue is observed empty; items enqueued in the meantime are drained too.
        while (poConcurrentQueue.TryDequeue(out loElement))
            loList.Add(loElement);
        return loList;
    }
}
Use it like this:
System.Collections.Concurrent.ConcurrentQueue<string> loConcurrentQueue = new System.Collections.Concurrent.ConcurrentQueue<string>();
loConcurrentQueue.Enqueue("Element1");
loConcurrentQueue.Enqueue("Element2");
var loList = loConcurrentQueue.DrainTo();
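Note that the DrainTo loop runs until the queue is observed empty, so with very busy producers it has the unbounded-loop risk raised in the question. A minimal sketch of a bounded variant follows. The names BoundedDrainExtensions, DrainUpTo and DrainSnapshot are hypothetical helpers, not part of the framework, and the sketch assumes that a batch roughly the size of the queue at the time of the call is good enough.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class BoundedDrainExtensions
{
    // Hypothetical helper: removes at most maxItems elements, so the loop
    // always terminates even if producers keep enqueueing.
    public static List<T> DrainUpTo<T>(this ConcurrentQueue<T> queue, int maxItems)
    {
        var drained = new List<T>();
        T item;
        while (drained.Count < maxItems && queue.TryDequeue(out item))
        {
            drained.Add(item);
        }
        return drained;
    }

    // Hypothetical helper: uses Count as a moment-in-time snapshot and drains
    // at most that many elements. Items enqueued later stay for the next call.
    public static List<T> DrainSnapshot<T>(this ConcurrentQueue<T> queue)
    {
        return queue.DrainUpTo(queue.Count);
    }
}
```

The consumer would then call loConcurrentQueue.DrainSnapshot() once per iteration, reduce the returned list and process the result. ConcurrentQueue<T> does not block, so an empty result means the consumer has to wait or sleep before retrying. If blocking is wanted, wrapping the queue in a BlockingCollection<T> and calling Take for the first element before draining the rest with TryTake is one possible alternative.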
Upvotes: 2