Eoin Campbell

Reputation: 44308

How can I create a constant Processing "Flow" using the TPL in C# 4

I'm not sure if the following is possible, but I'd like to invoke a number of actions in parallel, in a throttled manner, while keeping the flow of processing continuous, without reverting to timers or loop/sleep cycles.

So far I've got it working so that it loads a large batch of inputs from some source, processes them in parallel in a controlled way, and then loops around, like below.

static void Main(string[] args)
{
    while(true) //Simulate a Timer Elapsing...
    {
        IEnumerable<int> inputs = new List<int>() {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};  
        //Simulate querying database queue tables for next batch of entries

        RunAllActions(inputs, 3); //Max 3 at a time.
    }
}

static void RunAllActions(IEnumerable<int> inputs, int maxConcurrency)
{
    var options = new ParallelOptions() {MaxDegreeOfParallelism = maxConcurrency};

    Parallel.ForEach<int>(inputs, options, DoWork);
    //Blocks here until all inputs are processed.
    Console.WriteLine("Batch of Work Done!!!");
}

static void DoWork(int input)
{
    Console.WriteLine("Starting Task {0}", input);
    System.Threading.Thread.Sleep(3000);
    Console.WriteLine("Finishing Task {0}", input);
}

What I'd like to know is: is there a construct in the TPL that I could use to keep it always running, so that I can replace the "Timer Elapsing" and "Database Polling" with a MessageQueue Received event?

The following is a rough version of what I'd like to achieve. There are other ways I could go about it, but I want to know whether this sort of pattern is built into the TPL.

internal class Engine
{
    private MessageQueue mq;
    private Queue<int> myInternalApplicationQueue;

    public Engine()
    {
        //Message Queue to get new task inputs from
        mq = new MessageQueue();
        mq.ReceiveCompleted += new ReceiveCompletedEventHandler(mq_ReceiveCompleted);

        // internal Queue to put them in.
        myInternalApplicationQueue = new Queue<int>();
    }

    void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
    {
        //On MQ Receive, pop the input in a queue in my app
        int input = (int) e.Message.Body;

        myInternalApplicationQueue.Enqueue(input);
    }

    public void StartWorking()
    {
        //Once this gets called, it doesn't stop... it just keeps processing/watching that queue
        //processing the tasks as fast as it's allowed while the app is running.
        var options = new ParallelOptions() { MaxDegreeOfParallelism = 3 };
        Parallel.KeepWorkingOnQueue<int>(myInternalApplicationQueue, options, DoWork);
        //       ^^^^^^^^^^^^^^^^^^ <----- THIS GUY
    }

}

Upvotes: 5

Views: 2501

Answers (2)

Drew Marsh

Reputation: 33379

As Reed points out, BlockingCollection is a good "manual" way to go here. The downside is that you also have to manage the consumers yourself.

Another approach that takes a lot of the coordination work out of your hands in scenarios like this is TPL Dataflow. Specifically, you can just use an ActionBlock<T>: when a message comes in from the queue, you Post the new piece of data to the ActionBlock<T>, and it automatically processes it using TPL worker threads under the covers. That would make your Engine class look a little something like this:

ActionBlock<int> myActionBlock = new ActionBlock<int>(this.ProcessWorkItem);

void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)      
{      
    int input = (int)e.Message.Body;

    // Post the data to the action block
    this.myActionBlock.Post(input);
}

private void ProcessWorkItem(int workItemData)
{
    // ActionBlock will hand each work item to this method for processing
}

Now, as far as controlling parallelism and capacity goes, you can easily control these specifics of the ActionBlock<T> by passing an ExecutionDataflowBlockOptions when constructing it. So let's say I want to make sure that I never have a degree of parallelism greater than four, and that no more than one hundred items can be queued up at once. I would just do:

ActionBlock<int> myActionBlock = new ActionBlock<int>(
                                     this.ProcessWorkItem, 
                                     new ExecutionDataflowBlockOptions
                                     {
                                         MaxDegreeOfParallelism = 4,
                                         BoundedCapacity = 100
                                     });
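One detail worth noting about that combination: once the block is at its BoundedCapacity, Post returns false immediately rather than blocking the caller. If the producer should actually wait for space, the SendAsync extension method is the usual alternative; it returns a Task<bool> that completes when the block accepts (or declines) the item. A minimal sketch of the receive handler rewritten that way, reusing the myActionBlock field from above:

void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
    int input = (int)e.Message.Body;

    // With BoundedCapacity set, Post() returns false while the block is
    // full. SendAsync() instead returns a Task<bool>, so blocking on its
    // Result makes the producer wait until the item is accepted.
    bool accepted = this.myActionBlock.SendAsync(input).Result;
}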

Upvotes: 2

Reed Copsey

Reputation: 564691

You can use BlockingCollection<T> to handle this type of operation, which is effectively a producer/consumer scenario.

Basically, you'd set up a BlockingCollection<T> and use it as your "producer". You would then have three (or any number of) consumer tasks, which are often set up as long-running tasks, that process elements by iterating over blockingCollection.GetConsumingEnumerable() in a standard foreach loop.

You then add items as needed to the collection, and they will continually be processed. When you are completely done, you'd call BlockingCollection<T>.CompleteAdding, which will cause the foreach loops to complete, and the entire thing stops.
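A minimal sketch of that pattern (the capacity of 100, the three consumers, and the ProcessItem method are illustrative placeholders):

var queue = new BlockingCollection<int>(boundedCapacity: 100);

// Start three long-running consumer tasks, each draining the collection
// sequentially via GetConsumingEnumerable().
var consumers = Enumerable.Range(0, 3)
    .Select(i => Task.Factory.StartNew(() =>
    {
        foreach (int item in queue.GetConsumingEnumerable())
        {
            ProcessItem(item); // your DoWork equivalent
        }
    }, TaskCreationOptions.LongRunning))
    .ToArray();

// Producer side (e.g. from the MessageQueue ReceiveCompleted handler).
// Add blocks if the collection is at its bounded capacity.
queue.Add(42);

// When no more items will ever arrive, signal completion; the foreach
// loops finish once the collection drains, and the consumer tasks end.
queue.CompleteAdding();
Task.WaitAll(consumers);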

As a side note: you typically do not want to use Parallel.ForEach on the GetConsumingEnumerable() from a BlockingCollection<T>, at least not unless you handle the partitioning yourself. It's typically better to use multiple tasks and have each iterate sequentially, as in the sketch above. The reason is that the default partitioning scheme in Parallel.ForEach causes problems here: it waits until a "chunk" of data is available instead of processing items immediately, and the chunks get larger and larger over time.

Upvotes: 6
