Reputation: 1699
I'm learning reactive extensions (rx) in .NET and I'm struggling a little bit with what a "subscription" really is and when it is used.
Lets take some sample data, taken from this thread:
using System;
using System.Reactive.Linq;
using System.Threading;
namespace ConsoleApp1
{
class Program
{
class Result
{
public bool Flag { get; set; }
public string Text { get; set; }
}
static void Main(string[] args)
{
var source =
Observable.Create<Result>(f =>
{
Console.WriteLine("Start creating data!");
f.OnNext(new Result() { Text = "one", Flag = false });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "two", Flag = true });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "three", Flag = false });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "four", Flag = false });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "five", Flag = true });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "six", Flag = true });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "seven", Flag = true });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "eight", Flag = false });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "nine", Flag = true });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "ten", Flag = false });
return () => Console.WriteLine("Observer has unsubscribed");
});
}
}
}
Beware of the line:
Console.WriteLine("Start creating data!");
Now, first I thought a subscription simply is used by using the .Subscribe
operator. So an observer (e.g. the callback of the .Subscribe
function) subscribes to an observable (the last return value of a chain of operators) like this (just as an example, the query doesn't have a real use):
source.Zip(source, (s1, s0) =>
s0.Flag
? Observable.Return(s1)
: Observable.Empty<Result>()).Merge().Subscribe(f => { Console.WriteLine(f.Text); });
Now I was expecting to get the "Start creating data!" output only once, since I was only using one subscription. But in fact, I got it twice:
Start creating data!
Start creating data!
two
five
six
seven
nine
I was told that everytime I use an operator on source.
, a subscription is made. But in this example I'm using source.
only once and then a second time just as a parameter for the .Zip
operator. Or is it because the source is passed to the .Zip
function by value subscribed again?
So my questions are:
Btw. I know I can prevent multiple subscriptions from happening by using the .Publish
operator, but that isn't the scope of my questions.
Upvotes: 1
Views: 857
Reputation: 18663
In simple terms a Subscription just represents an Observable
that has been subscribed to. This process can happen either explicitly by using .Subscribe
or implicitly by joining two or more Observables
and then subscribing to the resulting chain.
In your case you are seeing both happen, once explicitly when you call Subscribe
and one implicitly when you pass source
to Zip
, that is, there are two Subscriptions
to the source
Observable
.
Why is that important? Because by default Observables
are lazy, meaning that they will not begin processing until you subscribe to them (the product of that process being a Subscription
), by extension this means that any time you subscribe to the Observable
it will effectively begin a new stream. This behavior can be overridden like you alluded to with Publish
, but the default is for each Observable
to be cold.
In your specific case, since you are passing the same Observable
to Zip
it needs to subscribe to it twice, since it will be zipping together events from the two passed streams. The result is two subscriptions to the same Observable
which each run independently of each other.
Upvotes: 2