Rob Graat

Reputation: 33

Why are chains of IObservables resolved entirely for each subscription?

I have a question about how chains of IObservables are resolved. I thought that if multiple observers subscribe to the same IObservable, the entire chain would be resolved only once when the stream emits a new item.

Instead it appears that the entire chain is run for each subscription.

If my entire program logic is built by chaining IObservables together, then it appears to me that the code runs the same methods with the same outcome unnecessarily.

The code below was written in LINQPad:

{
    var subject = Observable.Range(1, 1);

    var observable1 = subject
        .Select(value =>
        {
            var nextValue = value + 2;
            Console.Write("\n Result of expensive computation: " + nextValue);
            return nextValue;
        });

    var observable2 = observable1
        .Select(value =>
        {
            var nextValue = 2 * value;
            Console.Write("\n Result of another expensive computation: " + nextValue);
            return nextValue;
        });

    observable2.Subscribe(_ => Console.Write("\n Data received on first subscription."));

    observable2.Subscribe(_ => Console.Write("\n Data received on second subscription."));
}

Result:

Result of expensive computation: 3

Result of another expensive computation: 6

Data received on first subscription.

Result of expensive computation: 3

Result of another expensive computation: 6

Data received on second subscription.

However, I expected:

Result of expensive computation: 3

Result of another expensive computation: 6

Data received on first subscription.

Data received on second subscription.

Thanks for any answers.

Cheers, Rob

Upvotes: 3

Views: 305

Answers (1)

Shlomo

Reputation: 14350

First, to answer your questions:

  • Yes, it's intended behavior.
  • Yes, it can be avoided.
  • Avoiding it is not a bad idea.

As @supertopi said, you created a cold observable, which functions as you outlined. Turn it into a hot observable, and you would have your desired functionality.

You can do so as follows:

{
    var subject = Observable.Range(1, 1);

    var observable1 = subject
        .Select(value =>
        {
            var nextValue = value + 2;
            Console.Write("\n Result of expensive computation: " + nextValue);
            return nextValue;
        });

    var observable2 = observable1
        .Select(value =>
        {
            var nextValue = 2 * value;
            Console.Write("\n Result of another expensive computation: " + nextValue);
            return nextValue;
        });

    var hotObservable = observable2.Publish().RefCount();

    hotObservable.Subscribe(_ => Console.Write("\n Data received on first subscription."));

    hotObservable.Subscribe(_ => Console.Write("\n Data received on second subscription."));

}
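One caveat with the snippet above, as far as I can tell: Observable.Range emits synchronously when it is subscribed from ordinary code, so with Publish().RefCount() the first Subscribe call connects to the source and the whole sequence can finish before the second observer is attached. A minimal sketch of an alternative, assuming the System.Reactive package, is to attach both observers first and only then call Connect(). The class name PublishConnectSketch and the log list are my own additions for illustration, not part of the question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class PublishConnectSketch
{
    // Hypothetical helper: returns the log lines in the order they were produced.
    public static List<string> Run()
    {
        var log = new List<string>();

        var source = Observable.Range(1, 1)
            .Select(value =>
            {
                var nextValue = value + 2;
                log.Add("Result of expensive computation: " + nextValue);
                return nextValue;
            })
            .Select(value =>
            {
                var nextValue = 2 * value;
                log.Add("Result of another expensive computation: " + nextValue);
                return nextValue;
            });

        // Publish() wraps the chain in a connectable observable;
        // the chain is not subscribed until Connect() is called.
        var published = source.Publish();

        published.Subscribe(_ => log.Add("Data received on first subscription."));
        published.Subscribe(_ => log.Add("Data received on second subscription."));

        // Both observers are attached, so start the single shared run of the chain.
        published.Connect();

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

With this arrangement I would expect each computation line to appear once, followed by both "Data received" lines, which matches the output the question asked for. Publish().RefCount() remains a good fit when the source emits over time and observers come and go.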

Cold Observables:

  • Each subscription re-runs the chained operations from the start
  • Normally sourced from Observable.Create/Range/Generate/Interval/Timer
  • Like an enumerable of items with time gaps between items explicitly defined.

Hot Observables:

  • Multiple subscriptions generally share results of chained operations
  • Normally sourced from an event, published cold observable, or a Subject
  • Like a series of events that you can handle; they happen whether you like it or not.
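To make the cold case concrete, here is a small sketch, assuming the System.Reactive package, that counts how often the producer code inside Observable.Create runs. The class name ColdSketch and the counter are hypothetical names used only for this illustration.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class ColdSketch
{
    // Hypothetical helper: returns how many times the producer ran.
    public static int CountProducerRuns()
    {
        var producerRuns = 0;

        // A cold observable: this delegate runs once per subscription.
        var cold = Observable.Create<int>(observer =>
        {
            producerRuns++;
            observer.OnNext(42);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        cold.Subscribe(_ => { });
        cold.Subscribe(_ => { });

        return producerRuns;
    }

    public static void Main()
    {
        Console.WriteLine("Producer runs: " + CountProducerRuns());
    }
}
```

Two subscriptions mean the producer delegate runs twice, which is the same effect the question observed with the chained Select calls.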

The best differentiator, in my mind, is whether or not the source is 'running' without a subscription. A button click event, for example, still happens whether or not someone's subscribed. In contrast, Observable.Timer does nothing without a subscription.
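A rough illustration of a source that is 'running' regardless of subscribers, assuming the System.Reactive package and using a Subject as a stand-in for something like a click event. The class name HotSketch and the "click" strings are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class HotSketch
{
    // Hypothetical helper: returns the values seen by a late subscriber.
    public static List<string> Run()
    {
        var clicks = new Subject<string>();
        var received = new List<string>();

        // Nobody is subscribed yet, so this value is simply dropped.
        clicks.OnNext("click 1");

        using (clicks.Subscribe(value => received.Add(value)))
        {
            // Emitted while subscribed, so the observer sees it.
            clicks.OnNext("click 2");
        }

        // The subscription was disposed, so this one is missed as well.
        clicks.OnNext("click 3");

        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Only "click 2" reaches the observer: the subject keeps emitting whether or not anyone is listening, and a subscriber only sees what happens while it is attached.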

As for why, I can't speak for the Rx designers. The cold vs. hot distinction is a frequent source of confusion and bugs. I would think making that distinction more explicit could have improved the situation. Having said that, I'm not sure how doable that was or is.

Upvotes: 3
