Jesse Carter

Reputation: 21147

Reactive Framework: Trouble with Multicast and Subscribing to a Subject

I've just started learning how to use the Reactive Framework and am struggling with being able to multicast publish to more than one subscriber.

I had everything working fine like this:

m_MessagePublisher = m_ServerClient.MessageQueue
      .GetConsumingEnumerable()
      .ToObservable(TaskPoolScheduler.Default);

var genericServerMessageSubscriber = m_MessagePublisher
      .Where(message => message is GenericServerMessage)
      .Subscribe(message =>
      {
          // do something here
      });

But then I realized that this doesn't support multicast: when I tried to attach another subscriber that should have been hit by the same message, it never fired. I have been reading about the .Multicast extension and trying to figure out how Subject plays into this, but haven't been able to get it to work yet:

var subject = new Subject<BesiegedMessage>();

var messagePublisher = m_ServerClient.MessageQueue
      .GetConsumingEnumerable()
      .ToObservable(TaskPoolScheduler.Default)
      .Multicast(subject);

// All generic server messages are handled here
var genericServerMessageSubscriber = subject
      .Where(message => message is GenericServerMessage)
      .Subscribe(message =>
      {
            // do something here
      });

But now none of the subscribers are being hit, including the single one that was working fine before. What am I missing in order to properly multicast to more than one subscriber?

UPDATE: Using Subscribe(subject) instead of Multicast(subject) seems to multicast correctly, which leaves me very confused as to what .Multicast() is for.

Upvotes: 4

Views: 874

Answers (1)

JerKimball

Reputation: 16894

EDIT:

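Haha, serves me right for reading too fast. What you are asking is WAY simpler. That said, I think the explanation below is important, so I'm leaving it. As for your problem: Multicast returns a connectable observable, which does not subscribe to its source until it is connected. Try adding the last line here: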

var messagePublisher = m_ServerClient.MessageQueue
  .GetConsumingEnumerable()
  .ToObservable(TaskPoolScheduler.Default)
  .Multicast(subject)
  // Here: connectable observables are a PITA; RefCount() connects for you on the first subscription
  .RefCount();

END EDIT:
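
If you would rather see the connect step spelled out, here is a minimal sketch of what the original attempt was missing. It assumes the System.Reactive package; the class name MulticastConnectDemo, the Run helper, and the Observable.Range source are stand-ins I made up for illustration, not the asker's message queue. Subscribers attached to the connectable observable see nothing until Connect() is called.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class; the names are illustrative only.
public static class MulticastConnectDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Stand-in source (assumption): three values instead of a message queue.
        var source = Observable.Range(1, 3);
        var subject = new Subject<int>();

        // Multicast only wires things up; it does not subscribe to the source yet.
        IConnectableObservable<int> connectable = source.Multicast(subject);

        using (connectable.Subscribe(x => log.Add("A: " + x)))
        using (connectable.Subscribe(x => log.Add("B: " + x)))
        {
            // Nothing has been delivered to A or B at this point.
            log.Add("before Connect");

            // Connect() subscribes the subject to the source, so values start flowing
            // to every subscriber. Disposing the returned handle disconnects again.
            using (connectable.Connect())
            {
            }
        }

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Calling Connect() yourself gives you control over when the source starts; RefCount() trades that control for convenience by connecting when the first subscriber arrives.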

Hmm...how to describe Multicast...I guess let's go by example:

Say you've got something like this - what do you think it produces?

int delay = 100;
var source = Observable.Interval(TimeSpan.FromMilliseconds(delay));
var publishingFrontend = new Subject<string>();

// Here's "raw"
var rawStream = source;
using(rawStream.Subscribe(x => Console.WriteLine("{0}", x)))
{
    Thread.Sleep(delay * 3);
    using(rawStream.Subscribe(x => Console.WriteLine("Inner: {0}", x)))
    {
        Thread.Sleep(delay * 3);
    }
    Thread.Sleep(delay * 5);
}

Since you are subscribing to the raw stream, and Observable.Interval starts a fresh timer for every subscription, new subscribers basically start from scratch:

(this won't 100% match if you re-run, since I took the wussy way out by Thread.Sleep, but should be close)

0
1
2
Inner: 0
3
Inner: 1
4
5
6
7
8
9

Hmm...so if we want to "tie in mid-stream", we use the Publish().RefCount() pattern:

var singleSource = source.Publish().RefCount();
using(singleSource.Subscribe(x => Console.WriteLine("{0}", x)))
{
    Thread.Sleep(delay * 3);
    using(singleSource.Subscribe(x => Console.WriteLine("Inner: {0}", x)))
    {
        Thread.Sleep(delay * 3);
    }
    Thread.Sleep(delay * 5);
}

Which produces something like:

0
1
2
Inner: 2
3
Inner: 3
4
Inner: 4
5
6
7
8
9
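
To make the RefCount() part of that pattern less magical, here is a small sketch (again assuming the System.Reactive package; the class name RefCountLifetimeDemo, the Run helper, and the logging source are invented for illustration). It records when the underlying source is subscribed to and disposed as two subscribers come and go.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical demo class; the names are illustrative only.
public static class RefCountLifetimeDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // A source that emits nothing and only records its own lifetime.
        var source = Observable.Create<int>(observer =>
        {
            log.Add("source subscribed");
            return Disposable.Create(() => log.Add("source disposed"));
        });

        var shared = source.Publish().RefCount();

        var a = shared.Subscribe(_ => { });   // first subscriber: RefCount connects
        log.Add("A subscribed");
        var b = shared.Subscribe(_ => { });   // second subscriber: shares the connection
        log.Add("B subscribed");

        a.Dispose();                          // one subscriber left: stay connected
        log.Add("A disposed");
        b.Dispose();                          // last subscriber gone: RefCount disconnects
        log.Add("B disposed");

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

The source is subscribed to once, when A arrives, and disposed once, when B leaves. That single shared subscription is why the inner subscriber above joins mid-stream instead of restarting from 0.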

So let's say we didn't have the Publish() operator - how could we simulate it?

Console.WriteLine("Simulated Publish:");
// use a subject to proxy values...
var innerSubject = new Subject<long>();
// wire up the source to "write to" the subject
var innerSub = source.Subscribe(innerSubject);
var simulatedSingleSource = Observable.Create<long>(obs =>
{
    // return subscriptions to the "proxied" subject
    var publishPoint = innerSubject.Subscribe(obs);        
    return publishPoint;
});

Running this with the same nested-subscription block as before (subscribing to simulatedSingleSource instead), we get:

Simulated Publish:
0
1
2
Inner: 2
3
Inner: 3
4
Inner: 4
5
6
7
8
9

Woot!

But there is another way...

Console.WriteLine("MulticastPublish:");
var multicastPublish = source.Multicast(new Subject<long>()).RefCount();    
using(multicastPublish.Subscribe(x => Console.WriteLine("{0}", x)))
{
    Thread.Sleep(delay * 3);
    using(multicastPublish.Subscribe(x => Console.WriteLine("Inner: {0}", x)))
    {
        Thread.Sleep(delay * 3);
    }
    Thread.Sleep(delay * 5);
}

Output:

MulticastPublish:
0
1
2
Inner: 2
3
Inner: 3
4
Inner: 4
5
6
7
8
9

EDIT:

In fact, all of the ConnectableObservable generating extensions rely on a Multicast/Subject pairing:

Publish() => Multicast(new Subject<T>())
Replay() => Multicast(new ReplaySubject<T>())
PublishLast() => Multicast(new AsyncSubject<T>())
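
A rough sketch of how the choice of subject changes what a late subscriber sees, assuming the System.Reactive package (the class name SubjectFlavorsDemo, the Run helper, and the hand-driven Subject source are made up for illustration). Each connectable is connected up front, two values are pushed, then a subscriber joins each one before the final value and completion.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class; the names are illustrative only.
public static class SubjectFlavorsDemo
{
    public static Dictionary<string, List<int>> Run()
    {
        // A hand-driven source so the timing is fully under our control.
        var source = new Subject<int>();

        var published = source.Multicast(new Subject<int>());       // like Publish()
        var replayed = source.Multicast(new ReplaySubject<int>());  // like Replay()
        var last = source.Multicast(new AsyncSubject<int>());       // like PublishLast()

        var seen = new Dictionary<string, List<int>>
        {
            { "publish", new List<int>() },
            { "replay", new List<int>() },
            { "publishLast", new List<int>() },
        };

        using (published.Connect())
        using (replayed.Connect())
        using (last.Connect())
        {
            source.OnNext(1);
            source.OnNext(2);

            // Late subscribers: they join after 1 and 2 have already gone by.
            published.Subscribe(x => seen["publish"].Add(x));
            replayed.Subscribe(x => seen["replay"].Add(x));
            last.Subscribe(x => seen["publishLast"].Add(x));

            source.OnNext(3);
            source.OnCompleted();
        }

        return seen;
    }

    public static void Main()
    {
        foreach (var pair in Run())
        {
            Console.WriteLine(pair.Key + ": " + string.Join(", ", pair.Value));
        }
    }
}
```

The plain Subject hands the late subscriber only what arrives after it joined, the ReplaySubject also plays back what it missed, and the AsyncSubject delivers just the final value once the source completes.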

Upvotes: 2
