Reputation: 651
Sometimes the business logic seems to be able to naturally modeled by some recursive defined observables. Here is one example:
interface Demo {
IObservable<CommandId> userCommands;
IObservable<IObservable<IProcessingState>> processes;
IObservable<CommandId> skippedCommands;
IObservable<(CommandId, CommandResult)> RunCommand(CommandId id);
}
interface IProcessingState {
bool IsProcessing {get;}
CommandId? ProcessingId {get;}
}
For each command user inputs, it should either trigger a running process in prcocess, or emit one value in skippedCommands. Some direct translate of this logic maybe
var validCommands = userCommands.WithLatestFrom(processes).Where(x => !x.Item2.IsProcessing)
var skippedCommands = userCommands.WithLatestFrom(processes).Where(x => x.Item2.IsProcessing)
var processes = validCommands.Select(c => RunCommand(c))
As code above indicated, assign of validCommands
and processes
are mutual recursive, we can equivalently define processes
directly using itself recursively by
var processes = userCommands.WithLatestFrom(processes)
.Where(x => !x.Item2.IsProcessing)
.Select(c => RunCommand(c))
However we can not define prcesses
Observable in C# like this.
I've found several possible related things:
Observable.Generate
constructor. However it seems that it is folding on its own state in a synchronize way, I don't know how to use userCommands
observable and RunCommand
in Observable.Generate
;
Some operator like exhaust
or exhaustMap
in RxJS, while Rx.Net didn't provide this operator, there are serval 3rd-party libraries provided these operators, like FSharp.Control.Reactive.
The implementation is something like
let exhaustMap f source =
Observable.Create (fun (o : IObserver<_>) ->
let mutable hasSubscription = false
let mutable innerSub = None
let onInnerCompleted () =
hasSubscription <- false
innerSub |> Option.iter Disposable.dispose
let onOuterNext x =
if not hasSubscription then
hasSubscription <- true
f x |> subscribeSafeWithCallbacks
o.OnNext o.OnError onInnerCompleted
|> fun y -> innerSub <- Some y
source
|> subscribeSafeWithCallbacks
onOuterNext o.OnError o.OnCompleted)
However, there are two problems. a. directly use this operator does not fit requirements above, skipped commands will be silently ignored. We can modify source code a litter to fit the requirements, but there is still another problem b. the implementation introduced two local mutable variables, and two nested subscription. I don't know if this is ok in all cases (will there be risk of data racing?), and prefer solutions based on composition of operators other than mutable references
SodiumFRP provided forward references types StreamLoop
and CellLoop
. And by the Functional Reactive Programming book, Rx alternative for these forward references types would be Subject
, by using Subject
the recursive construct above is separated into two phases. The problem is by Intro to Rx indicated, using Subject
requires manually manage more state, at least dispose of the subject is required, and maybe forced to hot observables. I'm wondering if there exists solutions without using Subject
Using window
operator with boundary on last value (just before finish) on RunCommand
results, the processes
above can some how be constructed, but this solution need to use ending signal twice, which requires careful treatment (quiet a while on trying and tuning Take(1)
, zip
, withLatestFrom
, combineLatest
, overloads of Window
operators to get desired result) on simultaneous events.
Are there better solutions or modification to solutions above to this problem, especially only use operators?
Upvotes: 0
Views: 96
Reputation: 14350
Your types are all weird and hard to work with. Your question is fundamentally a simple state machine with two triggers: 1) New command arrives, 2) Previous command finishes executing.
This should get you started:
void Main()
{
IObservable<ICommand> source = new Subject<ICommand>();
var executionTerminatedLoopback = new Subject<Unit>();
var stateMachine = source
.Select(c => (command: c, type: 1))
.Merge(executionTerminatedLoopback.Select(_ => (command: (ICommand)null, type: 2)))
.Scan((isExecuting: false, validCommand: (ICommand)null, failedCommand: (ICommand)null), (state, msg) => {
if(msg.type == 2)
return (false, null, null);
if(state.isExecuting)
return (true, null, msg.command);
else
return (true, msg.command, null);
});
var validCommands = stateMachine.Where(t => t.validCommand != null).Select(t => t.validCommand);
var failedCommands = stateMachine.Where(t => t.failedCommand != null).Select(t => t.failedCommand);
validCommands.SelectMany(c => c.Execute()).Subscribe(executionTerminatedLoopback);
}
public interface ICommand{
IObservable<Unit> Execute();
}
source
here doesn't have to be a subject, and probably shouldn't be: It can be any IObservable<ICommand>
. It's just easier to mock up in an answer. The exeuctionTerminatedLoopback
though does have to be, to (as you put it) break up the recursion into two parts. Because it's a subject, it should be kept private and not leaked out.
I don't think there are any answers in C# without using Subject.
Upvotes: 0