ZorgoZ

Reputation: 3569

Correctly subscribing to the current observable when using switch on observable stream

I have encountered something strange when using System.Reactive. Maybe this is the regular behavior, but it makes little sense to me.

Let's take the following code:

Subject<IObservable<long>> X = new Subject<IObservable<long>>();

IObservable<long> I = Observable.Interval(TimeSpan.FromSeconds(1));

async Task Main()
{

    X.Switch().Subscribe(x => Console.WriteLine($"switched_1: {x}"));
    I.Subscribe(x => Console.WriteLine($"direct_1: {x}"));
    X.Switch().Subscribe(x => Console.WriteLine($"switched_2: {x}"));
    I.Subscribe(x => Console.WriteLine($"direct_2: {x}"));

    await Task.Factory.StartNew(async () =>
    {
        await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
        X.Switch().Subscribe(x => Console.WriteLine($"switched_3 !!!: {x}"));
        I.Subscribe(x => Console.WriteLine($"direct_3: {x}"));
    });

    X.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
    Console.ReadLine();
    X.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
    Console.WriteLine("New observable emitted");
    Console.ReadLine();
}

The subscription marked with !!! never receives a value until the second Interval is emitted.


[Update]

I think I know what's happening: each time I call Switch, I create a new subscription to the upstream observable. A subscription made this way is notified only about inner observables emitted after it subscribes, so I cannot "connect" to the current one. I thought that calling Switch only once and subscribing to the resulting observable later would help:

Subject<IObservable<long>> X = new Subject<IObservable<long>>();

IObservable<long> XI;

void Main()
{
    XI = X.Switch().AsObservable();

    XI.Subscribe(x => Console.WriteLine($"switched_1: {x}"));
    XI.Subscribe(x => Console.WriteLine($"switched_2: {x}"));

    X.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
    XI.Subscribe(x => Console.WriteLine($"switched_3 !!!: {x}"));
    Console.ReadLine();
    X.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
    Console.WriteLine("New observable emitted");
    Console.ReadLine();
}

But it did not :(

[Update 2]

It seems that I have found a solution that actually works, but I am not sure whether I am using it correctly.

Subject<IObservable<long>> X = new Subject<IObservable<long>>();

IObservable<long> XI;

async Task Main()
{
    XI = X.Switch().Publish().AutoConnect();
...

How can I make it work from the very beginning?

Upvotes: 0

Views: 181

Answers (1)

Awesomni.Codes

Reputation: 2430

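Your explanation under [Update] is correct. In the first snippet, Task.Factory.StartNew given an async lambda returns a Task<Task> instead of a Task, so a single await only waits for the outer task, and OnNext() is called before the delayed subscription exists. You have to use a double await or Task.Run if you want the subscription to happen before you call OnNext().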
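To illustrate the Task<Task> point, here is a minimal sketch that needs only the base class library. The class and method names (StartNewDemo and so on) and the 200 ms delay are made up for this example; the flag simply stands in for the subscription inside the delayed block.

```csharp
using System;
using System.Threading.Tasks;

public static class StartNewDemo
{
    // Single await on StartNew: only the outer Task<Task> is awaited,
    // so the method normally returns before the inner delay has elapsed.
    public static async Task<bool> SingleAwaitOnStartNew()
    {
        var done = false;
        await Task.Factory.StartNew(async () =>
        {
            await Task.Delay(200).ConfigureAwait(false);
            done = true; // stands in for the late Subscribe call
        });
        return done;
    }

    // Double await: the second await waits for the inner task as well.
    public static async Task<bool> DoubleAwaitOnStartNew()
    {
        var done = false;
        await await Task.Factory.StartNew(async () =>
        {
            await Task.Delay(200).ConfigureAwait(false);
            done = true;
        });
        return done;
    }

    // Task.Run unwraps the async lambda, so one await is enough.
    public static async Task<bool> SingleAwaitOnTaskRun()
    {
        var done = false;
        await Task.Run(async () =>
        {
            await Task.Delay(200).ConfigureAwait(false);
            done = true;
        });
        return done;
    }

    public static async Task Main()
    {
        Console.WriteLine($"StartNew, single await: {await SingleAwaitOnStartNew()}");
        Console.WriteLine($"StartNew, double await: {await DoubleAwaitOnStartNew()}");
        Console.WriteLine($"Task.Run, single await: {await SingleAwaitOnTaskRun()}");
    }
}
```

The first method is expected to report that the inner work has not finished yet, while the other two only return once it has. Applied to the question, this means the code after the awaited block runs while the 5-second delay is still pending.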

However, exposing only the XI observable, as in [Update 2], and hiding the fact that something is switched underneath is a viable option.

With Publish().AutoConnect() you transform the observable from cold to hot. You can also try out a BehaviorSubject or a ReplaySubject in your snippet to understand the differences between hot and cold. Once you understand these differences, it should be much clearer what the actual solution you have in mind has to look like.
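As one way to try the ReplaySubject suggestion, the following sketch replaces the outer Subject with a ReplaySubject of buffer size 1, so a late Switch() subscriber immediately receives the most recent inner observable. It assumes the System.Reactive NuGet package is referenced. The names ReplayDemo, Run, early and late are hypothetical, and a Subject<long> is used as the inner source instead of Interval purely to keep the result deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ReplayDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Buffer size 1: every new subscriber first receives the latest inner observable.
        var x = new ReplaySubject<IObservable<long>>(1);

        // Hot inner source, a stand-in for a shared stream of values.
        var inner = new Subject<long>();

        x.Switch().Subscribe(v => log.Add($"early: {v}"));

        x.OnNext(inner);

        // Subscribes after the inner observable was emitted, but still gets it replayed.
        x.Switch().Subscribe(v => log.Add($"late: {v}"));

        inner.OnNext(42);
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

With a plain Subject in place of the ReplaySubject, the late subscription would see nothing until the next inner observable is emitted, which is the behavior described in the question. Note that Observable.Interval is cold, so with Interval as the inner observable each Switch() subscriber would start its own timer from 0 rather than share one.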

Upvotes: 2
