Reputation: 1765
maybe this functionality is buried somewhere in the .NET-Framework already but I couldn't find it. I need to execute methods in a given order, one after one. The methods should return something (e.g. object), so there is a way to react to the returned data (e.g. cancel the execution of the following methods because an error occurred). The execution of the methods should run in its own thread and I should be able to add methods to the queue at any time. Any idea how to implement this in c#?
Thanks to Jon's Comment I tried to implement such a Queue on me own. This might be completly wrong - so comments are VERY welcome ;-)
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace FunctionQueue
{
public class ErrorResult
{
public string ErrorMessage { get; set; }
public bool CancelQueue { get; set; }
}
public class FunctionQueue
{
private readonly ConcurrentQueue<Func<object>> Queue;
private bool isExecuting;
private bool isCanceled;
private readonly Action<ErrorResult> onError;
private readonly Action onComplete;
public FunctionQueue(Action<ErrorResult> onError = null, Action onComplete = null)
{
this.Queue = new ConcurrentQueue<Func<object>>();
this.onError = onError;
this.onComplete = onComplete;
}
public void AddFunctionToQueue( Func<object> functionToAdd, bool startQueue = false )
{
this.Queue.Enqueue(functionToAdd);
if (startQueue && !this.isExecuting) this.ProcessQueue();
}
public void ProcessQueue()
{
if( this.Queue.Count > 0 )
{
Task.Run( () =>
{
this.isCanceled = false;
this.isExecuting = true;
Func<object> functionToExecuteNext;
while( !isCanceled && this.Queue.TryDequeue( out functionToExecuteNext ) )
{
object result = functionToExecuteNext();
ErrorResult errorResult = result as ErrorResult;
if( errorResult != null )
{
if( this.onError != null ) this.onError( errorResult );
if( errorResult.CancelQueue ) this.isCanceled = true;
}
}
} );
}
this.isExecuting = false;
if( this.onComplete != null ) this.onComplete();
}
}
}
I would like to add another Feature but unfortunately I have no idea how to implement this: I would like to add an optional callback to every added function. That callback should be called when this given function completed. How can I add this feature?
Upvotes: 0
Views: 354
Reputation: 134005
You shouldn't be using ConcurrentQueue<T>
. Use BlockingCollection<T>
, which gives a much nicer wrapper that you can work with.
You can then process the queue with:
public void ProcessQueue()
{
Func<object> functionToExecuteNext;
while (!IsCancelled && queue.TryTake(out functionToExecuteNext, Timeout.Infinite))
{
// execute the function
}
}
And in your program's initialization:
queue = new BlockingCollection<Func<object>>();
var t = new Task(ProcessQueue, TaskCreationOptions.LongRunning);
t.Start();
// program does stuff
// time to shut down
queue.CompleteAdding(); // mark the queue as complete for adding
// at this point you'll want to wait until the queue is empty.
// unless you want to quit immediately. Then just set the IsCancelled flag.
Here, TryTake
does a non-busy wait on the queue. It will wait until an item is available, or until the queue is marked as complete. The key is that it doesn't consume CPU resources while waiting.
When the producer is done adding things to the queue, it calls queue.CompleteAdding
, which will make TryTake
exit with false
when the queue is empty.
So you can just start the queue processing thread at the start of your program, and it will process items as they come in. There's no need for the isExecuting
flag, or for the AddFunctionToQueue
method to check the flag and start the thread.
I used the Task
constructor to create the task with the LongRunning
option so that the scheduler can do a better job of scheduling other work.
Upvotes: 0
Reputation: 1500835
Sounds like you could just use a producer/consumer queue of Func<object>
.
Assuming you're using .NET 4, you should use a BlockingCollection<T>
wrapper around an appropriate IProducerConsumerCollection<T>
(e.g. ConcurrentQueue<T>
). These types are designed to help make producer/consumer situations easy.
You should also look at Dataflow which provides some higher-level constructs on top of this, if you need to build a pipeline for example.
Upvotes: 4