Reputation: 1550
Given an observable source, generated by polling the (changes of a) state of a low-level device...
// observable source metacode:
IObservable<DeviceState> source = Observable.Interval(TimeSpan.FromSeconds(0.5))
    .Select(tick => new DeviceState(_device.ReadValue()))
    .DistinctUntilChanged();
... and a consumer that updates the UI...
// UI metacode:
service.GetObservableDeviceStates()
    .Subscribe(state => viewModel.CurrentState = state.ToString());
... I need to execute a custom action after x seconds of source's "inactivity", without interrupting the subscription to source. Something like this:
// UI metacode:
service.GetObservableDeviceStates()
    .DoOnTimeout(TimeSpan.FromSeconds(x), () => viewModel.CurrentState = "Idle")
    .Subscribe(state => viewModel.CurrentState = state.ToString());
What are the best practices? One possible solution that comes to mind (I'm an Rx noob):
Returning something special "service-side" when nothing changes (instead of using DistinctUntilChanged) and dealing with it in the UI code:
service.GetObservableDeviceStates()
    .Subscribe(state => viewModel.CurrentState = state.Special ? "Idle" : state.ToString());
EDIT: as reported in the answer, the solution is:
service.GetObservableDeviceStates()
    .Do(onNext)
    .Throttle(TimeSpan.FromSeconds(x))
    .Subscribe(onTimeout);
EDIT2 (Warning)
If onNext and onTimeout update UI components, two ObserveOn(uiSynchronizationContext) calls are needed to avoid CrossThreadExceptions, since Throttle works on another thread: one before Do(onNext) and one after Throttle, before Subscribe(onTimeout).
service.GetObservableDeviceStates()
    .ObserveOn(uiSynchronizationContext)
    .Do(onNext)
    .Throttle(TimeSpan.FromSeconds(x))
    .ObserveOn(uiSynchronizationContext)
    .Subscribe(onTimeout);
Upvotes: 7
Views: 2324
Reputation: 10783
I personally would avoid the Do method for this. It does make the code in this example fairly easy, but I find that once the use of 'Do' sneaks into the code base you soon have spaghetti.
You could also consider using combinations of Amb, Timer, TakeUntil, Throttle etc. to get the result you are looking for while still maintaining the monad. In simple terms, I assume you ideally want a single sequence of status values coming through, without having to put a timer in your UI code (i.e. offload it to the service).
public IObservable<DeviceStatus> GetObservableDeviceStates(TimeSpan silencePeriod)
{
    return Observable.Create<DeviceStatus>(
        o =>
        {
            // Emits a single "Idle" status once silencePeriod has elapsed.
            var idle = Observable.Timer(silencePeriod).Select(_ => new DeviceStatus("Idle"));
            // Poll the device; Publish shares one polling sequence between both uses below.
            var polledStatus = Observable.Interval(TimeSpan.FromSeconds(0.5))
                .Select(tick => new DeviceStatus(_device.ReadValue()))
                .DistinctUntilChanged()
                .Publish();
            // For each status: emit it, then emit "Idle" unless a new status arrives first.
            var subscription = (from status in polledStatus
                                from cont in Observable.Return(status).Concat(idle.TakeUntil(polledStatus))
                                select cont)
                .Subscribe(o);
            return new CompositeDisposable(subscription, polledStatus.Connect());
        });
}
With this code, the service returns an Idle status value once the specified period has passed without a change.
This means your UI meta code stays simple and the logic related to DeviceStatus stays where it belongs:
// UI metacode:
service.GetObservableDeviceStates(TimeSpan.FromSeconds(2))
    .Subscribe(state => viewModel.CurrentState = state.ToString());
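For comparison, here is a more compact sketch that should give a similar "states plus Idle" stream. It is not the approach shown above: it swaps the Timer/TakeUntil combination for Throttle, and uses the Publish overload that takes a selector so the source is subscribed to only once. The names IdleExtensions, WithIdleMarker and idleValue are hypothetical and exist only for illustration.

```csharp
using System;
using System.Reactive.Linq;

public static class IdleExtensions
{
    // Hypothetical helper (not part of Rx): passes every source value through
    // and additionally emits idleValue whenever the source has been silent
    // for silencePeriod.
    public static IObservable<T> WithIdleMarker<T>(
        this IObservable<T> source, TimeSpan silencePeriod, T idleValue)
    {
        // Publish(selector) shares a single subscription to source between
        // the pass-through branch and the Throttle branch.
        return source.Publish(shared =>
            shared.Merge(
                shared.Throttle(silencePeriod)     // fires after a quiet period
                      .Select(_ => idleValue)));   // replace the value with the idle marker
    }
}
```

Inside the service this could be used as polledStates.WithIdleMarker(silencePeriod, new DeviceStatus("Idle")), assuming polledStates is the polled, DistinctUntilChanged sequence. One caveat: Throttle flushes a pending value when the source completes, so an extra idle marker may appear at completion; for an endless polling sequence this should not matter.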
Upvotes: 5
Reputation: 12667
Timeout is more or less meant for observables which represent single asynchronous operations, for example to return a default value or signal OnError if the observable hasn't notified you within a certain amount of time.
The operator you're looking for is Throttle, even though it may not seem like it at first. Throttle(p) gives you a stream which produces a value when the source stream has not produced a value for period p.
Parallel to your existing code, you can use source.Throttle(period).Do(...side effect).
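To make the Throttle behaviour concrete, here is a minimal console sketch. It assumes a Subject<string> as a hypothetical stand-in for the device-state stream, and the one-second period and sleep durations are arbitrary. It runs two subscriptions in parallel: one prints each state, the other prints "Idle" once the source has been quiet for the period.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class Program
{
    static void Main()
    {
        // Hypothetical stand-in for the device-state observable.
        var source = new Subject<string>();
        var period = TimeSpan.FromSeconds(1);

        // Existing subscription: reacts to every state change.
        using var states = source.Subscribe(s => Console.WriteLine($"State: {s}"));

        // Parallel subscription: Throttle only lets a value through once the
        // source has been silent for 'period', so this acts as the idle handler.
        using var idle = source.Throttle(period)
                               .Subscribe(_ => Console.WriteLine("Idle"));

        source.OnNext("A");
        Thread.Sleep(200);    // shorter than period: no idle notification yet
        source.OnNext("B");
        Thread.Sleep(1500);   // longer than period: the idle handler should run
        source.OnNext("C");
        Thread.Sleep(1500);   // quiet again: the idle handler should run again
    }
}
```

Note that a Subject is hot, so both subscriptions see the same values. With a cold source such as Observable.Interval(...).Select(...), each Subscribe would start its own polling loop, so you would probably want to share it first (for example with Publish().RefCount()).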
Upvotes: 7