Reputation: 3569
I have encountered something strange when using System.Reactive. Maybe this is the regular behavior, but it makes little sense to me.
Take the following code:
Subject<IObservable<long>> X = new Subject<IObservable<long>>();
IObservable<long> I = Observable.Interval(TimeSpan.FromSeconds(1));
async Task Main()
{
    X.Switch().Subscribe(x => Console.WriteLine($"switched_1: {x}"));
    I.Subscribe(x => Console.WriteLine($"direct_1: {x}"));
    X.Switch().Subscribe(x => Console.WriteLine($"switched_2: {x}"));
    I.Subscribe(x => Console.WriteLine($"direct_2: {x}"));
    await Task.Factory.StartNew(async () =>
    {
        await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
        X.Switch().Subscribe(x => Console.WriteLine($"switched_3 !!!: {x}"));
        I.Subscribe(x => Console.WriteLine($"direct_3: {x}"));
    });
    X.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
    Console.ReadLine();
    X.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
    Console.WriteLine("New observable emitted");
    Console.ReadLine();
}
The subscription marked with !!! receives nothing until the second Interval is emitted.
[Update]
I think I know what's happening: every call to Switch() creates a new subscription to the upstream observable. Each of those subscriptions is only notified about observables emitted after it subscribed, so it cannot "connect" to the observable that is currently active. I thought that calling Switch only once and subscribing to the resulting observable later would help:
Subject<IObservable<long>> X = new Subject<IObservable<long>>();
IObservable<long> XI;
void Main()
{
    XI = X.Switch().AsObservable();
    XI.Subscribe(x => Console.WriteLine($"switched_1: {x}"));
    XI.Subscribe(x => Console.WriteLine($"switched_2: {x}"));
    X.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
    XI.Subscribe(x => Console.WriteLine($"switched_3 !!!: {x}"));
    Console.ReadLine();
    X.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
    Console.WriteLine("New observable emitted");
    Console.ReadLine();
}
But it did not :(
[Update 2]
It seems that I have found a solution that actually works, but I am not sure whether I am using it correctly.
Subject<IObservable<long>> X = new Subject<IObservable<long>>();
IObservable<long> XI;
async Task Main()
{
    XI = X.Switch().Publish().AutoConnect();
    ...
How can I make it work from the very beginning?
Upvotes: 0
Views: 181
Reputation: 2430
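Your explanation under [Update] is correct. On top of that, Task.Factory.StartNew with an async lambda returns a Task<Task> instead of a Task, so a single await only waits until the lambda reaches its first await, not until the subscription inside it has been made. You have to use a double await or Task.Run if you want the subscription to happen before you call OnNext().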
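To make the Task<Task> point concrete, here is a minimal sketch that only uses the standard Task APIs. The class name StartNewDemo, the Log helper and the 200 ms delay are made up for illustration and are not from your code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class StartNewDemo
{
    // Records the order of events for StartNew versus Task.Run.
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        void Log(string message) { lock (log) { log.Add(message); } }

        // StartNew wraps the async lambda's Task in another Task.
        Task<Task> outer = Task.Factory.StartNew(async () =>
        {
            await Task.Delay(200).ConfigureAwait(false);
            Log("StartNew: work done");
        });

        // First await: completes once the lambda reaches Task.Delay.
        Task inner = await outer;
        Log("StartNew: after first await");

        // Second await: completes when the lambda body has finished.
        await inner;

        // Task.Run unwraps the inner Task, so one await is enough.
        await Task.Run(async () =>
        {
            await Task.Delay(200).ConfigureAwait(false);
            Log("Task.Run: work done");
        });
        Log("Task.Run: after await");

        return log;
    }

    public static async Task Main()
    {
        foreach (var line in await RunAsync())
            Console.WriteLine(line);
    }
}
```

With StartNew, "after first await" should be logged before "work done", which is the same thing that happens to your third subscription. With Task.Run the work is finished by the time the await returns.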
However, exposing only the XI observable as in [Update 2], and hiding the fact that something is switched underneath, is a viable option.
With Publish().AutoConnect() you transform the observable from cold to hot. You can also try out a BehaviorSubject or a ReplaySubject on your snippet to understand the differences between hot and cold. Once you understand these differences, it should be much clearer what the solution you have in mind has to look like.
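A rough sketch of the difference, assuming the System.Reactive NuGet package is referenced. To keep it deterministic I use plain Subject<long> instances in place of Observable.Interval; the names SwitchSharingDemo, first, second and the log labels are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchSharingDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var outer = new Subject<IObservable<long>>();
        var first = new Subject<long>();   // stands in for the first Interval
        var second = new Subject<long>();  // stands in for the second Interval

        // One Switch subscription, shared by every subscriber of `shared`.
        IObservable<long> shared = outer.Switch().Publish().AutoConnect();

        // AutoConnect connects to the source on this first subscription.
        shared.Subscribe(x => log.Add($"early: {x}"));
        outer.OnNext(first);
        first.OnNext(0);

        // Late subscribers: one with its own Switch, one on the shared stream.
        outer.Switch().Subscribe(x => log.Add($"late_unshared: {x}"));
        shared.Subscribe(x => log.Add($"late_shared: {x}"));

        first.OnNext(1);      // only the shared Switch is listening to `first`
        outer.OnNext(second); // both Switch subscriptions move to `second`
        second.OnNext(10);

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

Here late_shared receives 1 from the inner observable that was already active, while late_unshared stays silent until the next inner observable arrives and only sees 10. Neither late subscriber sees 0, because Publish() does not replay past values; that is where ReplaySubject or BehaviorSubject behave differently.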
Upvotes: 2