Reputation: 2475
I am building my first Reactive.NET application and am struggling with how to manage two sets of observables.
To start with, I am polling a server in both sets. I have two "rings" of time (faster and slower): one that fires every second and one that fires every 15 seconds. I noticed that if the server takes a while to process a call in one of the rings, the subscription immediately "catches up" and fires the backlog of events once the call has completed (via timeout or otherwise). So, if I am firing every second and there is a 30-second delay, the subscription resumes by firing 30 times once the delay has elapsed (assuming there are no further delays). This is not what I want: I want to keep calls to the server to a minimum, so each ring should fire at most once per time frame. That is, if there is a 30-second delay, I do not want to bombard the server with calls once that delay has elapsed.
I have gotten around this by using Sample. And, actually, everything works well, except that now, when the application starts, there is a delay on the outer ring and I do not see any output until 15 seconds have passed. With the 1-second ring I got around this by using StartWith(-1), but I cannot seem to get around this with the outer (slower) ring.
Here is my code:
var fast = TimeSpan.FromSeconds(1);
var slow = TimeSpan.FromSeconds(15);
var application = Observable.Interval(fast)
    .Sample(fast)
    .StartWith(-1)
    .Timestamp()
    .Window(slow)
    .Sample(slow)
    .Subscribe(window =>
    {
        // Outer (slower) ring: ...
        window.Subscribe(x =>
        {
            // Inner (faster) ring: ...
        });
    });
So, really, the questions are:
EDIT:
Based on @yohanmishkin's answer, here is the code that I am using:
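var poll = Observable.Interval(TimeSpan.FromSeconds(1));
var lessFrequentPoll = Observable.Interval(TimeSpan.FromSeconds(15));
// Subscribe returns the IDisposable that CompositeDisposable expects;
// the observables themselves are not disposable.
var pollSubscription = poll.Subscribe(o => application.UpdateFrequent(o));
var lessFrequentSubscription = lessFrequentPoll.Subscribe(o => application.UpdateLessFrequent(o));
using (new CompositeDisposable(pollSubscription, lessFrequentSubscription))
{
    // ...
}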
As mentioned in the answer there, I actually did use this when I first started with Rx, but I was originally thinking (due to my primitive understanding of Rx) that I would have to nest a using context for each subscription, and I wanted to avoid that. So, if I ended up with 5 "rings" of time, I would have 5 nested usings, and that is not pretty to me. Using CompositeDisposable alleviates this.
Upvotes: 1
Views: 108
Reputation: 6026
I find this kind of thing is a bit tricky with Rx. It seems as though the subscribed work shouldn't really have the side-effect of modifying the sequence itself. However, that's what you need, because if the work runs slowly (the service is slow) you need the polling frequency to be reduced.
I think that the various recursive overloads of IScheduler.Schedule() are designed to support this kind of thing. See http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html.
The idea is something like this:
private static IObservable<int> PollServer(IScheduler scheduler)
{
    return Observable.Create<int>(
        observer =>
        {
            var cts = new CancellationTokenSource();
            var token = cts.Token;
            // Make initial call immediately
            var scheduled = scheduler.Schedule(
                TimeSpan.Zero,
                async action =>
                {
                    try
                    {
                        var res = await CallServer(token);
                        observer.OnNext(res);
                        // Trigger another iteration after poll delay
                        action(TimeSpan.FromSeconds(1));
                    }
                    catch (Exception ex)
                    {
                        observer.OnError(ex);
                    }
                });
            // Discontinue polling when observable is disposed
            return () =>
            {
                cts.Cancel();
                scheduled.Dispose();
            };
        });
}

private static async Task<int> CallServer(CancellationToken token)
{
    // Remote service call
    await Task.Delay(100);
    return 42;
}
This obviously presents only the code for your "slow ring", but hopefully it gives an idea of what you would need.
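If the recursive scheduling feels heavy, a similar "wait for the call, then pause" behaviour can probably be sketched with built-in operators instead. This is a different technique from the code above, not a rewrite of it. It assumes the System.Reactive package; PollingSketch and PollSequentially are hypothetical names made up for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PollingSketch
{
    // Hypothetical helper: invokes `call`, emits its result, waits `pollDelay`,
    // and only then starts the next call, so slow calls never build up a backlog.
    public static IObservable<T> PollSequentially<T>(
        Func<CancellationToken, Task<T>> call,
        TimeSpan pollDelay)
    {
        // An empty sequence whose subscription is delayed acts as the pause
        // between two calls: it emits nothing and completes after pollDelay.
        var pause = Observable.Empty<T>().DelaySubscription(pollDelay);

        return Observable
            .FromAsync(call)   // cold: each subscription makes one fresh call
            .Concat(pause)     // the pause starts only after the call completes
            .Repeat();         // resubscribe, which triggers the next call
    }
}
```

The first call is made as soon as something subscribes, so there is no initial delay, and disposing the subscription cancels the token handed to the call in flight. Each ring would get its own PollSequentially with its own delay (1 second and 15 seconds in your case).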
Upvotes: 0
Reputation: 168
In this case you may want to create two separate observables and subscribe your application to them separately.
Here's an example:
var poll = Observable.Interval(TimeSpan.FromSeconds(1));
var lessFrequentPoll = Observable.Interval(TimeSpan.FromSeconds(15));
poll.Subscribe(o => application.UpdateFrequent(o));
lessFrequentPoll.Subscribe(o => application.UpdateLessFrequent(o));
If you still want to combine the two intervals into a single stream, Rx provides plenty of operators for doing so (Merge, Zip, CombineLatest...).
In your case you may want to check out CombineLatest or WithLatestFrom. You can see a visualization of how they work here (CombineLatest) or here (WithLatestFrom).
An example might be to use WithLatestFrom to create a new stream that emits an object combining the latest values of the two interval observables, which your application can then subscribe to.
var combinedPoll =
    poll
        .WithLatestFrom(
            lessFrequentPoll,
            (pollEvent, lessFrequentPollEvent) =>
                new CombinedEvent(pollEvent, lessFrequentPollEvent)
        );
combinedPoll.Subscribe(o =>
{
    application.UpdateFrequent(o.FrequentEvent);
    application.UpdateLessFrequent(o.LessFrequentEvent);
});
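Two things the snippet above leaves open: CombinedEvent is not defined anywhere, and WithLatestFrom emits nothing until lessFrequentPoll has produced its first value, which with a 15-second interval means no output for the first 15 seconds. Here is a minimal sketch of one way to handle both, assuming a hypothetical shape for CombinedEvent and seeding the slow stream with StartWith(-1L), the same trick the question already uses on the fast ring. CombinedPollSketch and Create are made-up names.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical carrier type; the property names match the usage above.
public sealed class CombinedEvent
{
    public CombinedEvent(long frequentEvent, long lessFrequentEvent)
    {
        FrequentEvent = frequentEvent;
        LessFrequentEvent = lessFrequentEvent;
    }

    public long FrequentEvent { get; }
    public long LessFrequentEvent { get; }
}

public static class CombinedPollSketch
{
    public static IObservable<CombinedEvent> Create(TimeSpan fast, TimeSpan slow)
    {
        var poll = Observable.Interval(fast);

        // Seed the slow stream so WithLatestFrom already has a "latest" value
        // when the first fast tick arrives.
        var lessFrequentPoll = Observable.Interval(slow).StartWith(-1L);

        return poll.WithLatestFrom(
            lessFrequentPoll,
            (pollEvent, lessFrequentPollEvent) =>
                new CombinedEvent(pollEvent, lessFrequentPollEvent));
    }
}
```

Note that the combined stream fires on every fast tick, so a subscriber that only wants to do the slow work once per slow tick would have to check whether LessFrequentEvent has changed since the previous event.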
Upvotes: 1