I have a class that has a couple of bool properties and is subscribed to an observable that supplies objects that contain various values (at an indeterminate pace). For example:
bool IsActive {get; private set;}
bool IsBroken {get; private set;}
bool Status {get; private set;}
...
valueStream.GetValues().Subscribe(UpdateValues);
UpdateValues does some work based on the object passed in. There is one value in particular that I am interested in using for some specific logic though. Let's call it obj.SpecialValue:
private void UpdateValues(MyObject obj)
{
...
Status = obj.SpecialValue;
...
}
Now, if IsActive is set to true and Status is false, I want to wait three seconds before setting IsBroken to true, to give the stream a chance to return an obj.SpecialValue of true in that time. If within three seconds it does return true, I just do nothing, otherwise set IsBroken to true. When either Status or IsActive is updated, check again.
At first I had:
Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(3)).Subscribe(_ => SetIsBroken());
private void SetIsBroken()
{
IsBroken = IsActive && !Status;
}
but that does more checking than it needs to. It only really needs to check when the stream updates or IsActive is changed.
Any tips on how to do this the right way?
Upvotes: 3
Views: 1792
BehaviorSubject<T> to back properties
A useful idea for this problem is to back your properties with BehaviorSubject<bool> types. These serve the dual purpose of acting both as a property and as a stream of values of that property.
You can subscribe to them as observables, but also access their current value through the Value property. You change them by sending a new value via OnNext.
For example, we could do this:
private BehaviorSubject<bool> _isActive;
public bool IsActive
{
get { return _isActive.Value; }
set { _isActive.OnNext(value); }
}
With this in place across all your properties, it becomes a fairly simple exercise to watch the properties for the complex condition you state. Assuming _status and _isBroken are similarly implemented backing subjects, we can set up a subscription like this:
Observable.CombineLatest(_isActive,
_status,
(a,s) => a & !s).DistinctUntilChanged()
.Where(p => p)
.SelectMany(_ => Observable.Timer(TimeSpan.FromSeconds(3), scheduler)
.TakeUntil(_status.Where(st => st)))
.Subscribe(_ => _isBroken.OnNext(true));
The first part uses CombineLatest and subscribes to the _isActive and _status streams. It emits whenever either of these changes, and the result function produces true precisely when _isActive is true and _status is false. The DistinctUntilChanged() prevents setting _isActive and _status to the values they already have from starting a new timer.
Then we use Where to filter for this condition only.
The SelectMany then takes the true values and projects each one into a stream that emits after 3 seconds, using Timer, but we use TakeUntil to squash this value in the event that _status becomes true. SelectMany also flattens the stream of streams back down to a single stream.
You didn't mention it, but you may want to think about whether _isActive going false should also terminate the timer. If so, you can use Merge to combine a watch for this with the _status watch in the TakeUntil.
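As a rough illustration of that variation, here is a minimal sketch in which the timer is cancelled either by _status becoming true or by _isActive going false. The class name MergeCancelSketch, the Run method and its deactivateEarly parameter are made up for this example, and it assumes the Rx testing package is referenced so that TestScheduler is available:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

// Hypothetical demo class, not part of the original answer.
public static class MergeCancelSketch
{
    // Plays a short scenario on virtual time and returns the final IsBroken value.
    public static bool Run(bool deactivateEarly)
    {
        var scheduler = new TestScheduler();
        var isActive = new BehaviorSubject<bool>(false);
        var status = new BehaviorSubject<bool>(false);
        var isBroken = new BehaviorSubject<bool>(false);

        // Either of these events should cancel a pending timer.
        var cancel = status.Where(st => st)
                           .Merge(isActive.Where(a => !a));

        var subscription = Observable
            .CombineLatest(isActive, status, (a, s) => a && !s)
            .DistinctUntilChanged()
            .Where(p => p)
            .SelectMany(_ => Observable.Timer(TimeSpan.FromSeconds(3), scheduler)
                                       .TakeUntil(cancel))
            .Subscribe(_ => isBroken.OnNext(true));

        isActive.OnNext(true);                                // active and status false: timer starts
        scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);   // one second of virtual time passes
        if (deactivateEarly)
        {
            isActive.OnNext(false);                           // should cancel the pending timer
        }
        scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);   // move well past the 3 second mark

        var result = isBroken.Value;
        subscription.Dispose();
        return result;
    }
}
```

With deactivateEarly set to true I would expect Run to return false, because the timer is cancelled before it fires; with false it should return true.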
We can now subscribe to the whole query to set _isBroken to true should it ever fire, indicating that the timer expired.
Note the scheduler argument to Timer: it exists so that we can pass in a test scheduler.
I'm not sure if I've captured all your logic correctly - but if not hopefully you can see how to amend as necessary.
Here is the complete example. Add the NuGet package rx-testing and this will run in LINQPad as written:
void Main()
{
var tests = new Tests();
tests.Test();
}
public class Foo
{
private BehaviorSubject<bool> _isActive;
private BehaviorSubject<bool> _isBroken;
private BehaviorSubject<bool> _status;
public bool IsActive
{
get { return _isActive.Value; }
set { _isActive.OnNext(value); }
}
public bool IsBroken { get { return _isBroken.Value; } }
public bool Status { get { return _status.Value; } }
public Foo(IObservable<MyObject> valueStream, IScheduler scheduler)
{
_isActive = new BehaviorSubject<bool>(false);
_isBroken = new BehaviorSubject<bool>(false);
_status = new BehaviorSubject<bool>(false);
// for debugging purposes
_isActive.Subscribe(a => Console.WriteLine(
"Time: " + scheduler.Now.TimeOfDay + " IsActive: " + a));
_isBroken.Subscribe(a => Console.WriteLine(
"Time: " + scheduler.Now.TimeOfDay + " IsBroken: " + a));
_status.Subscribe(a => Console.WriteLine(
"Time: " + scheduler.Now.TimeOfDay + " Status: " + a));
valueStream.Subscribe(UpdateValues);
Observable.CombineLatest(
_isActive,
_status,
(a,s) => a & !s).DistinctUntilChanged()
.Where(p => p)
.SelectMany(_ => Observable.Timer(TimeSpan.FromSeconds(3),
scheduler)
.TakeUntil(_status.Where(st => st)))
.Subscribe(_ => _isBroken.OnNext(true));
}
private void UpdateValues(MyObject obj)
{
_status.OnNext(obj.SpecialValue);
}
}
public class MyObject
{
public MyObject(bool specialValue)
{
SpecialValue = specialValue;
}
public bool SpecialValue { get; set; }
}
public class Tests : ReactiveTest
{
public void Test()
{
var testScheduler = new TestScheduler();
var statusStream = testScheduler.CreateColdObservable<bool>(
OnNext(TimeSpan.FromSeconds(1).Ticks, false),
OnNext(TimeSpan.FromSeconds(3).Ticks, true),
OnNext(TimeSpan.FromSeconds(5).Ticks, false));
var activeStream = testScheduler.CreateColdObservable<bool>(
OnNext(TimeSpan.FromSeconds(1).Ticks, false),
OnNext(TimeSpan.FromSeconds(6).Ticks, true));
var foo = new Foo(statusStream.Select(b => new MyObject(b)), testScheduler);
activeStream.Subscribe(b => foo.IsActive = b);
testScheduler.Start();
}
}
If you want isActive false to set isBroken false, then I think this adds up to now saying the following:
isActive  status  Action
T         F       Set Broken True after 3 seconds unless any other result occurs
T         T       Set Broken False immediately if not already false, cancel timer
F         F       Set Broken False immediately if not already false, cancel timer
F         T       Set Broken False immediately if not already false, cancel timer
In that case, use the following query:
Observable.CombineLatest(
_isActive,
_status,
(a,s) => a & !s).DistinctUntilChanged()
.Select(p => p ? Observable.Timer(TimeSpan.FromSeconds(3),
scheduler)
.Select(_ => true)
: Observable.Return(false))
.Switch()
.DistinctUntilChanged()
.Subscribe(res => _isBroken.OnNext(res));
Note the changes:
- SelectMany is now a Select that turns each event into either
  - a timer that returns true after 3 seconds, or
  - an immediate false.
- The result of the Select is a stream of bool streams: IObservable<IObservable<bool>>. We want any new stream appearing to cancel any previous stream. This is what the Switch will do - flattening the result in the process.
- We apply a second DistinctUntilChanged() since a cancelled timer could cause two false values to appear in the stream consecutively.
- Finally, the resulting bool value is assigned to isBroken.
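To see how the Switch version behaves over time, the following sketch drives it with a TestScheduler and records each change of isBroken. The class SwitchQuerySketch, its Run method and the log format are invented for this example, and the scenario (status flickering to true at 1 second, back to false at 2 seconds, isActive dropping at 6 seconds) is only one possible sequence. It again assumes the Rx testing package is referenced:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

// Hypothetical demo class, not part of the original answer.
public static class SwitchQuerySketch
{
    // Returns one log entry per change of IsBroken, stamped with virtual seconds.
    public static List<string> Run()
    {
        var scheduler = new TestScheduler();
        var isActive = new BehaviorSubject<bool>(false);
        var status = new BehaviorSubject<bool>(false);
        var isBroken = new BehaviorSubject<bool>(false);
        var log = new List<string>();

        // Record only actual changes of IsBroken.
        isBroken.DistinctUntilChanged()
                .Subscribe(b => log.Add($"{scheduler.Clock / TimeSpan.TicksPerSecond}s: {b}"));

        Observable
            .CombineLatest(isActive, status, (a, s) => a && !s)
            .DistinctUntilChanged()
            .Select(p => p
                ? Observable.Timer(TimeSpan.FromSeconds(3), scheduler).Select(_ => true)
                : Observable.Return(false))
            .Switch()                      // a new inner stream cancels the previous one
            .DistinctUntilChanged()
            .Subscribe(res => isBroken.OnNext(res));

        isActive.OnNext(true);                                // t=0: first timer starts
        scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);
        status.OnNext(true);                                  // t=1: first timer cancelled
        scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);
        status.OnNext(false);                                 // t=2: second timer starts
        scheduler.AdvanceBy(TimeSpan.FromSeconds(4).Ticks);   // second timer is due at t=5
        isActive.OnNext(false);                               // t=6: IsBroken is reset

        return log;
    }
}
```

If I have traced this correctly, the log should contain three entries: false at 0 seconds, true at 5 seconds, and false again at 6 seconds.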
Upvotes: 1