jameschinnock
jameschinnock

Reputation: 787

How is IObservable's Where() extension method implemented?

I have been trying to understand Rx more deeply, by following Bart De Smetts MinLinq, and Jon Skeets 'Reimplementing' series I have built up a good understanding but...

Taking the following code as an example

var onePerSecond = Observable.Interval(TimeSpan.FromSeconds(1));
var evenNums = onePerSecond.Where(x => x % 2 == 0);
evenNums.Subscribe(Console.WriteLine);

From an equivelant IEnumerable point of view I understand the data flow of MoveNext/Current, and from Mr Skeets' blog how a Where method could be implemented by using foreach over the IEnumerable 'this' parameter of the extension method.

But in the case of IObservable's Where method, would it contain code to implement the IObserver interface (or lambda equivelant), and therefore effectively be observing all notifications from the onePerSecond object, and in turn returning an IObservable that only contains values that the predicate found to be true?

Any help and thoughts very welcome, many thanks

James

Upvotes: 3

Views: 592

Answers (2)

Scott Weinstein
Scott Weinstein

Reputation: 19117

Here are some toy examples to get a feel for how it works:

https://github.com/ScottWeinstein/Rx-Demo/tree/master/ImplementWhereDemo

public class WhereObservableLessPedantic<T> : IObservable<T>
{
    private Func<T, bool> _pred;
    private IObservable<T> _stream;

    public WhereObservableLessPedantic(IObservable<T> stream, Func<T, bool> pred)
    {
        _pred = pred;
        _stream = stream;
    }

    public IDisposable Subscribe(IObserver<T> downStreamObserver)
    {
        Action<T> onNext = nextVal =>
        {
            if (_pred(nextVal))
                downStreamObserver.OnNext(nextVal);
        };
        return _stream.Subscribe(onNext);
    }
}


public class WhereObserverPedantic<T> : IObserver<T>
{
    private IObserver<T> _downStreamObserver;
    private Func<T, bool> _pred;

    public WhereObserverPedantic(IObserver<T> downStreamObserver, Func<T, bool> pred)
    {
        _pred = pred;
        _downStreamObserver = downStreamObserver;
    }

    public void OnNext(T value)
    {
        if (_pred(value))
        {
            _downStreamObserver.OnNext(value);
        }
    }

    public void OnCompleted() { }
    public void OnError(Exception error) { }
}

Upvotes: 1

Giorgi
Giorgi

Reputation: 30883

By looking at the source code with ILSpy it's easy to see that's exactly how Where is implemented. It return a new observable which filters items based on the predicate you pass:

public static IObservable<TSource> Where<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (predicate == null)
    {
        throw new ArgumentNullException("predicate");
    }
    return new AnonymousObservable<TSource>((IObserver<TSource> observer) => source.Subscribe(delegate(TSource x)
    {
        bool flag;
        try
        {
            flag = predicate(x);
        }
        catch (Exception error)
        {
            observer.OnError(error);
            return;
        }
        if (flag)
        {
            observer.OnNext(x);
        }
    }
    , new Action<Exception>(observer.OnError), new Action(observer.OnCompleted)));
}

Upvotes: 4

Related Questions