hakanviv
hakanviv

Reputation: 253

Throttling Message Queue Consumption While Using Parallellism

I was consuming messages from a message queue and processing them in parallel using Task.Run(). But I want to throttle the speed of consumption to a certain maximum number of threads and not consume from the message queue until the thread count drops below that.

Let's say I want max 100 threads. In that case, when a 100 threads is reached, it should stop consuming from the message queue. When a message processing task is completed and the number of threads drops to 99, it should consume one more message from the queue.

I tried to use TransformBlock for this purpose and here is a sample code for demonstration purposes:

public partial class MainWindow : Window
    {
        object syncObj = new object();
        int i = 0;
        public MainWindow()
        {
            InitializeComponent();
        }


        private async Task<bool> ProcessMessage(string message)
        {
            await Task.Delay(5000);

            lock (syncObj)
            {
                i++;
                System.Diagnostics.Debug.WriteLine(i);
            }
            return true;
        }

        private async void Button_Click(object sender, RoutedEventArgs e)
        {
            var processor = new TransformBlock<string, bool>(
                    (str) => ProcessMessage(str),
                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 }
                    );

            for(int i = 0; i < 1000; i++)
            {
                await processor.SendAsync("a");
            }


    }
}

Throttling the number of parallel tasks works as expected but all messages are sent to the TransformBlock at once, so SendAsync loop ends before the tasks are processed.

I want it to continue accepting messages as long as the number of threads is below the max. allowed parallelism but wait when 100 is reached.

Is there a way to do this using TransformBlock or should I resort to some other method?

Upvotes: 0

Views: 575

Answers (1)

Stephen Cleary
Stephen Cleary

Reputation: 456322

Dataflow blocks have input buffers. This input buffer acts as a queue.

If you want to keep the messages in your own queue, you can do something close to what you want by restricting the number of items the dataflow block is willing to receive:

var processor = new TransformBlock<string, bool>(
    (str) => ProcessMessage(str),
    new ExecutionDataflowBlockOptions
    {
      BoundedCapacity = 100,
      MaxDegreeOfParallelism = 100,
    }
);

Note that BoundedCapacity includes the items being processed by the block. Since BoundedCapacity == MaxDegreeOfParallelism, this essentially turns off the dataflow block's queue.

so SendAsync loop ends before the tasks are processed.

It would still end when there are (up to) 100 tasks to be processed. If you want to wait until all items have finished processing, then call Complete() and await Completed.

Upvotes: 1

Related Questions