bommina
bommina

Reputation: 317

Channels: Is it possible to broadcast/receive same message by multiple consumers from single producer?

Is it possible to receive same message by multiple consumers. I have a single producer which produces Tick data(stock market) from web sockets. I have single consumer now receives 1000 messages per second, it works great. But now I would like to have multiple consumers to receive same message using System.Threading.Channels. Complete working code for single producer/consumer.

class ConsumerOne
{
    private readonly Channel<DummyData> _tickQueue;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly string _tag;

    public ConsumerOne(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource, string tag)
    {
        _tickQueue = tickQueue;
        _cancellationTokenSource = cancellationTokenSource;
        _tag = tag;
    }

    public async Task StartConsuming()
    {
        await foreach (var message in _tickQueue.Reader.ReadAllAsync(
                           cancellationToken: _cancellationTokenSource.Token))
        {
            // Business logic of One
            Console.WriteLine($"from consumer {_tag} ==> {message.Price}");
        }
    }
}

public class DummyData
{
    public long Ticks { get; set; }
    public DateTime DateTime { get; set; }
    public decimal Price { get; set; }
}

class Producer
{
    private readonly Channel<DummyData> _tickQueue;
    private readonly CancellationTokenSource _cancellationTokenSource;

    public Producer(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource)
    {
        _tickQueue = tickQueue;
        _cancellationTokenSource = cancellationTokenSource;
    }

    public async Task StartProducing()
    {
        Random r = new Random();
        for (int i = 0; i < 10; i++)
        {

            await _tickQueue.Writer.WriteAsync(new DummyData()
            {
                DateTime = DateTime.Now,
                Ticks = DateTime.Now.Ticks,
                Price = Convert.ToDecimal(r.NextDouble() * r.Next(100, 105))
            }, _cancellationTokenSource.Token);
            await Task.Delay(r.Next(50, 500));
        }
    }
}

internal class MultipleConsumersEg
{
    private static Channel<DummyData> tickQueue;
    private static readonly CancellationTokenSource TickQueueCancellationTokenSource = new CancellationTokenSource();
    public static async Task Main(string[] args)
    {
        tickQueue = Channel.CreateUnbounded<DummyData>();

        Producer p = new Producer(tickQueue, TickQueueCancellationTokenSource);
        ConsumerOne consumerOne = new ConsumerOne(tickQueue, TickQueueCancellationTokenSource, "ONE");
         consumerOne.StartConsuming();

         p.StartProducing();

        Console.ReadLine();
    }
}

Above code snippets works for single producer/consumer, fiddle link. Now I would like to have another Consumer for different strategy (each consumer for one strategy).

class ConsumerTwo
{
    private readonly Channel<DummyData> _tickQueue;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly string _tag;

    public ConsumerTwo(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource, string tag)
    {
        _tickQueue = tickQueue;
        _cancellationTokenSource = cancellationTokenSource;
        _tag = tag;
    }

    public async Task StartConsuming()
    {
        await foreach (var message in _tickQueue.Reader.ReadAllAsync(
                           cancellationToken: _cancellationTokenSource.Token))
        {
            // Business logic of Two
            Console.WriteLine($"from consumer {_tag} ==> {message.Price}");
        }
    }
}

    public static async Task Main(string[] args)
    {
        tickQueue = Channel.CreateUnbounded<DummyData>();

        Producer p = new Producer(tickQueue, TickQueueCancellationTokenSource);
        ConsumerOne consumerOne = new ConsumerOne(tickQueue, TickQueueCancellationTokenSource, "ONE");
         consumerOne.StartConsuming();

        ConsumerTwo consumerTwo = new ConsumerTwo(tickQueue, TickQueueCancellationTokenSource, "TWO");
         consumerTwo.StartConsuming();

         p.StartProducing();

        Console.ReadLine();
    }

After adding 2nd consumer it consumes data, but same data can't seen by two consumers. Here I want all consumers to receive all 10 messages. Considering at max I may have 50 consumers in future, all should receive same message.

Output:

    from consumer TWO ==> 7.27597006121753
    from consumer TWO ==> 30.4838315240171
    from consumer TWO ==> 31.3675707908867
    from consumer TWO ==> 53.2673930636206
    from consumer ONE ==> 74.6396192795487
    from consumer TWO ==> 24.2795471970634
    from consumer ONE ==> 88.6467375550418
    from consumer ONE ==> 26.3311568478758
    from consumer TWO ==> 20.8731819843862
    from consumer ONE ==> 0.85598795659704

All messages should receive both the consumers.

Upvotes: 4

Views: 2779

Answers (1)

Panagiotis Kanavos
Panagiotis Kanavos

Reputation: 131423

Channels are a low level asynchronous publisher/subscriber, order and operation preserving queue. They provide the low level communication/queueing functionality needed to create Communicting Sequential Process pipelines. Other libraries seem more elegant simply because they added their own code on top of their own pub/sub queues.

The name is significant. Channels implement the communication "channel" between processes/workers or publishers/subscribers if you prefer. They're meant to be the input and output of every pipeline function or object, not just an internal collection. When used this way, it's easy to implement some very complex behavior. The channels themselves are typically owned by the workers, they aren't some kind of global program state.

Your question isn't strictly asking for broadcasting or multicasting. Writing a broadcast function though, is pretty simple. It could be something as simple (or simplistic) as:

public static async Task CopyTo<T>(this ChannelReader<T> input,
        IList<ChannelWriter<T>> outputs, 
        CancellationToken token=default)
{
    await foreach(var msg in input.ReadAllAync(token).ConfigureAwait(false))
    {
        foreach(var o in outputs)
        {
            await o.WriteAsync(msg);
        }
    }
    foreach(var o in outputs)
    {
        o.TryComplete();
    }
}

This copies the same message everywhere. It won't block as long as the output channels are unbounded or at least with large enough capacities to avoid filling up.

It's also easy to create a RouteTo method that would route messages by tag, eg

public static async Task RouteTo<T>(this ChannelReader<T> input,
        IDictionary<string,ChannelWriter<T>> outputs, 
        Func<T,string> selector, 
        CancellationToken token=default)
{
    await foreach(var msg in input.ReadAllAync(token).ConfigureAwait(false))
    {
        var key=selector(msg);
        if (outputs.TryGetValue(key, out var o)
        {
            await o.WriteAsync(msg);
        }
    }
    foreach(var o in outputs.Values)
    {
        o.TryComplete();
    }
}

Error handling, cancellation and awaiting would have to be adjusted according to the applications requirements. For example, the loop means that if one channel is a bounded channel at capacity, other channels would have to wait. This can be avoided if all write tasks are collected and awaited with Task.WhenAll

await Task.WhenAll(outputs.Select(o=>o.WriteAsync(msg)));

Let's say the producer is a FIX listener class, each message it receives should be published through a ChannelReader<> output property:

public class FixProducer
{
   Channel<DummyData> _channel;

   public ChannelReader<DummyData> Output=>_channel.Reader;

   SomeFIXEngine _engine;
   public FixPublisher(SomeFIXEngine engine)
   {
       _engine=engine;
       _channel=Channel.CreateUnbounded<DummyData>();
   }

   public async Task StartProducing(CancellationToken token=default)
   {
       var writer=_channel.Writer;
       for (...)
       {
           if(token.IsCancellationRequested)
           {
               break;
           }
           var data=_engine.GetSomeData();
           await _writer.WriteAsync(data);
       }
       writer.Complete();
   }
}

The consumers can receive their input through their own ChannelWriter Input properties:

interface IConsumer<T>
{
    ChannelWriter<T> Input {get;}
}

class ConsumerOne:IConsumer<DummyData>
{
    private readonly Channel<DummyData> _input;
    
    public ChannelWriter<DummyData> Input=>_input.Writer;

    public ConsumerOne(...)
    {
        _input=Channel.CreateUnbounderd<DummyData>();        
    }

    public async Task StartConsuming(CancellationToken token=default)
    {
        await foreach (var message in _input.Reader.ReadAllAsync(token).ConfigureAwait(false))
        {
            ...
        }
    }
}

CopyTo can now be used to copy FIX messages to all consumers:

var producer=new FixProducer(...);
var consumerOne=new ConsumerOne(...);
var consumerTwo=new ConsumerTwo(...);
...

var copyTask=producer.Output.CopyTo(new[]{consumerOne.Input,consumerTwo.Input});

producer.StartProducing(...);
consumerOne.StartConsuming(...);
...

Now that channels are owned by the consumers, there's no need to have a public StartConsuming method, it could be called in the constructor.

class ConsumerOne:IConsumer<DummyData>
{
    private readonly Channel<DummyData> _input;
    Task _consumeTask;
    public ChannelWriter<DummyData> Input=>_input.Writer;

    public ConsumerOne(...,CancellationToken token=default)
    {
        _input=Channel.CreateUnbounderd<DummyData>();        
        _consumeTask=StartConsuming(token);
    }

    async Task StartConsuming(CancellationToken token=default)
    {
        await foreach (var message in _input.Reader.ReadAllAsync(token).ConfigureAwait(false))
        {
            ...
        }
    }
    ...
}

The consumer task will keep running until the upstream producer calls Complete() on the input ChannelWriter.

Upvotes: 8

Related Questions