cristobalito
cristobalito

Reputation: 4272

Behavior of Observable.Subscribe when used with multiple IEnumerables

I'm attempting to create an IObservable<T> from two arrays (IEnumerables). I'm trying to avoid explicitly iterating over the arrays and calling observer.OnNext. I came across the Observable.Subscribe extension method, which at first glance would appear to be what I need. However, it does not work as I expected to and I'm at a loss as to why.

The following code is an example:

  class Program
  {
    static void Main(string[] args)
    {
      var observable = Observable.Create<char>(observer =>
        {
          var firstBytes = new[] {'A'};
          var secondBytes = new[] {'Z', 'Y'};
          firstBytes.Subscribe(observer);
          secondBytes.Subscribe(observer);

          return Disposable.Empty;
        }
      );

      observable.Subscribe(b => Console.Write(b));
    }
  }

The output of this is "AZ", not "AZY" as I expected. Now, if I subscribe to secondBytes before firstBytes, the output is "ZAY"! This seems to suggest it is enumerating the two arrays in-step - which kind of explains the "AZ" output.

Anyhow, I'm at a complete loss as to why it behaves like this and would appreciate any insight people may be able to provide.

Upvotes: 2

Views: 1101

Answers (2)

Malcolm
Malcolm

Reputation: 320

The reason for the lock-step iteration behaviour can be explained by the implementation of Observable.Subscribe(IEnumerable source) which uses a "recursive" algorithm which works by calling e.MoveNext in a scheduler action. If it is successful then the value is emitted and and the a new scheduler action is then queued to read the next value from the enumerable.

As you are subscribing to two enumerables and are not specifying any particular scheduler for the subscription, the default iteration scheduler will be used for these operations (defined by SchedulerDefaults.Iteration) which defaults to running on the current thread. This means that the enumeration actions will be queued up to run after your current subscription action completes. This causes the enumeration actions to be interleaved - something like this

  1. firstBytes.Subscribe() -> queue enumerate action
  2. secondBytes.Subscribe() -> queue enumerate action
  3. call firstBytes.MoveNext() -> OnNext("A") -> queue next enumeration action
  4. call secondBytes.MoveNext() -> OnNext("Z") -> queue next enumeration action
  5. call firstBytes.MoveNext() -> OnCompleted()
  6. call secondBytes.MoveNext() -> OnNext(Y) -> queue next enumeration action
  7. call secondBytes.MoveNext() -> OnCompleted()

The observer receives the OnCompleted() notification at step 5 so the remaining secondBytes enumeration steps are ignored. If you had returned your subscription disposables then the second enumeration would have been cancelled at that point instead.

Upvotes: 2

user1726343
user1726343

Reputation:

Because you are subscribing to two observables, as opposed to a single observable that is the concatenation of two observables, there are two possible sources that can invoke the observer's OnComplete method. Since the first array is shorter, it completes after the first item is emitted, and the observer unsubscribes since it has received a completion notification.

The correct way to do this is to combine the two sequences into a single sequence, then subscribe to that:

var observable = Observable.Create<char>(observer =>
{
    var firstBytes = new[] { 'A' };
    var secondBytes = new[] { 'Z', 'Y' };

    return firstBytes.Concat(secondBytes).Subscribe(observer);
});

observable.Subscribe(Console.Write);

Upvotes: 2

Related Questions