Reputation: 9533
I want to run through an IObservable<T>
looking for an element that matches a predicate, and if not found, return the last element of the IObservable<T>
. I don't want to have to store the entire contents of the IObservable<T>
, and I don't want to loop through the IObservable
twice, so I've set up an extension method
public static class ObservableExtensions
{
public static IObservable<T> FirstOrLastAsync<T>(this IObservable<T> source, Func<T, bool> pred)
{
return Observable.Create<T>(o =>
{
var hot = source.Publish();
var store = new AsyncSubject<T>();
var d1 = hot.Subscribe(store);
var d2 = hot.FirstAsync(x => pred(x)).Amb(store).Subscribe(o);
var d3 = hot.Connect();
return new CompositeDisposable(d1, d2, d3);
});
}
public static T FirstOrLast<T>(this IObservable<T> source, Func<T, bool> pred)
{
return source.FirstOrLastAsync(pred).Wait();
}
}
The Async method creates a hot observable from a potentially cold one passed in. It subscribes an AsyncSubject<T>
to remember the last element, and an IObservable<T>
that looks for the element. It then takes the first element from either of those IObservable<T>
s, which ever returns a value first via .Amb
(AsyncSubject<T>
doesn't return a value until it gets an .OnCompleted
message).
My questions are the following:
var d2 = hot.Where(x => pred(x)).Take(1).Amb(store).Subscribe(o);
I'm pretty new to RX, and this is my first extension on IObservable.
EDIT
I ended up going with
public static class ObservableExtensions
{
public static IObservable<T> FirstOrLastAsync<T>(this IObservable<T> source, Func<T, bool> pred)
{
var hot = source.Publish().RefCount();
return hot.TakeLast(1).Amb(hot.Where(pred).Take(1).Concat(Observable.Never<T>()));
}
public static T FirstOrLast<T>(this IObservable<T> source, Func<T, bool> pred)
{
return source.FirstOrLastAsync(pred).First();
}
}
Upvotes: 1
Views: 362
Reputation: 12667
You could Amb the two cases you want together.
If your source observable is cold, you can do a Publish|Refcount
.
public static IObservable<T> FirstOrLast<T>(this IObservable<T> source, Func<T, bool> predicate)
{
return source.TakeLast(1).Amb(source.Where(predicate).Take(1));
}
Test:
var source = Observable.Interval(TimeSpan.FromSeconds(0.1))
.Take(10)
.Publish()
.RefCount();
FirstOrLast(source, i => i == 5).Subscribe(Console.WriteLine); //5
FirstOrLast(source, i => i == 11).Subscribe(Console.WriteLine); //9
Upvotes: 1
Reputation: 117027
I've tried to produce a "simpler" query that works and so far nothing.
If I stick with your basic structure I can offer a slight improvement. Try this:
public static IObservable<T> FirstOrLastAsync<T>(
this IObservable<T> source, Func<T, bool> pred)
{
return Observable.Create<T>(o =>
{
var hot = source.Publish();
var store = new AsyncSubject<T>();
var d1 = hot.Subscribe(store);
var d2 =
hot
.Where(x => pred(x))
.Concat(store)
.Take(1)
.Subscribe(o);
var d3 = hot.Connect();
return new CompositeDisposable(d1, d2, d3);
});
}
It's not hugely better, but I like it better than using Amb
. It's just a tad cleaner I think.
Upvotes: 0