CasqueOuille
CasqueOuille

Reputation: 39

How to specify dependencies between observable sequences?

I'm writing an application which computes data using many observables and there is a dependency between the observables. I have a common observable on which other observables are subscribing. Each time the common observable is refreshed, i want the data to be recomputed. The issue is that since this observable is observed by many others, the data is recomputed many times instead of once.

Here is a unit test to reproduce the issue.

How should modelObservable be declared so that nbComputations = 1 at the end? Should I abandon the observables regarding what i'm trying to do ?

public void Test_Observable_Dependency()
{
    var nbComputations = 0;

    var commonObservable = new Subject<int>();

    var subject1 = new Subject<int>();
    var subject2 = new Subject<int>();
    var subject3 = new Subject<int>();

    var forecastCurves = new List<IObservable<int>>
    {
        subject1.CombineLatest(commonObservable, (a, b) => 0),
        subject2.CombineLatest(commonObservable, (a, b) => 0),
        subject3.CombineLatest(commonObservable, (a, b) => 0),
        commonObservable
    };

    var modelObservable = forecastCurves
        .CombineLatest()
        .CombineLatest(commonObservable, (x, y) =>
        {
            nbComputations++;
            return true;
        })
        .Replay(1).RefCount();
    _ = modelObservable.Subscribe();

    subject1.OnNext(1);
    subject2.OnNext(1);
    subject3.OnNext(1);

    //Before receiving all updates
    nbComputations.Should().Be(0);

    commonObservable.OnNext(1);

    //After receiving all updates
    nbComputations.Should().Be(1);

    nbComputations = 0;
    commonObservable.OnNext(1);

    nbComputations.Should().Be(1);
}

Upvotes: 0

Views: 125

Answers (1)

Magnus
Magnus

Reputation: 363

It might be because of the extensive use of CombineLatest(). The operator ticks every time there is an update from any of the observables.

I've tried to simplify it a bit, but I think the result should be the same. Is it what you need?

public void Test_Observable_Dependency()
{
    var nbComputations = 0; 
    
    var commonObservable = new Subject<int>();
    var subject1 = new Subject<int>();
    var subject2 = new Subject<int>();
    var subject3 = new Subject<int>();
    
    
    var forecastCurves = Observable.CombineLatest(
        subject1,
        subject2,
        subject3,
        commonObservable,
        (s1, s2, s3, common) => new {
            // OP's queries inserted below
            s1 = 0, // (a, b) => 0 
            s2 = 0, // (a, b) => 0 
            s3 = 0, // (a, b) => 0 
            common});
    
    // No need to take WithLatestFrom commonObservable, 
    // because the latest value is available as f.common    
    var modelObservable = forecastCurves.Select(f =>
        {
            
            nbComputations++;
            return true;
        })
        .Replay(1).RefCount();
    _ = modelObservable.Subscribe();

    subject1.OnNext(1);
    subject2.OnNext(1);
    subject3.OnNext(1);

    //Before receiving all updates
    nbComputations.Should().Be(0);

    commonObservable.OnNext(1);

    //After receiving all updates
    nbComputations.Should().Be(1);

    nbComputations = 0;
    commonObservable.OnNext(1);

    nbComputations.Should().Be(1);
}

Edit, taking into account ommitted queries.

Upvotes: 1

Related Questions