Reputation: 4272
I'm attempting to create an IObservable<T> from two arrays (IEnumerables). I'm trying to avoid explicitly iterating over the arrays and calling observer.OnNext. I came across the Observable.Subscribe extension method, which at first glance would appear to be what I need. However, it does not work as I expected, and I'm at a loss as to why.
The following code is an example:
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

class Program
{
    static void Main(string[] args)
    {
        var observable = Observable.Create<char>(observer =>
        {
            var firstBytes = new[] {'A'};
            var secondBytes = new[] {'Z', 'Y'};
            // Both enumerables are subscribed to the same observer.
            firstBytes.Subscribe(observer);
            secondBytes.Subscribe(observer);
            return Disposable.Empty;
        });
        observable.Subscribe(b => Console.Write(b));
    }
}
The output of this is "AZ", not "AZY" as I expected. Now, if I subscribe to secondBytes before firstBytes, the output is "ZAY"! This seems to suggest it is enumerating the two arrays in-step - which kind of explains the "AZ" output.
Anyhow, I'm at a complete loss as to why it behaves like this and would appreciate any insight people may be able to provide.
Upvotes: 2
Views: 1101
Reputation: 320
The lock-step iteration behaviour is explained by the implementation of Observable.Subscribe(IEnumerable source), which uses a "recursive" algorithm that calls e.MoveNext in a scheduler action. If that call succeeds, the value is emitted and a new scheduler action is queued to read the next value from the enumerable.
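As you are subscribing to two enumerables and are not specifying any particular scheduler for the subscription, the default iteration scheduler will be used for these operations (defined by SchedulerDefaults.Iteration), which defaults to running on the current thread. This means that the enumeration actions will be queued up to run after your current subscription action completes. This causes the enumeration actions to be interleaved, something like this:
1. firstBytes.Subscribe() queues an enumeration action
2. secondBytes.Subscribe() queues an enumeration action
3. firstBytes.MoveNext() returns true: OnNext('A'), next enumeration action queued
4. secondBytes.MoveNext() returns true: OnNext('Z'), next enumeration action queued
5. firstBytes.MoveNext() returns false: OnCompleted()
6. secondBytes.MoveNext() returns true: OnNext('Y'), next enumeration action queued
7. secondBytes.MoveNext() returns false: OnCompleted()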
The observer receives the OnCompleted() notification at step 5 so the remaining secondBytes enumeration steps are ignored. If you had returned your subscription disposables then the second enumeration would have been cancelled at that point instead.
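To make the interleaving concrete, here is a minimal simulation of the behaviour described above. It does not use Rx at all: the queue is a hypothetical stand-in for a current-thread (trampolining) scheduler, and TrampolineDemo, Run, Subscribe and Step are names invented for this sketch, not part of the library. It assumes, as the answer describes, that each scheduled action performs one MoveNext and then re-queues itself, and that the observer ignores everything after the first completion.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical model of the behaviour described above; not the Rx source.
static class TrampolineDemo
{
    public static string Run(IEnumerable<char> first, IEnumerable<char> second)
    {
        // Stand-in for the scheduler: queued actions run in FIFO order.
        var queue = new Queue<Action>();
        var output = new StringBuilder();
        // Models an observer that ignores notifications after OnCompleted.
        var completed = false;

        // One MoveNext per scheduled action, then re-queue for the next element.
        void Subscribe(IEnumerable<char> source)
        {
            var e = source.GetEnumerator();
            void Step()
            {
                if (e.MoveNext())
                {
                    if (!completed) output.Append(e.Current); // OnNext
                    queue.Enqueue(Step);
                }
                else
                {
                    completed = true; // OnCompleted
                }
            }
            queue.Enqueue(Step);
        }

        // Both subscriptions only queue work; nothing is enumerated yet.
        Subscribe(first);
        Subscribe(second);

        // Drain the queue, as the scheduler would once the subscribe action returns.
        while (queue.Count > 0) queue.Dequeue()();

        return output.ToString();
    }

    static void Main()
    {
        Console.WriteLine(Run(new[] { 'A' }, new[] { 'Z', 'Y' })); // AZ
        Console.WriteLine(Run(new[] { 'Z', 'Y' }, new[] { 'A' })); // ZAY
    }
}
```

Under these assumptions the model reproduces both outputs reported in the question: "AZ" in the original order and "ZAY" when the subscriptions are swapped.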
Upvotes: 2
Because you are subscribing to two observables, as opposed to a single observable that is the concatenation of two observables, there are two possible sources that can invoke the observer's OnCompleted method. Since the first array is shorter, it completes after the first item is emitted, and the observer unsubscribes since it has received a completion notification.
The correct way to do this is to combine the two sequences into a single sequence, then subscribe to that:
var observable = Observable.Create<char>(observer =>
{
    var firstBytes = new[] { 'A' };
    var secondBytes = new[] { 'Z', 'Y' };
    return firstBytes.Concat(secondBytes).Subscribe(observer);
});
observable.Subscribe(Console.Write);
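If the arrays are the only sources, a variation on the same idea is to skip Observable.Create and concatenate on the observable side instead. The sketch below assumes the System.Reactive package is referenced; ConcatDemo and Build are names made up for this example. Each array is converted with ToObservable, and Observable.Concat subscribes to the second sequence only after the first has completed, so there is a single completion notification.

```csharp
using System;
using System.Reactive.Linq;

static class ConcatDemo
{
    // Hypothetical helper: builds one observable from the two arrays.
    public static IObservable<char> Build()
    {
        var firstBytes = new[] { 'A' };
        var secondBytes = new[] { 'Z', 'Y' };

        // The second sequence is subscribed to only once the first completes.
        return firstBytes.ToObservable().Concat(secondBytes.ToObservable());
    }

    static void Main()
    {
        // Should print the elements in order: A, then Z, then Y.
        Build().Subscribe(b => Console.Write(b));
    }
}
```

Returning the observable directly also means the subscription disposable comes from the library rather than being Disposable.Empty, so unsubscribing stops the enumeration.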
Upvotes: 2