snappymcsnap

Reputation: 2103

How to loop Observable Rx until condition met?

I found this excellent answer that is very similar to the problem I'm trying to solve:

Process sometimes hangs while waiting for Exit

This is my first time working with the System.Reactive package, so I'm struggling with the usage and syntax. I'm trying to modify this block to suit my needs:

    var processExited =
        // Observable will tick when the process has gracefully exited.
        Observable.FromEventPattern<EventArgs>(process, nameof(Process.Exited))
            // First two lines to tick true when the process has gracefully exited and false when it has timed out.
            .Select(_ => true)
            .Timeout(TimeSpan.FromMilliseconds(processTimeOutMilliseconds), Observable.Return(false))
            // Force termination when the process timed out
            .Do(exitedSuccessfully => { if (!exitedSuccessfully) { try { process.Kill(); } catch {} } } );

When it times out, I would like it to check the stdout buffer for a particular string. If it finds the string, it should exit; otherwise it should continue until the next timeout. I also only want to allow a limited number of timeouts before I give up and kill the process, so in the loop, if the process doesn't exit, I would increment a counter and check it. Thank you for the help.

Adding clarification, I want something like this:

    int timeoutCount = 0;
    const int maxTimeoutCount = 5;

then in the observable, something like:

    .Do(exitedSuccessfully => {
        if (!exitedSuccessfully) {
            try {
                // Give up after too many timeouts, or stop once the marker string shows up in the output.
                if (timeoutCount >= maxTimeoutCount || output.ToString().Contains("string_i-m_looking_for_in_output_to_trigger_exit")) {
                    process.Kill();
                }
                timeoutCount++;
            }
            catch { }
        }
    });

Upvotes: 1

Views: 1458

Answers (1)

Asti

Reputation: 12667

A fundamental design concept of reactive programming is to avoid external state and to express behavior declaratively. In Rx, you may need to model traditionally mutable state differently.

An idiomatic way to model a process as an observable is to link the lifetime of the process to the subscription to the observable. Subscribing starts the process and unsubscribing stops it; conversely, when the process exits normally, the observable completes.

I've made an implementation of this model on GitHub. It's a single-file library for C# and F# that exposes the standard input/output of a process as an observable.

Using that model:

    StdioObservable
        .Create("process.exe")
        .TakeUntil(line => line.Contains("Done")) // unsub when you match
        .Timeout(TimeSpan.FromSeconds(30)) // no output for 30 seconds
        .Catch((Exception exn) => Observable.Empty<string>()) // handle timeout or other
        .Subscribe();

If you want to kill the process after a fixed duration, even if it is still producing output, use .Take(TimeSpan.FromSeconds(30)) instead of .Timeout(TimeSpan.FromSeconds(30)).
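If you would rather stay with plain System.Reactive and the Exited event from the question, the "count the timeouts" requirement can probably be expressed without a mutable counter. The following is only a rough sketch, not part of the library mentioned above: the names ProcessWatch, WaitForExit, getOutput and marker are made up for illustration, and it assumes process.EnableRaisingEvents is true and that getOutput returns a thread-safe snapshot of whatever you have captured from stdout so far. Observable.Interval supplies the timeout number as the tick value, so no external state is needed.

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Linq;

static class ProcessWatch // hypothetical helper, not from any library
{
    // Emits a single value: true if the process exited on its own,
    // false if it was killed (marker seen at a timeout, or too many timeouts).
    public static IObservable<bool> WaitForExit(
        Process process,          // assumed: EnableRaisingEvents == true
        Func<string> getOutput,   // assumed: thread-safe snapshot of captured stdout
        string marker,
        TimeSpan timeout,
        int maxTimeoutCount)
    {
        // Ticks true when the process has exited gracefully.
        var exited = Observable
            .FromEventPattern<EventArgs>(process, nameof(Process.Exited))
            .Select(_ => true);

        // Interval emits 0, 1, 2, ... once per timeout period.
        // A tick passes the filter when the marker is present in the output,
        // or when it is the maxTimeoutCount-th timeout.
        var gaveUp = Observable
            .Interval(timeout)
            .Where(tick => tick + 1 >= maxTimeoutCount || getOutput().Contains(marker))
            .Select(_ => false);

        return exited
            .Merge(gaveUp)
            .Take(1) // whichever happens first wins; the other source is unsubscribed
            .Do(exitedSuccessfully =>
            {
                if (!exitedSuccessfully)
                {
                    // The process may already be gone by the time we get here.
                    try { process.Kill(); } catch { }
                }
            });
    }
}
```

Usage would look something like ProcessWatch.WaitForExit(process, () => output.ToString(), "Done", TimeSpan.FromSeconds(30), 5).Subscribe(); where output is the buffer from the question. Note that this gives up on the fifth timeout rather than the sixth, so adjust the comparison if you want the exact behavior of the counter version.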

Upvotes: 1
