nop
nop

Reputation: 6311

Rx - Multiple consumers without losing messages

I want multiple consumers and each of them should get all the messages (without missing any of them).

The actual is idea is that I have a firehose of stuff coming through a redis pub/sub and I need to distribute it to a number of websocket connections, so basically whenever a message comes from redis, it needs to be distributed through all websockets connections.

How do I achieve that with System.Reactive? Will the following work?

var obs1 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
    var publishVal = x;
    Console.WriteLine($@"observer1 publishing {publishVal}");
    return publishVal;
}).Publish();

var sub1 = obs1.Subscribe(x => Console.WriteLine($@"subscriber1 value {x}"));
var sub2 = obs1.Subscribe(x => Console.WriteLine($@"subscriber2 value {x}"));

obs1.Connect();

Upvotes: 0

Views: 100

Answers (1)

Enigmativity
Enigmativity

Reputation: 117027

From what you've described, this will work:

var obs1 =
    Observable
        .Interval(TimeSpan.FromMilliseconds(500))
        .Do(x => Console.WriteLine($@"observer1 publishing {x}"));

var sub1 = obs1.Subscribe(x => Console.WriteLine($@"subscriber1 value {x}"));
var sub2 = obs1.Subscribe(x => Console.WriteLine($@"subscriber2 value {x}"));

Upvotes: 1

Related Questions