Marty Neal

Reputation: 9533

FirstOrLast IObservable Extension

I want to run through an IObservable<T> looking for the first element that matches a predicate and, if none is found, return the last element of the IObservable<T>. I don't want to store the entire contents of the IObservable<T>, and I don't want to iterate it twice, so I've set up an extension method:

public static class ObservableExtensions
{
    public static IObservable<T> FirstOrLastAsync<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        return Observable.Create<T>(o =>
        {
            var hot = source.Publish();
            var store = new AsyncSubject<T>();
            var d1 = hot.Subscribe(store);
            var d2 = hot.FirstAsync(x => pred(x)).Amb(store).Subscribe(o);
            var d3 = hot.Connect();
            return new CompositeDisposable(d1, d2, d3);
        });
    }

    public static T FirstOrLast<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        return source.FirstOrLastAsync(pred).Wait();
    }
}

The Async method turns the potentially cold observable passed in into a hot one. It subscribes an AsyncSubject<T> to remember the last element, and sets up an IObservable<T> that looks for the matching element. It then uses .Amb to take the first element from whichever of those two produces a value first (AsyncSubject<T> doesn't produce a value until it receives an .OnCompleted message).

My questions are the following:

var d2 = hot.Where(x => pred(x)).Take(1).Amb(store).Subscribe(o);

I'm pretty new to Rx, and this is my first extension method on IObservable<T>.

EDIT

I ended up going with:

public static class ObservableExtensions
{
    public static IObservable<T> FirstOrLastAsync<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        var hot = source.Publish().RefCount();
        return hot.TakeLast(1).Amb(hot.Where(pred).Take(1).Concat(Observable.Never<T>()));
    }

    public static T FirstOrLast<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        return source.FirstOrLastAsync(pred).First();
    }
}
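A minimal usage sketch of the version above, assuming the System.Reactive package is referenced. It drives the query by hand with a Subject<int> so that the timing is explicit. The FirstOrLastDemo class and its Run helper are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ObservableExtensions
{
    // Same query as in the EDIT above, repeated so the sketch is self-contained.
    public static IObservable<T> FirstOrLastAsync<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        var hot = source.Publish().RefCount();
        return hot.TakeLast(1).Amb(hot.Where(pred).Take(1).Concat(Observable.Never<T>()));
    }
}

public static class FirstOrLastDemo
{
    // Hypothetical helper: pushes `values` through a Subject<int>, completes it,
    // and returns everything FirstOrLastAsync emitted along the way.
    public static List<int> Run(Func<int, bool> pred, params int[] values)
    {
        var results = new List<int>();
        var subject = new Subject<int>();
        using (subject.FirstOrLastAsync(pred).Subscribe(results.Add))
        {
            foreach (var v in values) subject.OnNext(v);
            subject.OnCompleted();
        }
        return results;
    }

    public static void Main()
    {
        // A match exists: the first matching element is emitted.
        Console.WriteLine(string.Join(",", Run(x => x == 3, 1, 2, 3, 4))); // 3
        // No match: the last element is emitted when the source completes.
        Console.WriteLine(string.Join(",", Run(x => x > 10, 1, 2, 3, 4))); // 4
    }
}
```

Note that in the matching case the sequence returned by FirstOrLastAsync emits its value but does not complete, because of the Concat(Observable.Never<T>()). That is why the blocking wrapper uses First() rather than Wait().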

Upvotes: 1

Views: 362

Answers (2)

Asti

Reputation: 12667

You could Amb the two cases you want together. If your source observable is cold, you can apply Publish().RefCount() to it first.

    public static IObservable<T> FirstOrLast<T>(this IObservable<T> source, Func<T, bool> predicate)
    {
        return source.TakeLast(1).Amb(source.Where(predicate).Take(1));
    }

Test:

        var source = Observable.Interval(TimeSpan.FromSeconds(0.1))
                               .Take(10)
                               .Publish()
                               .RefCount();

        FirstOrLast(source, i => i == 5).Subscribe(Console.WriteLine); //5
        FirstOrLast(source, i => i == 11).Subscribe(Console.WriteLine); //9

Upvotes: 1

Enigmativity

Reputation: 117027

I've tried to produce a "simpler" query that works, but so far I haven't found one.

If I stick with your basic structure I can offer a slight improvement. Try this:

public static IObservable<T> FirstOrLastAsync<T>(
    this IObservable<T> source, Func<T, bool> pred)
{
    return Observable.Create<T>(o =>
    {
        var hot = source.Publish();
        var store = new AsyncSubject<T>();
        var d1 = hot.Subscribe(store);
        var d2 =
            hot
                .Where(x => pred(x))
                .Concat(store)
                .Take(1)
                .Subscribe(o);
        var d3 = hot.Connect();
        return new CompositeDisposable(d1, d2, d3);
    });
}

It's not hugely better, but I like it better than using Amb. It's just a tad cleaner I think.

Upvotes: 0
