Reputation: 787
I have been trying to understand Rx more deeply. By following Bart De Smet's MinLINQ and Jon Skeet's 'Reimplementing' series I have built up a good understanding, but...
Take the following code as an example:
var onePerSecond = Observable.Interval(TimeSpan.FromSeconds(1));
var evenNums = onePerSecond.Where(x => x % 2 == 0);
evenNums.Subscribe(Console.WriteLine);
From the equivalent IEnumerable point of view, I understand the data flow of MoveNext/Current, and from Mr Skeet's blog I understand how a Where method could be implemented by using foreach over the IEnumerable 'this' parameter of the extension method.
But in the case of IObservable's Where method, would it contain code that implements the IObserver interface (or a lambda equivalent), so that it effectively observes all notifications from the onePerSecond object and in turn returns an IObservable that only produces the values for which the predicate returned true?
Any help and thoughts very welcome, many thanks
James
Upvotes: 3
Views: 592
Reputation: 19117
Here are some toy examples to get a feel for how it works:
https://github.com/ScottWeinstein/Rx-Demo/tree/master/ImplementWhereDemo
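// Toy Where operator: wraps an upstream observable and a predicate.
// The delegate-based Subscribe overloads used below are Rx extension methods.
public class WhereObservableLessPedantic<T> : IObservable<T>
{
    private readonly Func<T, bool> _pred;
    private readonly IObservable<T> _stream;

    public WhereObservableLessPedantic(IObservable<T> stream, Func<T, bool> pred)
    {
        _pred = pred;
        _stream = stream;
    }

    public IDisposable Subscribe(IObserver<T> downStreamObserver)
    {
        // Forward only the values that satisfy the predicate.
        Action<T> onNext = nextVal =>
        {
            if (_pred(nextVal))
                downStreamObserver.OnNext(nextVal);
        };
        // Pass errors and completion through unchanged; subscribing with
        // onNext alone would not deliver them to the downstream observer.
        return _stream.Subscribe(onNext, downStreamObserver.OnError, downStreamObserver.OnCompleted);
    }
}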
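// Toy observer that sits between the upstream observable and the downstream observer.
public class WhereObserverPedantic<T> : IObserver<T>
{
    private readonly IObserver<T> _downStreamObserver;
    private readonly Func<T, bool> _pred;

    public WhereObserverPedantic(IObserver<T> downStreamObserver, Func<T, bool> pred)
    {
        _pred = pred;
        _downStreamObserver = downStreamObserver;
    }

    public void OnNext(T value)
    {
        // Forward only the values that satisfy the predicate.
        if (_pred(value))
        {
            _downStreamObserver.OnNext(value);
        }
    }

    // Completion and errors are not filtered, so pass them straight through.
    public void OnCompleted() { _downStreamObserver.OnCompleted(); }
    public void OnError(Exception error) { _downStreamObserver.OnError(error); }
}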
Upvotes: 1
Reputation: 30883
Looking at the source code with ILSpy, it's easy to see that this is exactly how Where is implemented. It returns a new observable that filters items based on the predicate you pass:
public static IObservable<TSource> Where<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (predicate == null)
    {
        throw new ArgumentNullException("predicate");
    }
    return new AnonymousObservable<TSource>((IObserver<TSource> observer) => source.Subscribe(delegate(TSource x)
    {
        bool flag;
        try
        {
            flag = predicate(x);
        }
        catch (Exception error)
        {
            observer.OnError(error);
            return;
        }
        if (flag)
        {
            observer.OnNext(x);
        }
    }, new Action<Exception>(observer.OnError), new Action(observer.OnCompleted)));
}
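To see the same data flow end to end without referencing the Rx library, here is a minimal sketch that uses only the IObservable<T>/IObserver<T> interfaces from the BCL. Every type and method name in it (DelegateObserver, DelegateObservable, NoopDisposable, ToyRx.FromValues, ToyWhere, Demo) is made up for illustration and is not part of Rx. It also assumes a synchronous source and, unlike the real operator, does nothing to stop further notifications after an error.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: an IObserver<T> built from three delegates.
sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

// Hypothetical helper: an IObservable<T> built from a subscribe delegate.
sealed class DelegateObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;

    public DelegateObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer) => _subscribe(observer);
}

// Nothing to clean up for a source that has already finished pushing.
sealed class NoopDisposable : IDisposable
{
    public void Dispose() { }
}

static class ToyRx
{
    // Source that synchronously pushes the given values on subscribe, then completes.
    public static IObservable<T> FromValues<T>(params T[] values) =>
        new DelegateObservable<T>(observer =>
        {
            foreach (var v in values) observer.OnNext(v);
            observer.OnCompleted();
            return new NoopDisposable();
        });

    // Subscribes an intermediate observer to the source; it forwards matching
    // values and passes errors and completion through to the real observer.
    public static IObservable<T> ToyWhere<T>(this IObservable<T> source, Func<T, bool> predicate) =>
        new DelegateObservable<T>(observer =>
            source.Subscribe(new DelegateObserver<T>(
                x =>
                {
                    bool pass;
                    try { pass = predicate(x); }
                    catch (Exception ex) { observer.OnError(ex); return; }
                    if (pass) observer.OnNext(x);
                },
                observer.OnError,
                observer.OnCompleted)));
}

static class Demo
{
    // Records every notification the final observer receives.
    public static List<string> Run()
    {
        var log = new List<string>();
        ToyRx.FromValues(0L, 1L, 2L, 3L, 4L)
             .ToyWhere(x => x % 2 == 0)
             .Subscribe(new DelegateObserver<long>(
                 x => log.Add("next " + x),
                 e => log.Add("error"),
                 () => log.Add("done")));
        return log;
    }

    static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Running Demo prints "next 0", "next 2", "next 4" and then "done": the intermediate observer sees all five values from the source, but the final observer only sees the ones that pass the predicate.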
Upvotes: 4