Bogey

Reputation: 5734

Observable: Getting latest value in intervals until source finishes

I'm looking for an observable selector with a signature akin to this:

static IObservable<T> TakeLatest<T>(this IObservable<T> input, TimeSpan interval)

Which should:

  1. Emit the first item as soon as input emits its first item
  2. From then on, at fixed time intervals, emit the most recent item produced by input
  3. Complete (or fail) whenever input completes (or fails)

In terms of marbles, something like the following - assuming interval = 2 time units:

Time 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
Input A B C D E F (complete)
Output A B D D E E complete (F not emitted anymore)
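
To pin down the three rules, here is a small dependency-free simulation of the expected behaviour. It is only a sketch, not an Rx implementation: the `Expected` helper and the (time, value) layout are made up for illustration, and it assumes that an item arriving exactly on a tick counts for that tick and that nothing is emitted at the completion time itself.

```csharp
using System;
using System.Collections.Generic;

public static class TakeLatestSpec
{
    // Hypothetical helper: computes what TakeLatest should emit for items
    // given as (time, value) pairs sorted by time, with the input
    // completing at `completedAt`.
    public static List<(int Time, string Value)> Expected(
        IReadOnlyList<(int Time, string Value)> input, int completedAt, int interval)
    {
        var output = new List<(int Time, string Value)>();
        if (input.Count == 0) return output; // nothing to emit for an empty input

        // Rule 1: the first item is emitted as soon as it arrives.
        output.Add(input[0]);
        string latest = input[0].Value;
        int next = 1; // index of the next input item not yet consumed

        // Rule 2: on every later tick, emit the most recent item seen so far.
        // Rule 3: stop once the input has completed.
        for (int tick = input[0].Time + interval; tick < completedAt; tick += interval)
        {
            while (next < input.Count && input[next].Time <= tick)
                latest = input[next++].Value;
            output.Add((tick, latest));
        }
        return output;
    }
}
```

With assumed timings A@1, B@2, C@4, D@5, E@8, F@12, completion at 13 and interval = 2, the call `TakeLatestSpec.Expected(new[] { (1, "A"), (2, "B"), (4, "C"), (5, "D"), (8, "E"), (12, "F") }, 13, 2)` yields the values A B D D E E at times 1, 3, 5, 7, 9, 11, so F is never emitted.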

Is there any out-of-the-box way of doing so, or a reasonably easy selector to produce these results?

Upvotes: 3

Views: 760

Answers (2)

Theodor Zoulias

Reputation: 43525

This should probably do exactly what you want. I haven't tested it though.

// Requires: using System; using System.Reactive.Concurrency; using System.Reactive.Linq;
/// <summary>Samples the source observable sequence at each interval,
/// allowing repeated emissions of the same element.</summary>
public static IObservable<T> SampleWithDuplicates<T>(this IObservable<T> source,
    TimeSpan interval, IScheduler scheduler = null)
{
    scheduler ??= DefaultScheduler.Instance;
    return source.Publish(published => Observable
        .Interval(interval, scheduler)
        .WithLatestFrom(published, (_, x) => x)
        .Merge(published.FirstAsync())
        .TakeUntil(published.LastOrDefaultAsync()));
}

Upvotes: 3

Bogey

Reputation: 5734

I've done the following now. I think it works, but I'll leave this open in case anyone can think of a more elegant way (or can spot an issue with my current implementation).

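static IObservable<T> TakeLatest<T>(this IObservable<T> input, TimeSpan interval, IScheduler scheduler) => input
  .FirstAsync()
  // Seed with -1 rather than 0: Interval itself starts at 0, and a duplicate 0
  // would make DistinctUntilChanged swallow the first scheduled tick.
  .Select(_ => Observable.Interval(interval, scheduler).StartWith(-1L))
  .Switch()
  .CombineLatest(input, (a, b) => (a, b))
  .DistinctUntilChanged(x => x.a) // emit only when the timer ticks, not on every input item
  .Select(x => x.b)
  .TakeUntil(input.LastAsync());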
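
One possible issue: `input` is subscribed three times here (by `FirstAsync`, `CombineLatest` and `LastAsync`), so a cold source would be run three times. Below is a sketch of the same pipeline with a single shared subscription via `Publish`. The name `TakeLatestShared` is made up for this example, and the timer is seeded with -1 on the assumption that `Observable.Interval` starts counting at 0, so that the first scheduled tick is not filtered out as a duplicate. I have only reasoned through it for a simple hot source.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class TakeLatestExtensions
{
    // Hypothetical variant: same pipeline, but every branch reads from one
    // shared subscription to `input` instead of subscribing three times.
    public static IObservable<T> TakeLatestShared<T>(
        this IObservable<T> input, TimeSpan interval, IScheduler scheduler) =>
        input.Publish(shared => shared
            .FirstAsync()
            // Start the timer on the first item; seeded with -1 because
            // Interval's own first value is 0.
            .Select(_ => Observable.Interval(interval, scheduler).StartWith(-1L))
            .Switch()
            .CombineLatest(shared, (tick, value) => (tick, value))
            .DistinctUntilChanged(x => x.tick) // pass a value only on a new tick
            .Select(x => x.value)
            .TakeUntil(shared.LastAsync()));   // stop when the source completes
}
```

It is called the same way as the original, e.g. `source.TakeLatestShared(TimeSpan.FromSeconds(2), DefaultScheduler.Instance)`.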

Upvotes: 0
