Reputation: 21756
Running the following code
foreach(var i in
Observable
.Range(1, 3)
.Do(Console.WriteLine)
.ToEnumerable())
Console.WriteLine("Fin:" + i);
I'm getting this output:
1
2
3
Fin:1
Fin:2
Fin:3
The question is - why does ToEnumerable caches all the values and provides them just after the source sequence completes? Does it related somehow to "leaving the monad"?
Upvotes: 2
Views: 213
Reputation: 43464
If you dig into the source code of the Rx library you'll see that the ToEnumerable
operator is implemented basically like this:
public static IEnumerable<T> ToEnumerable<T>(this IObservable<T> source)
{
using var enumerator = new GetEnumerator<T>();
enumerator.Run(source);
while (enumerator.MoveNext()) yield return enumerator.Current;
}
...where the GetEnumerator<T>
is a class defined in this file. This class is an IEnumerator<T>
and an IObserver<T>
. It has an internal _queue
(ConcurrentQueue<T>
) where it stores the received items. The most interesting methods are the Run
, OnNext
and MoveNext
:
public IEnumerator<T> Run(IObservable<T> source)
{
_subscription.Disposable = source.Subscribe(this);
return this;
}
public void OnNext(T value)
{
_queue.Enqueue(value);
_gate.Release();
}
public bool MoveNext()
{
_gate.Wait();
if (_queue.TryDequeue(out _current)) return true;
_error?.Throw();
return false;
}
In your code, when you start the foreach
loop, the Run
method runs, and the Range+Do
sequence is subscribed. This sequence emits all its elements during the subscription. The OnNext
method is invoked for each emited element, so all elements are enqueued inside the _queue
. After the completion of the Run
method, follows the while
loop that dequeues and yields the queued elements. That's why you see all the sideffects of the Do
operator happening before any iteration of your foreach
loop.
The Rx library includes another operator similar to the ToEnumerable
, the Next
operator, with this signature:
// Returns an enumerable sequence whose enumeration blocks until the next element
// in the source observable sequence becomes available. Enumerators on the resulting
// sequence will block until the next element becomes available.
public static IEnumerable<T> Next<T>(this IObservable<T> source);
According to my expirements this operator doesn't do what you want either.
Upvotes: 2
Reputation: 4488
ToEnumerable
doesn't wait for observable to complete, it just happens that observable completes synchronously in this case. Observable.Interval
shows this:
var enumerable =
Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Take(3)
.Do(i => Console.WriteLine("obs: {0}", i))
.ToEnumerable();
foreach (int value in enumerable)
{
Console.WriteLine(value);
}
Upvotes: 2