Lemon Ade
Lemon Ade

Reputation: 13

Using rx to subscribe to event and perform logging after time interval

I have a simple use case where:

  1. Receive a notification of events
  2. Perform some action on the event
  3. Print the content after x interval

How can I do the above step in a single Rx pipeline?

Something like below:

void Main()
{
    var observable = Observable.Interval(TimeSpan.FromSeconds(1));

    // Receive event and call Foo()
    observable.Subscribe(x=>Foo());
    // After 1 minute, I want to print the result of count
    // How do I do this using above observable?
}

int count = 0;
void Foo()
{
    Console.Write(".");
    count ++;
}

Upvotes: 1

Views: 1285

Answers (2)

Enigmativity
Enigmativity

Reputation: 117064

I think this does what you want:

var observable =
    Observable
        .Interval(TimeSpan.FromSeconds(1))
        .Do(x => Foo())
        .Window(() => Observable.Timer(TimeSpan.FromMinutes(1.0)));

var subscription =
    observable
        .Subscribe(xs => Console.WriteLine(count));

However, it's a bad idea to mix state with observables. If you had two subscriptions you'd increment count twice as fast. It's better to encapsulate your state within the observable so that each subscription would get a new instance of count.

Try this instead:

var observable =
    Observable
        .Defer(() =>
        {
            var count = 0;
            return
                Observable
                    .Interval(TimeSpan.FromSeconds(1))
                    .Select(x =>
                    {
                        Console.Write(".");
                        return ++count;
                    });
        })
        .Window(() => Observable.Timer(TimeSpan.FromMinutes(0.1)))
        .SelectMany(xs => xs.LastAsync());

var subscription =
    observable
        .Subscribe(x => Console.WriteLine(x));

I get this kind of output:

...........................................................59
............................................................119
............................................................179
............................................................239

Remembering that it starts with 0 then this is timing pretty well.


After seeing paulpdaniels answer I realized that I could replace my Window/SelectMany/LastAsync with the simpler Sample operator.

Also, if we don't really need the side-effect of incrementing a counter then this whole observable shrinks down to this:

var observable =
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Do(x => Console.Write("."))
        .Sample(TimeSpan.FromMinutes(1.0));

observable.Subscribe(x => Console.WriteLine(x));

Much simpler!

Upvotes: 2

paulpdaniels
paulpdaniels

Reputation: 18663

I would use Select + Sample:

var observable = Observable.Interval(TimeSpan.FromSeconds(1))
          .Select((x, i) => {
            Foo(x);
            return i;
          })
          .Do(_ => Console.Write("."))
          .Sample(TimeSpan.FromMinutes(1));

observable.Subscribe(x => Console.WriteLine(x));

Select has an overload that returns the index of the current value, by returning that and then sampling at 1 minute intervals, you can get the last value emitted during that interval.

Upvotes: 1

Related Questions