Ryan

Reputation: 81

TPL Dataflow Consumer to Process Multiple Items at a time

I have a requirement to iterate through a large list and, for each item, call a web service (WS) to get some data. However, I want to throttle the requests so that no more than 5 are executing concurrently at any one time. All calls to the WS are made using async/await. I am using a TPL Dataflow BufferBlock with a BoundedCapacity of 5. Everything works, but I notice that the consumer, which awaits the WS call, holds up the queue until the call has finished, so all the requests in the BufferBlock are performed serially. Is it possible to have the consumer always processing 5 items off the queue at a time? Or do I need to set up multiple consumers, or start looking into ActionBlock?

In a nutshell, I want to seed the queue with 5 items. As one item finishes processing, a sixth takes its place, and so on, so that I always have 5 concurrent requests in flight until there are no more items to process.

I used this as my guide: Async Producer/Consumer Queue using Dataflow

Thanks for any help. Below is a simplified version of the code.

//set up
BufferBlock<CustomObject> queue = new BufferBlock<CustomObject>(new DataflowBlockOptions { BoundedCapacity = 5 });
Task producer = QueueValues(queue, values);
Task<int> consumer = ConsumeValues(queue);
await Task.WhenAll(producer, consumer, queue.Completion);
int counter = await consumer;

//producer
async Task QueueValues(BufferBlock<CustomObject> queue, IList<CustomObject> values)
{
    foreach (CustomObject value in values)
    {
        // SendAsync waits while the queue already holds 5 items
        await queue.SendAsync(value);
    }
    queue.Complete();
}


//consumer: a single loop that awaits each call before receiving the next item,
//so the web service calls run one at a time
async Task<int> ConsumeValues(BufferBlock<CustomObject> queue)
{
    int counter = 0; // assumed: counter is the number of processed items
    while (await queue.OutputAvailableAsync())
    {
        CustomObject value = await queue.ReceiveAsync();
        await CallWebServiceAsync(value);
        counter++;
    }
    return counter;
}

Upvotes: 1

Views: 2074
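For the "multiple consumers" option asked about above, one way to keep the BufferBlock is to start 5 copies of the consumer loop against the same block. This is a minimal sketch under assumptions, not a recommended final design: ints stand in for CustomObject, CallWebServiceAsync is a hypothetical stand-in that simulates latency with Task.Delay and records the peak number of overlapping calls, and the program uses C# 9+ top-level statements (depending on the target framework, TPL Dataflow may need the System.Threading.Tasks.Dataflow NuGet package). With several consumers, TryReceive is used instead of ReceiveAsync, because a competing consumer can take the item between OutputAvailableAsync and the receive, and ReceiveAsync can then fail with InvalidOperationException once the block completes.

```csharp
// Sketch: five competing consumers draining one bounded BufferBlock.
// Assumptions: ints stand in for CustomObject; CallWebServiceAsync is a
// hypothetical stand-in that simulates latency with Task.Delay.
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

const int ConsumerCount = 5;
var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
object gate = new object();
int inFlight = 0, maxInFlight = 0, processed = 0;

// Hypothetical web service call; tracks how many calls overlap.
async Task CallWebServiceAsync(int value)
{
    lock (gate) { inFlight++; maxInFlight = Math.Max(maxInFlight, inFlight); }
    await Task.Delay(50); // simulated network latency
    lock (gate) { inFlight--; processed++; }
}

async Task QueueValuesAsync(ITargetBlock<int> target, int count)
{
    for (int i = 0; i < count; i++)
    {
        await target.SendAsync(i); // waits while the bounded buffer is full
    }
    target.Complete();
}

async Task ConsumeValuesAsync(BufferBlock<int> source)
{
    // OutputAvailableAsync returns false once the block is completed and empty.
    while (await source.OutputAvailableAsync())
    {
        // TryReceive rather than ReceiveAsync: a competing consumer may already
        // have taken the item this consumer was woken for.
        while (source.TryReceive(out int value))
        {
            await CallWebServiceAsync(value);
        }
    }
}

Task producer = QueueValuesAsync(queue, 40);
Task[] consumers = Enumerable.Range(0, ConsumerCount)
    .Select(_ => ConsumeValuesAsync(queue))
    .ToArray();
await Task.WhenAll(consumers);
await producer;
Console.WriteLine($"processed={processed}, max concurrent calls={maxInFlight}");
```

This works, but it hand-builds what an ActionBlock with MaxDegreeOfParallelism already provides, which is what both answers point toward.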

Answers (2)

i3arnon

Reputation: 116548

You should be using an ActionBlock with MaxDegreeOfParallelism set to 5. You may also want to set a BoundedCapacity, but that throttles the producer, not the consumer:

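var block = new ActionBlock<CustomObject>(
    item => CallWebServiceAsync(item), // Task-returning delegate: each call holds a slot until its Task completes
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5, // at most 5 web service calls in flight
        BoundedCapacity = 1000      // SendAsync waits once 1000 items are queued or running
    });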

foreach (CustomObject value in values)
{
    await block.SendAsync(value);
}
block.Complete();
await block.Completion;
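To see the throttling in action, here is a self-contained variant of the code above that can run as a console program (C# 9+ top-level statements; depending on the target framework, TPL Dataflow may need the System.Threading.Tasks.Dataflow NuGet package). It is a sketch under assumptions: ints replace CustomObject, CallWebServiceAsync is a hypothetical stand-in that waits with Task.Delay and records the peak number of overlapping calls, and BoundedCapacity is lowered to 10 so the producer actually has to wait. The reported peak should never exceed 5.

```csharp
// Sketch: the ActionBlock approach with a simulated web service.
// Assumptions: ints stand in for CustomObject; CallWebServiceAsync is hypothetical.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

object gate = new object();
int inFlight = 0, maxInFlight = 0, processed = 0;

// Hypothetical web service call; records the peak number of overlapping calls.
async Task CallWebServiceAsync(int item)
{
    lock (gate) { inFlight++; maxInFlight = Math.Max(maxInFlight, inFlight); }
    await Task.Delay(50); // simulated network latency
    lock (gate) { inFlight--; processed++; }
}

var block = new ActionBlock<int>(
    item => CallWebServiceAsync(item), // Task-returning delegate: a slot stays busy until the Task completes
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5, // at most 5 calls in flight
        BoundedCapacity = 10        // SendAsync waits once 10 items are queued or running
    });

for (int i = 0; i < 40; i++)
{
    await block.SendAsync(i);
}
block.Complete();
await block.Completion;
Console.WriteLine($"processed={processed}, max concurrent calls={maxInFlight}");
```

BoundedCapacity counts items that are currently being processed as well as queued ones, so setting it below MaxDegreeOfParallelism would cap the effective parallelism at the capacity.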

Upvotes: 2

usr

Reputation: 171178

Your use of TPL Dataflow is rather strange. Normally, you'd move the consumption and processing into the flow itself: append a TransformBlock that calls the web service, and delete ConsumeValues.

ConsumeValues executes sequentially, which is fundamentally not what you want.

Instead of BoundedCapacity, I think you want MaxDegreeOfParallelism.
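A minimal sketch of that pipeline, assuming the web service returns some data per item (modeled by a hypothetical CallWebServiceAsync that returns a string after a simulated delay) and using ints in place of CustomObject. The TransformBlock makes the calls with MaxDegreeOfParallelism = 5, and an ActionBlock linked to it collects the results; PropagateCompletion lets completing the first block complete the second once everything has drained. It uses C# 9+ top-level statements, and depending on the target framework, TPL Dataflow may need the System.Threading.Tasks.Dataflow NuGet package.

```csharp
// Sketch: TransformBlock (web service calls) linked to an ActionBlock (result sink).
// Assumptions: ints stand in for CustomObject; CallWebServiceAsync is hypothetical.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

object gate = new object();
int inFlight = 0, maxInFlight = 0;
var results = new List<string>();

// Hypothetical web service call that returns some data for an item.
async Task<string> CallWebServiceAsync(int item)
{
    lock (gate) { inFlight++; maxInFlight = Math.Max(maxInFlight, inFlight); }
    await Task.Delay(50); // simulated network latency
    lock (gate) { inFlight--; }
    return $"result-{item}";
}

// Up to 5 calls run concurrently; results are emitted in input order by default.
var fetch = new TransformBlock<int, string>(
    item => CallWebServiceAsync(item),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, BoundedCapacity = 10 });

// Default MaxDegreeOfParallelism is 1, so results.Add is never called concurrently.
var store = new ActionBlock<string>(r => results.Add(r));

// PropagateCompletion: completing fetch completes store once fetch has drained.
fetch.LinkTo(store, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 40; i++)
{
    await fetch.SendAsync(i);
}
fetch.Complete();
await store.Completion;
Console.WriteLine($"received {results.Count} results, max concurrent calls={maxInFlight}");
```

Even though up to 5 calls overlap, the TransformBlock hands results on in input order by default; setting EnsureOrdered = false on its options gives up that ordering so a slow call does not hold back results that finished after it.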

Upvotes: 2
