Mike-E

Reputation: 2475

Correct Way of Subscribing to Multiple Intervals of Differing Times

I am building my first Reactive.NET application and am struggling with how to manage two sets of observables.

To start with, I am polling a server in both sets. I have two "rings" of time (faster and slower): one that fires every second and one that fires every 15 seconds. I noticed that if the server takes a while to process a call in one of the rings, the ring immediately "catches up" and fires the missed events back to back once the call has completed (via timeout or otherwise). So, if I am firing every second and there is a 30-second delay, the subscription resumes by firing 30 times once the delay is over (assuming there are no further delays). This is not what I want: I need to keep calls to the server to a minimum, with at most one call per time frame per ring. That is, if there is a 30-second delay, I do not want to bombard the server with calls once that delay has elapsed.

I have gotten around this by using Sample, and everything works well, except that when the application starts there is a delay on the outer ring: I see no output for the first 15 seconds. With the 1-second ring I got around this by using StartWith(-1), but I cannot seem to do the same for the outer (slower) ring.

Here is my code:

var fast = TimeSpan.FromSeconds(1);
var slow = TimeSpan.FromSeconds(15);
var application = Observable.Interval(fast)
                            .Sample(fast)
                            .StartWith(-1)
                            .Timestamp()
                            .Window(slow)
                            .Sample(slow)
                            .Subscribe(window =>
                                        {
                                            // Outer (slower) ring: ...

                                            window.Subscribe(x =>
                                                            {
                                                                // Inner (faster) ring: ...
                                                            });
                                        });

So, really, the questions are:

  1. Is this the correct/preferred way of creating two rings of subscribed time of differing intervals in Rx? (This is important going forward as I can see having additional rings of time such as 30 seconds, 1 minute, 5 minutes, etc. and I want to be sure I get this right.)
  2. If not, what is the best/preferred way of doing this?
  3. How do I ensure the outer ring fires immediately and does not take 15 seconds to fire on the first attempt?

EDIT:

Based on @yohanmishkin's answer, here is the code that I am using:

var poll = Observable.Interval(TimeSpan.FromSeconds(1));
var lessFrequentPoll = Observable.Interval(TimeSpan.FromSeconds(15));

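// Subscribe returns the IDisposable that stops each ring; the observables themselves are not disposable.
var frequentSubscription = poll.Subscribe(o => application.UpdateFrequent(o));
var lessFrequentSubscription = lessFrequentPoll.Subscribe(o => application.UpdateLessFrequent(o));

using (new CompositeDisposable(frequentSubscription, lessFrequentSubscription))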
{
    // ...
}

As mentioned in the answer there, I actually did use this when I first started with Rx, but (due to my primitive understanding of Rx) I originally thought that I would have to nest a using block for each subscription, and I wanted to avoid that. If I ended up with 5 "rings" of time, I would have 5 nested using blocks, which is not pretty to me. Using CompositeDisposable alleviates this.

Upvotes: 1

Views: 108

Answers (2)

Olly

Reputation: 6026

I find this kind of thing is a bit tricky with Rx. It seems as though the subscribed work shouldn't really have the side-effect of modifying the sequence itself. However, that's what you need, because if the work runs slowly (the service is slow) you need the polling frequency to be reduced.

I think that the various recursive overloads of IScheduler.Schedule() are designed to support this kind of thing. See http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html.

The idea is something like this:

private static IObservable<int> PollServer(IScheduler scheduler)
{
    return Observable.Create<int>(
        observer =>
        {
            var cts = new CancellationTokenSource();
            var token = cts.Token;

            // Make initial call immediately
            var scheduled = scheduler.Schedule(
                TimeSpan.Zero,
                async action =>
                {
                    try
                    {
                        var res = await CallServer(token);
                        observer.OnNext(res);

                        // Trigger another iteration after poll delay
                        action(TimeSpan.FromSeconds(1));
                    }
                    catch (Exception ex)
                    {
                        observer.OnError(ex);
                    }
                });

            // Discontinue polling when observable is disposed
            return () =>
            {
                cts.Cancel();
                scheduled.Dispose();
            };
        });
}

private static async Task<int> CallServer(CancellationToken token)
{
    // Remote service call (simulated); pass the token so cancellation stops the call
    await Task.Delay(100, token);
    return 42;
}

This presents the code for only one ring (the poll delay above is one second; your slow ring would use 15 seconds instead), but hopefully it gives an idea of what you would need.
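The same "wait, then call again" idea can also be sketched with operators instead of recursive scheduling. This is only a sketch under assumptions: the names Polling and PollWithGap are hypothetical, and the gap is measured from the end of one call to the start of the next, so a slow server simply stretches the cycle rather than causing a burst of catch-up calls.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of Rx itself.
public static class Polling
{
    // Calls the server immediately on subscription, waits `gap` after each
    // completed call, then calls again. An exception from `call` ends the
    // sequence with OnError.
    public static IObservable<T> PollWithGap<T>(
        Func<CancellationToken, Task<T>> call,
        TimeSpan gap,
        IScheduler scheduler)
    {
        // An empty sequence that completes only after `gap` has elapsed.
        var pause = Observable
            .Timer(gap, scheduler)
            .SelectMany(_ => Observable.Empty<T>());

        return Observable
            .FromAsync(call)   // one call per subscription; token is cancelled on dispose
            .Concat(pause)     // then the pause
            .Repeat();         // resubscribe: call, pause, call, pause, ...
    }
}
```

With the CallServer method above, a 15-second ring might then be created as Polling.PollWithGap(CallServer, TimeSpan.FromSeconds(15), Scheduler.Default).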

Upvotes: 0

yohanmishkin

Reputation: 168

In this case you may want to create two separate observables and subscribe your application to them separately.

Here's an example:

var poll = Observable.Interval(TimeSpan.FromSeconds(1));
var lessFrequentPoll = Observable.Interval(TimeSpan.FromSeconds(15));

poll.Subscribe(o => application.UpdateFrequent(o));
lessFrequentPoll.Subscribe(o => application.UpdateLessFrequent(o));
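Note that Observable.Interval waits one full period before its first tick, which is why the 15-second ring appears silent at startup. One way around that, sketched here with hypothetical names (Rings, Ring, Start), is Observable.Timer with a due time of zero, which ticks once immediately and then once per period:

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical helper for illustration.
public static class Rings
{
    // Emits 0 as soon as it is subscribed, then 1, 2, ... once per period.
    public static IObservable<long> Ring(TimeSpan period) =>
        Observable.Timer(TimeSpan.Zero, period);

    // Starts both rings and returns a single handle that stops them.
    public static IDisposable Start(Action<long> onFast, Action<long> onSlow)
    {
        var fast = Ring(TimeSpan.FromSeconds(1)).Subscribe(onFast);
        var slow = Ring(TimeSpan.FromSeconds(15)).Subscribe(onSlow);
        return new CompositeDisposable(fast, slow);
    }
}
```

Adding a further ring (30 seconds, 1 minute, and so on) is then one more subscription added to the CompositeDisposable; disposing the returned handle stops all of them.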

If you still want to combine the two intervals into a single stream, Rx provides plenty of operators for doing so (Merge, Zip, CombineLatest...).

In your case you may want to check out CombineLatest or WithLatestFrom. You can see a visualization of how they work here (CombineLatest) or here (WithLatestFrom).

An example might be using WithLatestFrom to create a new stream that emits an object combining the two interval observables, which your application can then subscribe to.

var combinedPoll = 
    poll
        .WithLatestFrom(
            lessFrequentPoll, 
            (pollEvent, lessFrequentPollEvent) => 
                new CombinedEvent(pollEvent, lessFrequentPollEvent)
         );

combinedPoll.Subscribe(o =>
{
    application.UpdateFrequent(o.FrequentEvent);
    application.UpdateLessFrequent(o.LessFrequentEvent);
});
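One caveat: WithLatestFrom emits nothing until the second source has produced at least one value, so with Observable.Interval the combined stream stays silent for the first 15 seconds. A possible sketch that avoids this is below. The CombinedEvent class is an assumed shape (the answer does not define it), and CombinedPolling.Create is a hypothetical name. Both timers tick at time zero on the default scheduler, so the very first fast tick may still be dropped if it arrives before the slow one.

```csharp
using System;
using System.Reactive.Linq;

// Assumed shape of the CombinedEvent type used above.
public sealed class CombinedEvent
{
    public CombinedEvent(long frequentEvent, long lessFrequentEvent)
    {
        FrequentEvent = frequentEvent;
        LessFrequentEvent = lessFrequentEvent;
    }

    public long FrequentEvent { get; }
    public long LessFrequentEvent { get; }
}

// Hypothetical helper for illustration.
public static class CombinedPolling
{
    public static IObservable<CombinedEvent> Create(TimeSpan fast, TimeSpan slow)
    {
        // Timer(TimeSpan.Zero, period) ticks immediately, then once per period.
        var poll = Observable.Timer(TimeSpan.Zero, fast);
        var lessFrequentPoll = Observable.Timer(TimeSpan.Zero, slow);

        // Each fast tick is paired with the most recent slow tick.
        return poll.WithLatestFrom(
            lessFrequentPoll,
            (pollEvent, lessFrequentPollEvent) =>
                new CombinedEvent(pollEvent, lessFrequentPollEvent));
    }
}
```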

Further reading

Combining sequences

Upvotes: 1
