ZeroBugBounce

Reputation: 3670

How do I create an IObservable<T> that returns a value every -n- seconds without skipping any

The example below was my attempt:

var source
    = Observable.Sample(
          Observable.Range(1, int.MaxValue), TimeSpan.FromSeconds(2));

But when I .Subscribe() to that observable and write the values to the console, I get one line every 2 seconds, like this:

OnNext: 312969
OnNext: 584486
OnNext: 862009

Obviously the .Range() observable keeps running while the .Sample() observable waits 2 seconds between outputs. I would like to know how to create an observable that does not allow values to be skipped, so that the output looks like this:

OnNext: 1
OnNext: 2
OnNext: 3

With one value from .Range() output every 2 seconds. How can I accomplish this in the Reactive Extensions for .NET?

Upvotes: 2

Views: 2916

Answers (3)

Oleg Dok

Reputation: 21756

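A well-known example of a pacer:

// Extension methods must be declared inside a static class.
public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval) =>
    source
        // Turn each value into "value, then an empty sequence that completes after interval".
        .Select(p =>
            Observable
                .Empty<T>()
                .Delay(interval)
                .StartWith(p))
        // Concat subscribes to the next inner sequence only after the previous one completes.
        .Concat();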
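
For illustration, here is a minimal sketch of how the pacer might be wired into a console program. The class names PaceExtensions and Program are assumptions made for this example, and the range is kept short so the demo finishes quickly. It assumes the System.Reactive package and C# 8 or later.

```csharp
using System;
using System.Reactive.Linq;

public static class PaceExtensions
{
    // Same operator as in the answer: each value is followed by an empty
    // sequence whose completion is delayed, and Concat waits for it.
    public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval) =>
        source
            .Select(p => Observable.Empty<T>().Delay(interval).StartWith(p))
            .Concat();
}

public static class Program
{
    public static void Main()
    {
        // A small finite range keeps the demo short.
        using var subscription = Observable
            .Range(1, 5)
            .Pace(TimeSpan.FromSeconds(2))
            .Subscribe(x => Console.WriteLine($"OnNext: {x}"));

        // Keep the process alive while the paced values arrive.
        Console.ReadLine();
    }
}
```

Note that Concat queues the inner sequences it has not yet subscribed to, so a very fast, unbounded source will accumulate pending items in memory.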

Upvotes: 0

Richard Anthony Hein

Reputation: 10650

Using Observable.GenerateWithTime:

var source = Observable.GenerateWithTime(1, _ => true, x => ++x, x => x, x => TimeSpan.FromSeconds(2));

Observable.Range uses Observable.Generate, so this is one approach. There could be many other ways.
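
As far as I know, GenerateWithTime comes from pre-release builds of Rx; in Rx 1.0 and later the same behaviour appears to be exposed as an overload of Observable.Generate that takes a time selector as its last argument. A minimal sketch under that assumption (System.Reactive package, C# 8 or later, and a hypothetical Program class):

```csharp
using System;
using System.Reactive.Linq;

public static class Program
{
    public static void Main()
    {
        IObservable<int> source = Observable.Generate(
            1,                              // initial state
            _ => true,                      // condition: never stop
            x => x + 1,                     // iterate: next state
            x => x,                         // result selector: emit the state itself
            _ => TimeSpan.FromSeconds(2));  // time selector: delay before each value

        using var subscription =
            source.Subscribe(x => Console.WriteLine($"OnNext: {x}"));

        // Keep the process alive while values arrive.
        Console.ReadLine();
    }
}
```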

This only helps if you are generating the data yourself. For something more advanced, such as pacing events in the same manner, see the question "How to throttle event stream using RX?", which deals with that problem and has a solution.

Upvotes: 5

spender

Reputation: 120400

I approached this recently by creating an Observable that emits a timed event at a fixed interval. You can then use the Zip method to synchronize the events from your Observable with those of the timer Observable.

For instance:

    var timer = 
        Observable
            .Timer(
                TimeSpan.FromSeconds(0), 
                TimeSpan.FromSeconds(2)
            );
    var source = Observable.Range(1, int.MaxValue);
    var timedSource = source.Zip(timer,(s,t)=>s);
    timedSource.Subscribe(Console.WriteLine);
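
Zip pairs values as they arrive, so when the source produces faster than the timer, the unpaired source values wait inside Zip until a tick is available. If the data is a plain sequence that you generate yourself, one possible variation is the Zip overload that takes an IEnumerable, which pulls one element per tick. This is a sketch rather than a drop-in replacement; it assumes the System.Reactive package and C# 8 or later, and the Program class is hypothetical:

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public static class Program
{
    public static void Main()
    {
        // Ticks immediately, then every 2 seconds.
        var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(2));

        // Zip against an enumerable: one element is pulled for each tick.
        var timedSource = timer.Zip(
            Enumerable.Range(1, int.MaxValue),
            (tick, value) => value);

        using var subscription =
            timedSource.Subscribe(x => Console.WriteLine($"OnNext: {x}"));

        // Keep the process alive while values arrive.
        Console.ReadLine();
    }
}
```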

Upvotes: 4
