Reputation: 21147
I've just started learning how to use the Reactive Framework and am struggling with being able to multicast publish to more than one subscriber.
I had everything working fine like this:
m_MessagePublisher = m_ServerClient.MessageQueue
.GetConsumingEnumerable()
.ToObservable(TaskPoolScheduler.Default);
var genericServerMessageSubscriber = m_MessagePublisher
.Where(message => message is GenericServerMessage)
.Subscribe(message =>
{
// do something here
}
But then I realized that this didn't support multicast and when I tried to attach another subscriber that should have been hit by the same message it won't fire. I have been reading into the .MultiCast extension and trying to figure out how Subject plays into this but haven't been able to get it to work yet:
var subject = new Subject<BesiegedMessage>();
var messagePublisher = m_ServerClient.MessageQueue
.GetConsumingEnumerable()
.ToObservable(TaskPoolScheduler.Default)
.Multicast(subject);
// All generic server messages are handled here
var genericServerMessageSubscriber = subject
.Where(message => message is GenericServerMessage)
.Subscribe(message =>
{
// do something here
}
But now none of the subscribers are being hit including the single one that was working fine before. What am I missing here in order to be able to properly multicast to more than one subscriber?
UPDATE: Using Subscribe(subject) instead of Multicast(subject) seems to be working to multicast which leaves me very confused as to what .MultiCast() is for
Upvotes: 4
Views: 874
Reputation: 16894
EDIT:
Haha - serves me right for reading too fast - what you are asking is WAY simpler...that said, I think the below is important, so I'm leaving it...So, your problem - try adding this line:
var messagePublisher = m_ServerClient.MessageQueue
.GetConsumingEnumerable()
.ToObservable(TaskPoolScheduler.Default)
.Multicast(subject)
// Here: connectable observables are a PITA...
.RefCount();
END EDIT:
Hmm...how to describe Multicast
...I guess let's go by example:
Say you've got something like this - what do you think it produces?
int delay = 100;
var source = Observable.Interval(TimeSpan.FromMilliseconds(delay));
var publishingFrontend = new Subject<string>();
// Here's "raw"
var rawStream = source;
using(rawStream.Subscribe(x => Console.WriteLine("{0}", x)))
{
Thread.Sleep(delay * 3);
using(rawStream.Subscribe(x => Console.WriteLine("Inner: {0}", x)))
{
Thread.Sleep(delay * 3);
}
Thread.Sleep(delay * 5);
}
Since you are subscribing to the stream raw, new subscribers basically start from scratch:
(this won't 100% match if you re-run, since I took the wussy way out by Thread.Sleep
, but should be close)
0
1
2
Inner: 0
3
Inner: 1
4
5
6
7
8
9
Hmm...so if we want to "tie in mid-stream", we use the Publish().RefCount()
pattern:
var singleSource = source.Publish().RefCount();
using(singleSource.Subscribe(x => Console.WriteLine("{0}", x)))
{
Thread.Sleep(delay * 3);
using(singleSource.Subscribe(x => Console.WriteLine("Inner: {0}", x)))
{
Thread.Sleep(delay * 3);
}
Thread.Sleep(delay * 5);
}
Which produces something like:
0
1
2
Inner: 2
3
Inner: 3
4
Inner: 4
5
6
7
8
9
So let's say we didn't have the Publish()
operator - how could we simulate it?
Console.WriteLine("Simulated Publish:");
// use a subject to proxy values...
var innerSubject = new Subject<long>();
// wire up the source to "write to" the subject
var innerSub = source.Subscribe(innerSubject);
var simulatedSingleSource = Observable.Create<long>(obs =>
{
// return subscriptions to the "proxied" subject
var publishPoint = innerSubject.Subscribe(obs);
return publishPoint;
});
Running this, we get:
Simulated Publish:
0
1
2
Inner: 2
3
Inner: 3
4
Inner: 4
5
6
7
8
9
Woot!
but there is another way...
Console.WriteLine("MulticastPublish:");
var multicastPublish = source.Multicast(new Subject<long>()).RefCount();
using(multicastPublish.Subscribe(x => Console.WriteLine("{0}", x)))
{
Thread.Sleep(delay * 3);
using(multicastPublish.Subscribe(x => Console.WriteLine("Inner: {0}", x)))
{
Thread.Sleep(delay * 3);
}
Thread.Sleep(delay * 5);
}
Output:
MulticastPublish:
0
1
2
Inner: 2
3
Inner: 3
4
Inner: 4
5
6
7
8
9
EDIT:
In fact, all of the ConnectableObservable
generating extensions rely on a Multicast
/Subject
pairing:
Publish() => Multicast(new Subject<T>)
Replay() => Multicast(new ReplaySubject<T>)
PublishLast() => Multicast(new AsyncSubject<T>)
Upvotes: 2