mre
mre

Reputation: 113

Chaining observables and passing result values

I want to translate the following code into a reactive flow (using System.Reactive in C#).

Method1, Method2 and Method3 are long running tasks. The result of Method2 is necessary in order to call Method3, but Method1 and Method2 can run in parallel. If result1 == null the whole operation can be terminated early with null.

Usually, Method2 returns faster than Method1, so Method3 could be started before Method1 finishes.

var result1 = Method1();
if (result1 == null) return null;

var result2 = Method2();

string result3 = null;
if (result2 != null)
{
    result3 = Method3(result2);
}

var combinedResult = CreateResult(result1);
if (result2 != null)
{
    combinedResult.Attr2 = result2;
}
if (result3 != null)
{
    combinedResult.Attr3 = result3;
}

I am lost between nested functions and generics. The following code does not compile, because it has issues with generics and return types (especially the nested Select does not return a value but an Observable).

var observable1 = Observable.Start(() => Method1());
var observable2 = Observable.Start(() => Method2());
Observable.Zip(observable1, observable2, (result1, result2) =>
{
    if (result2 != null)
    {
        var observable3 = Observable.Start(() => Method3(result2));
        return observable3.Select(result3 => 
        {
            return SuperCombiner(result1, result2, result3);
        };
    }
    return SuperCombiner(result1, null, null);
};

Upvotes: 1

Views: 111

Answers (2)

Enigmativity
Enigmativity

Reputation: 117027

Here's what you need to get this to work:

var inner =
    from m2 in Observable.Start(() => Method2())
    from m3 in Observable.Start(() => Method3(m2))
    select new { m2, m3 };

var query =
    Observable
        .Start(() => Method1())
        .Publish(m1s =>
            m1s
                .Zip(
                    inner.TakeUntil(m1s.Where(m1 => m1 == null)),
                    (m1, m23) => new { m1, m23.m2, m23.m3 }))
        .Where(x => x.m1 != null);

I have tested this using the following code:

public string Method1()
{
    Console.WriteLine("Method1 Start");
    Thread.Sleep(TimeSpan.FromSeconds(2.0));
    Console.WriteLine("Method1 End");
    return null; //"1";
}


public string Method2()
{
    Console.WriteLine("Method2 Start");
    Thread.Sleep(TimeSpan.FromSeconds(3.0));
    Console.WriteLine("Method2 End");
    return "2";
}

public string Method3(string x)
{
    Console.WriteLine("Method3 Start");
    Thread.Sleep(TimeSpan.FromSeconds(2.0));
    Console.WriteLine("Method3 End");
    return $"3-{x}";
}

The query only produces a value when Method1 returns a non-null value - otherwise it completes without producing a value.

Method3 is executed immediately after Method2 is complete unless Method1 has already returned null in which case Method3 is not executed.

This is computationally the most efficient implementation of what you asked for.

Upvotes: 2

Vitalii Ilchenko
Vitalii Ilchenko

Reputation: 598

I've added some improvements to your draft and now it works as you described:

var stream1 = Observable.Start(Func1);
var stream2 = Observable.Start(Func2);

Observable.Zip(stream1, stream2, (res1, res2) =>
{
    if (res1 == null)
        return Observable.Start(() => new string[] { null });

    if (res2 == null)
        return Observable.Start(() => new string[] { res1, null });

    return Observable.Start(() => Func3(res2)).Select(res3 => new[] { res1, res2, res3 });
})
.Merge()
.Subscribe(result =>
{
    // 'result' is an array

    // result[0] - result of Func1
    // result[1] - result of Func2
    // result[2] - result of Func3

    // result.Length == 1  - means that Func1 returned 'null'
    // result.Length == 2  - means that Func2 returned 'null'
});

But it's not a real 'Reactive' way because it contains imperative statements (like if operators for example).

Upvotes: 0

Related Questions