Reputation: 2789
For an Rx-based change tracking solution, I need an operator that gives me the first and the most recent item in an observable sequence.
How would I write an Rx operator that produces the following marble diagram? (Note: the brackets are only there to line up the items; I'm not sure how best to represent this in text.)
xs:      ---[a  ]---[b  ]-----[c  ]-----[d  ]---------|
desired: ---[a,a]---[a,b]-----[a,c]-----[a,d]---------|
Upvotes: 3
Views: 4309
Reputation: 29573
I suspect there's a much better way of doing this (and I dislike using Do), but you could create an operator like this:
public static IObservable<Tuple<T, T>> FirstAndLatest<T>(this IObservable<T> source)
{
    // Defer gives each subscriber its own hasFirst/first state.
    return Observable.Defer(() =>
    {
        bool hasFirst = false;
        T first = default(T);
        return source
            .Do(item =>
            {
                // Remember only the first item seen.
                if (!hasFirst)
                {
                    hasFirst = true;
                    first = item;
                }
            })
            .Select(current => Tuple.Create(first, current));
    });
}
Then you would use it like this:
Observable.Interval(TimeSpan.FromSeconds(0.1))
    .FirstAndLatest()
    .Subscribe(Console.WriteLine);
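If the Do side effect bothers you, the same "remember the first, pair it with the current" idea can probably be expressed with Scan instead. This is only a rough sketch, not a tested drop-in: the name FirstAndLatestViaScan and the class ScanSketchExtensions are made up for illustration, and it assumes the seeded Scan overload from System.Reactive, with a null seed used to detect the first item.

```csharp
using System;
using System.Reactive.Linq;

public static class ScanSketchExtensions
{
    // Hypothetical name; a sketch of the same operator built on Scan.
    public static IObservable<Tuple<T, T>> FirstAndLatestViaScan<T>(this IObservable<T> source)
    {
        return source.Scan(
            (Tuple<T, T>)null, // null seed means "no item seen yet"
            (acc, current) => acc == null
                ? Tuple.Create(current, current)     // first item: pair it with itself
                : Tuple.Create(acc.Item1, current)); // later items: keep the first, swap in the latest
    }
}
```

Scan keeps its accumulator per subscription, so no Defer should be needed here.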
Upvotes: 1
Reputation: 15618
Using the same naming as @Wilka, you can use the extension below, which is fairly self-explanatory:
public static IObservable<TResult> FirstAndLatest<T, TResult>(this IObservable<T> source, Func<T,T,TResult> func)
{
    var published = source.Publish().RefCount();
    var first = published.Take(1);
    return first.CombineLatest(published, func);
}
Note that it doesn't necessarily return a Tuple, but rather gives you the option of passing a selector function on the result. This keeps it in line with the underlying primary operation (CombineLatest). This is obviously easily changed.
Usage (if you want Tuples in the resulting stream):
Observable.Interval(TimeSpan.FromSeconds(0.1))
    .FirstAndLatest((a,b) => Tuple.Create(a,b))
    .Subscribe(Console.WriteLine);
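One thing that may be worth checking with Publish().RefCount() is a source that emits synchronously on subscribe: the first subscription connects the source, so the second one could miss items. A possible variant, sketched here under the assumption that the Publish overload taking a selector subscribes everything inside the selector before connecting, looks like this. The name FirstAndLatestShared and the class PublishSketchExtensions are hypothetical.

```csharp
using System;
using System.Reactive.Linq;

public static class PublishSketchExtensions
{
    // Hypothetical name; a sketch that shares one subscription via Publish(selector).
    public static IObservable<TResult> FirstAndLatestShared<T, TResult>(
        this IObservable<T> source, Func<T, T, TResult> func)
    {
        // Both Take(1) and the full shared sequence are subscribed
        // before the underlying source is connected.
        return source.Publish(shared => shared.Take(1).CombineLatest(shared, func));
    }
}
```

Usage would be the same as above, just with the different method name.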
Upvotes: 5
Reputation: 117029
Try this:
public static IObservable<Tuple<T, T>> FirstAndLatest<T>(
    this IObservable<T> source)
{
    return
        source
            .Take(1)
            .Repeat()
            .Zip(source, (x0, xn) => Tuple.Create(x0, xn));
}
Simple, huh?
Or, as an alternative to share the underlying source, try this:
public static IObservable<Tuple<T, T>> FirstAndLatest<T>(
    this IObservable<T> source)
{
    return
        source.Publish(
            s =>
                s.Take(1)
                    .Repeat()
                    .Zip(s, (x0, xn) => Tuple.Create(x0, xn)));
}
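WHOOPS! Scratch this second version. It doesn't work. It essentially keeps producing a pair of the latest values, presumably because Take(1).Repeat() resubscribes to the already-running shared sequence and picks up each new value instead of replaying the first. Publishing like this isn't working. The first implementation in this answer is the best.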
Upvotes: 2