Reputation: 5734
I'm looking for an observable selector with a signature akin to this:
static IObservable<T> TakeLatest(this IObservable<T> input, TimeSpan interval)
Which should:
In terms of marbles, something like the following - assuming interval = 2 time units:
Time | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Input | A | B | C | D | E | F (complete) | |||||||||
Output | A | B | D | D | E | E | complete (F not emitted anymore) |
Is there any out-of-the-box way of doing so, or a reasonably easy selector to produce these results?
Upvotes: 3
Views: 760
Reputation: 43525
This should probably do exactly what you want. I haven't tested it though.
/// <summary>Samples the source observable sequence at each interval,
/// allowing repeated emissions of the same element.</summary>
public static IObservable<T> SampleWithDuplicates<T>(this IObservable<T> source,
TimeSpan interval, IScheduler scheduler = null)
{
scheduler ??= DefaultScheduler.Instance;
return source.Publish(published => Observable
.Interval(interval, scheduler)
.WithLatestFrom(published, (_, x) => x)
.Merge(published.FirstAsync())
.TakeUntil(published.LastOrDefaultAsync()));
}
Upvotes: 3
Reputation: 5734
I've done the following now - I think it works, but I'll heave this open in case anyone can think of a more elegant way (or can think of an issue with my current implementation)
static IObservable<T> TakeLatest(this IObservable<T> input, TimeSpan interval, IScheduler scheduler) => input
.FirstAsync()
.Select(_ => Observable.Interval(interval, scheduler).StartWith(0))
.Switch()
.CombineLatest(input, (a,b) => (a,b))
.DistinctUntilChanged(x => x.a)
.Select(x => x.b)
.TakeUntil(input.LastAsync());
Upvotes: 0