just part of the crowd
just part of the crowd

Reputation: 227

Correct way of long running API calls in RX.net and WPF

I've been happily making some API calls in a WPF app using RX in the following manner:

    IDisposable disposable = _textFromEventPatternStream
        .ObserveOn(_rxConcurrencyService.Dispatcher)
        .Subscribe(async input =>
            {
                try
                {
                    IsLoading = true;
                    int x = int.Parse(input);
                    var y = await _mathApi.CalcAsync(x);
                    IsLoading = false;
                    Model.Update("", y);
                }
                catch (Exception ex)
                {
                    Model.Update(ex.Message, "Error caught in subscribe, stream continues...");
                }
                finally
                {
                    IsLoading = false;
                }
            },
            ex => Model.Update(ex.Message, "Error, stream will end..."));

However for various reasons, i think I may need to make the calls using the SelectMany operator and do some processing on the stream.

I expect that within the api calls there may be some errors. For example the API endpoint may not be available. Some of the parsing before the API call fail. Etc. I want the Hot Observable to continue. I need to display a standard IsLoading spinner as well.

Now I also understand, that once on OnError is received the sequence should not continue. I understand this... I just don't like it.

With, that, the question is: Is using Retry() the correct method of achieving a hot observable that continues to operate regardless of errors?

The below rewritten code works, but feels yucky:

    IDisposable disposable = _textFromEventPatternStream
        .Select(input => int.Parse(input)) // simulating much heavier pre processing, leading to a possible error
        .ObserveOn(_rxConcurrencyService.Dispatcher)
        .Do(_ => IsLoading = true)
        .ObserveOn(_rxConcurrencyService.TaskPool)
        .SelectMany(inputInt => _mathApi.CalcAsync(inputInt))
        .ObserveOn(_rxConcurrencyService.Dispatcher)
        .Do(s => { },
            ex =>
            {
                // this feels like a hack.
                Model.Update(ex.Message, "Error, stream will retry...");
                IsLoading = false;
            })
        .Retry()
        .Subscribe(x => Model.Update("", x),
            ex => Model.Update(ex.Message, "Error, stream will end..."));

I have seen some code examples, where people use nested streams to resubscribe to a faulted stream. From what I've read this seems like a common approach, but to me this it seems to turn what should be a simple scenario into a hard to follow situation.

Upvotes: 4

Views: 719

Answers (2)

just part of the crowd
just part of the crowd

Reputation: 227

Following up, this is what we ended up using in case anyone else has a similar scenario.

We removed the Retry() operator, as that appears to be best suited to Cold Observables.

So, given the original intention, we want the stream to continue regardless of errors (we expect networks connectivity probs etc), we are recursivelly invoking the method that does the subscribe in the exception handler, after we log the error, so that the logs pick up the exception.

It appears to work well, and has passed our various scenarios including db server down, api timeout exception, etc...

Feel free to comment if there are any obvious flaws in the approach...

 public void ObserveSelectedTemplateStream(IObservable<dto> textFromEventPatternStream)
            {

                _compositeDisposable.Dispose(); // clean whatever subscriptions we have
                _compositeDisposable = new CompositeDisposable();

                var disposable = textFromEventPatternStream
                .Select(x => Parse(x)) // simulating much heavier pre processing, leading to a possible error
                .ObserveOn(_rxConcurrencyService.Dispatcher)
                .Do(_ => IsLoading = true)
                .ObserveOn(_rxConcurrencyService.TaskPool)
                .SelectMany(x=> _mathApi.CalcAsync(x))
                .ObserveOn(_rxConcurrencyService.Dispatcher)                    .Subscribe(Model.Update,
                    ex => {
                        HandleException(ex);
                        ObserveSelectedTemplateStream(textFromEventPatternStream); // Recursively resubscribe to our stream. We expect errors. It's an API.
                    });

            _compositeDisposable.Add(disposable);


        }

Upvotes: 1

Enigmativity
Enigmativity

Reputation: 117175

If it's the CalcAsync that could throw an error, I'd try this instead:

.SelectMany(inputInt => Observable.FromAsync(() => _mathApi.CalcAsync(inputInt)).Retry())

Put the retry as close to the faulting observable as possible.

I'd also suggest some sort of retry count so that a a perpetual error doesn't just hang the observable.

Here's a sample that shows that this works.

This fails:

void Main()
{
    var subject = new Subject<string>();

    IDisposable disposable =
        subject
            .Select(input => int.Parse(input))
            .SelectMany(inputInt => Observable.FromAsync(() => CalcAsync(inputInt)))
            .Subscribe(x => Console.WriteLine(x));

    subject.OnNext("1");
    subject.OnNext("2");
    subject.OnNext("3");
    subject.OnNext("4");
    subject.OnNext("5");
    subject.OnNext("6");
    subject.OnCompleted();
}

private int _counter = 0;

public async Task<int> CalcAsync(int x)
{
    if (_counter++ == 3)
    {
        throw new Exception();
    }
    return await Task.Factory.StartNew(() => -x);
}

It typically outputs:

-1
-2
-3
Exception of type 'System.Exception' was thrown. 

Change the SelectMany to:

.SelectMany(inputInt => Observable.FromAsync(() => CalcAsync(inputInt)).Retry())

Now I get:

-1
-3
-2
-4
-5
-6

Upvotes: 2

Related Questions