Reputation: 113
I want to translate the following code into a reactive flow (using System.Reactive in C#). Method1, Method2 and Method3 are long-running tasks. The result of Method2 is necessary in order to call Method3, but Method1 and Method2 can run in parallel. If result1 == null, the whole operation can be terminated early with null. Usually, Method2 returns faster than Method1, so Method3 could be started before Method1 finishes.
var result1 = Method1();
if (result1 == null) return null;

var result2 = Method2();
string result3 = null;
if (result2 != null)
{
    result3 = Method3(result2);
}

var combinedResult = CreateResult(result1);
if (result2 != null)
{
    combinedResult.Attr2 = result2;
}
if (result3 != null)
{
    combinedResult.Attr3 = result3;
}
return combinedResult;
I am lost between nested functions and generics. The following code does not compile, because it has issues with generics and return types (especially the nested Select, which does not return a value but an Observable).
var observable1 = Observable.Start(() => Method1());
var observable2 = Observable.Start(() => Method2());
Observable.Zip(observable1, observable2, (result1, result2) =>
{
    if (result2 != null)
    {
        var observable3 = Observable.Start(() => Method3(result2));
        // This branch returns an Observable of the combined result...
        return observable3.Select(result3 =>
        {
            return SuperCombiner(result1, result2, result3);
        });
    }
    // ...while this branch returns the combined result itself.
    return SuperCombiner(result1, null, null);
});
Upvotes: 1
Views: 111
Reputation: 117027
Here's what you need to get this to work:
var inner =
    from m2 in Observable.Start(() => Method2())
    from m3 in Observable.Start(() => Method3(m2))
    select new { m2, m3 };

var query =
    Observable
        .Start(() => Method1())
        .Publish(m1s =>
            m1s
                .Zip(
                    inner.TakeUntil(m1s.Where(m1 => m1 == null)),
                    (m1, m23) => new { m1, m23.m2, m23.m3 }))
        .Where(x => x.m1 != null);
I have tested this using the following code:
public string Method1()
{
    Console.WriteLine("Method1 Start");
    Thread.Sleep(TimeSpan.FromSeconds(2.0));
    Console.WriteLine("Method1 End");
    return null; //"1";
}

public string Method2()
{
    Console.WriteLine("Method2 Start");
    Thread.Sleep(TimeSpan.FromSeconds(3.0));
    Console.WriteLine("Method2 End");
    return "2";
}

public string Method3(string x)
{
    Console.WriteLine("Method3 Start");
    Thread.Sleep(TimeSpan.FromSeconds(2.0));
    Console.WriteLine("Method3 End");
    return $"3-{x}";
}
The query only produces a value when Method1 returns a non-null value; otherwise it completes without producing a value. Method3 is executed immediately after Method2 is complete, unless Method1 has already returned null, in which case Method3 is not executed.
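This is an efficient way to do what you asked for: Method1 and Method2 run in parallel, Method3 starts as soon as Method2 completes, and Method3 is never started once Method1 is known to have returned null.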
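For completeness, here is a minimal sketch of how the query might be consumed from blocking code. It assumes the System.Reactive NuGet package is referenced. The Demo class, the Run method, the shortened delays and the "m1|m2|m3" string format are illustrative choices only and are not part of the original answer. FirstOrDefaultAsync is used so that an empty sequence (Method1 returned null) surfaces as null instead of an exception from Wait.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class Demo
{
    // Hypothetical stand-ins for the long-running methods (delays shortened).
    static string Method1() { Thread.Sleep(200); return "1"; }
    static string Method2() { Thread.Sleep(100); return "2"; }
    static string Method3(string x) { Thread.Sleep(100); return $"3-{x}"; }

    public static string Run()
    {
        // Method2 starts as soon as this expression is evaluated;
        // Method3 starts once a subscriber receives Method2's result.
        var inner =
            from m2 in Observable.Start(() => Method2())
            from m3 in Observable.Start(() => Method3(m2))
            select new { m2, m3 };

        var query =
            Observable
                .Start(() => Method1())
                .Publish(m1s =>
                    m1s.Zip(
                        inner.TakeUntil(m1s.Where(m1 => m1 == null)),
                        (m1, m23) => new { m1, m23.m2, m23.m3 }))
                .Where(x => x.m1 != null);

        // Block until the query finishes; null means Method1 returned null.
        var result = query.FirstOrDefaultAsync().Wait();
        return result == null
            ? null
            : $"{result.m1}|{result.m2}|{result.m3}";
    }

    public static void Main() => Console.WriteLine(Run() ?? "(null)");
}
```

Note that Observable.Start begins running its delegate when it is called, not when the resulting observable is subscribed to, so Method1 and Method2 are already running by the time Wait is reached. In non-blocking code, query.Subscribe(...) would normally replace the Wait call.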
Upvotes: 2
Reputation: 598
I've added some improvements to your draft and now it works as you described:
var stream1 = Observable.Start(Func1);
var stream2 = Observable.Start(Func2);
Observable.Zip(stream1, stream2, (res1, res2) =>
{
    if (res1 == null)
        return Observable.Start(() => new string[] { null });
    if (res2 == null)
        return Observable.Start(() => new string[] { res1, null });
    return Observable.Start(() => Func3(res2)).Select(res3 => new[] { res1, res2, res3 });
})
.Merge()
.Subscribe(result =>
{
    // 'result' is an array
    // result[0] - result of Func1
    // result[1] - result of Func2
    // result[2] - result of Func3
    // result.Length == 1 - means that Func1 returned 'null'
    // result.Length == 2 - means that Func2 returned 'null'
});
However, this is not a truly 'Reactive' approach, because it contains imperative statements (such as the if statements).
Upvotes: 0