Reputation: 275
I need to implement a sort of task buffer. Basic requirements are:
I was thinking of implementing it with a Queue, as shown below. I would appreciate feedback on the implementation. Are there better ways to implement such a thing?
public class TestBuffer
{
    private readonly object queueLock = new object();
    private readonly Queue<Task> queue = new Queue<Task>();
    private bool running = false;

    public TestBuffer()
    {
    }

    public void start()
    {
        // Set the flag before the worker starts, so a stop() issued right
        // after start() cannot be overwritten by the worker thread
        lock (queueLock)
        {
            running = true;
        }
        Thread t = new Thread(new ThreadStart(run));
        t.Start();
    }

    private void run()
    {
        bool run = true;
        while (run)
        {
            Task task = null;
            // Lock queue before doing anything
            lock (queueLock)
            {
                // While the queue is empty and we are still running,
                // wait until we're told something changed
                // (a loop re-checks the condition after every wake-up)
                while (queue.Count == 0 && running)
                {
                    Monitor.Wait(queueLock);
                }
                // Check there is something in the queue
                // Note - the queue may still be empty here if we were woken up by stop()
                if (queue.Count > 0)
                {
                    task = queue.Dequeue();
                }
            }
            // If something was dequeued, handle it outside the lock
            if (task != null)
            {
                handle(task);
            }
            // Lock the queue again and check whether we need to run again
            // Note - make sure we drain the queue even if we are told to stop before it is empty
            lock (queueLock)
            {
                run = queue.Count > 0 || running;
            }
        }
    }

    public void enqueue(Task toEnqueue)
    {
        lock (queueLock)
        {
            queue.Enqueue(toEnqueue);
            Monitor.PulseAll(queueLock);
        }
    }

    public void stop()
    {
        lock (queueLock)
        {
            running = false;
            Monitor.PulseAll(queueLock);
        }
    }

    public void handle(Task dequeued)
    {
        dequeued.execute();
    }
}
Upvotes: 13
Views: 23752
Reputation: 3717
Take a look at my lightweight implementation of a thread-safe FIFO queue. It is a non-blocking synchronization tool that runs work on the thread pool, which in most cases is better than creating your own threads or using blocking synchronization primitives such as locks and mutexes. https://github.com/Gentlee/SerialQueue
Usage:
var queue = new SerialQueue();
var result = await queue.Enqueue(() => /* code to synchronize */);
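To illustrate the general idea of running queued work one item at a time on the thread pool, here is a minimal sketch that chains each action onto the previous one with Task.ContinueWith. It is not the linked library's implementation: the class name ChainedSerialQueue is hypothetical, and unlike the library described above it takes a short lock to update the chain.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical illustration only; not the SerialQueue library's code.
public sealed class ChainedSerialQueue
{
    private readonly object gate = new object();

    // The most recently enqueued piece of work; new work is chained after it.
    private Task tail = Task.CompletedTask;

    // Schedules the action to run on the thread pool after all previously
    // enqueued actions have finished. The lock is held only long enough
    // to append to the chain, never while an action runs.
    public Task Enqueue(Action action)
    {
        lock (gate)
        {
            tail = tail.ContinueWith(_ => action(), TaskScheduler.Default);
            return tail;
        }
    }
}
```

Because every action is a continuation of the previous one, actions run in the order in which Enqueue was called, and never concurrently with each other.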
Upvotes: 1
Reputation: 1005
Consider ConcurrentQueue, which is a thread-safe FIFO collection. If it is not suitable, try one of its relatives in Thread-Safe Collections. Using these lets you avoid some of the risks of hand-written locking.
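As a rough sketch of what that could look like, the example below has several producers enqueue into a ConcurrentQueue and then drains it from a single thread. The class and method names are made up for illustration. Note that TryDequeue does not block, so a consumer that must wait for new items needs some additional signalling.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical demo class, not part of any library.
public static class ConcurrentQueueSketch
{
    public static List<int> ProduceThenDrain(int producers, int itemsPerProducer)
    {
        var queue = new ConcurrentQueue<int>();

        // Producers run in parallel; Enqueue is thread-safe without an explicit lock.
        Parallel.For(0, producers, p =>
        {
            for (int i = 0; i < itemsPerProducer; i++)
            {
                queue.Enqueue(p * itemsPerProducer + i);
            }
        });

        // TryDequeue never blocks: it returns false when the queue is empty.
        var drained = new List<int>();
        while (queue.TryDequeue(out int item))
        {
            drained.Add(item);
        }
        return drained;
    }
}
```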
Upvotes: 7
Reputation: 4450
You could use Rx on .NET 3.5 for this. It might never have come out of RC, but I believe it is stable* and in use by many production systems. If you don't need Subject, you might find primitives (like concurrent collections) that you can use on .NET 3.5, even though they didn't ship with the .NET Framework until 4.0.
Alternative to Rx (Reactive Extensions) for .net 3.5
* - Nitpicker's corner: Except for maybe advanced time windowing, which is out of scope, but buffers (by count and time), ordering, and schedulers are all stable.
Upvotes: 0
Reputation: 120480
I suggest you take a look at TPL Dataflow. BufferBlock is what you're looking for, but it offers so much more.
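A minimal sketch of a BufferBlock with a single consumer might look like the following. It assumes the System.Threading.Tasks.Dataflow assembly or package is available; the class and method names are hypothetical. Calling Complete plays the role of the stop signal: the consumer keeps going until the buffer has been drained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of TPL Dataflow.
public static class BufferBlockSketch
{
    public static async Task<List<int>> RunAsync(int itemCount)
    {
        var buffer = new BufferBlock<int>();
        var processed = new List<int>();

        // Single consumer: OutputAvailableAsync returns false only once the
        // block has been completed and every buffered item has been taken.
        Task consumer = Task.Run(async () =>
        {
            while (await buffer.OutputAvailableAsync())
            {
                processed.Add(await buffer.ReceiveAsync());
            }
        });

        // Post may be called from any number of producer threads.
        for (int i = 0; i < itemCount; i++)
        {
            buffer.Post(i);
        }

        // Stop signal: no new items are accepted, buffered ones are still delivered.
        buffer.Complete();
        await consumer;
        return processed;
    }
}
```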
Upvotes: 0
Reputation: 150118
You can actually handle this with the out-of-the-box BlockingCollection.
It is designed to have 1 or more producers, and 1 or more consumers. In your case, you would have multiple producers and one consumer.
When you receive a stop signal, have that signal handler call CompleteAdding on the BlockingCollection instance.
The consumer thread will continue to run until all queued items are removed and processed, then it will encounter the condition that the BlockingCollection is complete. When the thread encounters that condition, it just exits.
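The flow described above can be sketched roughly as follows, with several producer threads, one consumer thread, and CompleteAdding acting as the stop signal. The class name, method name, and item format are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo class, not part of the framework.
public static class BlockingCollectionSketch
{
    public static List<string> Run(int producers, int itemsPerProducer)
    {
        var processed = new List<string>();
        using (var buffer = new BlockingCollection<string>())
        {
            // Single consumer: GetConsumingEnumerable blocks while the collection
            // is empty and ends once it is marked complete and fully drained.
            var consumer = new Thread(() =>
            {
                foreach (string item in buffer.GetConsumingEnumerable())
                {
                    processed.Add(item);
                }
            });
            consumer.Start();

            // Multiple producers add items concurrently.
            var producerThreads = new List<Thread>();
            for (int p = 0; p < producers; p++)
            {
                int id = p; // copy the loop variable for the closure
                var producer = new Thread(() =>
                {
                    for (int i = 0; i < itemsPerProducer; i++)
                    {
                        buffer.Add("p" + id + "-" + i);
                    }
                });
                producerThreads.Add(producer);
                producer.Start();
            }
            foreach (Thread producer in producerThreads)
            {
                producer.Join();
            }

            // Stop signal: no more items will be added; queued items are still consumed.
            buffer.CompleteAdding();
            consumer.Join();
        }
        return processed;
    }
}
```

The default BlockingCollection is backed by a ConcurrentQueue, so items added by any one producer are consumed in the order that producer added them.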
Upvotes: 9