Reputation: 39
I'm writing an application which computes data using many observables and there is a dependency between the observables. I have a common observable on which other observables are subscribing. Each time the common observable is refreshed, i want the data to be recomputed. The issue is that since this observable is observed by many others, the data is recomputed many times instead of once.
Here is a unit test to reproduce the issue.
How should modelObservable
be declared so that nbComputations = 1
at the end?
Should I abandon the observables regarding what i'm trying to do ?
public void Test_Observable_Dependency()
{
var nbComputations = 0;
var commonObservable = new Subject<int>();
var subject1 = new Subject<int>();
var subject2 = new Subject<int>();
var subject3 = new Subject<int>();
var forecastCurves = new List<IObservable<int>>
{
subject1.CombineLatest(commonObservable, (a, b) => 0),
subject2.CombineLatest(commonObservable, (a, b) => 0),
subject3.CombineLatest(commonObservable, (a, b) => 0),
commonObservable
};
var modelObservable = forecastCurves
.CombineLatest()
.CombineLatest(commonObservable, (x, y) =>
{
nbComputations++;
return true;
})
.Replay(1).RefCount();
_ = modelObservable.Subscribe();
subject1.OnNext(1);
subject2.OnNext(1);
subject3.OnNext(1);
//Before receiving all updates
nbComputations.Should().Be(0);
commonObservable.OnNext(1);
//After receiving all updates
nbComputations.Should().Be(1);
nbComputations = 0;
commonObservable.OnNext(1);
nbComputations.Should().Be(1);
}
Upvotes: 0
Views: 125
Reputation: 363
It might be because of the extensive use of CombineLatest()
. The operator ticks every time there is an update from any of the observables.
I've tried to simplify it a bit, but I think the result should be the same. Is it what you need?
public void Test_Observable_Dependency()
{
var nbComputations = 0;
var commonObservable = new Subject<int>();
var subject1 = new Subject<int>();
var subject2 = new Subject<int>();
var subject3 = new Subject<int>();
var forecastCurves = Observable.CombineLatest(
subject1,
subject2,
subject3,
commonObservable,
(s1, s2, s3, common) => new {
// OP's queries inserted below
s1 = 0, // (a, b) => 0
s2 = 0, // (a, b) => 0
s3 = 0, // (a, b) => 0
common});
// No need to take WithLatestFrom commonObservable,
// because the latest value is available as f.common
var modelObservable = forecastCurves.Select(f =>
{
nbComputations++;
return true;
})
.Replay(1).RefCount();
_ = modelObservable.Subscribe();
subject1.OnNext(1);
subject2.OnNext(1);
subject3.OnNext(1);
//Before receiving all updates
nbComputations.Should().Be(0);
commonObservable.OnNext(1);
//After receiving all updates
nbComputations.Should().Be(1);
nbComputations = 0;
commonObservable.OnNext(1);
nbComputations.Should().Be(1);
}
Edit, taking into account ommitted queries.
Upvotes: 1