Gebb

Reputation: 6546

Rx: How to perform an action on inner observable finish with minimum boilerplate?

In an effort to learn Rx, I'm writing a simple application that imitates reading a file and supports cancellation.

It works like this:

I'm looking for a way to implement this with minimum glue code while still maintaining the look of a simple sequence of synchronous operations in which the natural control flow is easily seen.

The main problem seems to be hiding the busy indicator when the inner observable (the reader) finishes. The outer observable is the sequence of "Run" click events.

So far I have come up with these two versions of the main piece of the program:

subscriber-based:

IObservable<byte[]> xs =
   runClick
   .Do((Action<EventPattern<EventArgs>>)(_ => ShowBusy(true)))
   .Do(_ => ShowError(false))
   .Select(_ =>
      reader
      .TakeUntil(cancelClick)
      .Publish(ob =>
      {
         ob.Subscribe(
            b => { },
            ex =>
            {
               Console.WriteLine("ERROR: " + ex);
               ShowBusy(false);
               ShowError(true);
            },
            () => ShowBusy(false));
         return ob;
      }))
   .Switch();
xs = xs.Catch<byte[], Exception>(e => xs);

and concat-based:

IObservable<byte[]> xs =
   runClick
   .Do((Action<EventPattern<EventArgs>>)(_ => ShowBusy(true)))
   .Do(_ => ShowError(false))
   .Select(_ =>
      reader
      .TakeUntil(cancelClick)
      .DoAfter(() =>
         ShowBusy(false)))
   .Switch();
xs = xs.Catch<byte[], Exception>(e =>
{
   Console.WriteLine("ERROR: " + e);
   ShowBusy(false);
   ShowError(true);
   return xs;
});

with one custom method, DoAfter:

public static IObservable<T> DoAfter<T>(this IObservable<T> observable, Action action)
{
   IObservable<T> appended = observable.Concat(
      Observable.Create<T>(o =>
      {
         try
         {
            action();
         }
         catch (Exception ex)
         {
            o.OnError(ex);
            return Disposable.Empty;
         }

         o.OnCompleted();
         return Disposable.Empty;
      }));
   return appended;
}

(see the whole application code here).

Neither implementation satisfies me completely: the first is rather verbose, the second forces me to write my own method for a rather simple task which I would expect to be covered by standard means. Am I missing something?

Upvotes: 1

Views: 657

Answers (1)

Gebb

Reputation: 6546

It turned out that what I was missing was the overload of the Do method that takes onNext, onError, and onCompleted handlers. I can use it instead of my custom DoAfter method, and the code then reads cleanly:

IObservable<byte[]> xs =
   runClick
   .Do((Action<EventPattern<EventArgs>>)(_ => ShowBusy(true)))
   .Do(_ => ShowError(false))
   .Select(_ =>
      reader
      .TakeUntil(cancelClick)
      .Do(
         b => { },
         ex =>
         {
            Console.WriteLine("ERROR: " + ex);
            ShowBusy(false);
            ShowError(true);
         },
         () => ShowBusy(false)
      ))
   .Switch();
xs = xs.Catch<byte[], Exception>(e => xs);
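
To see in isolation which handlers of this Do overload fire, here is a minimal console sketch. It assumes the System.Reactive package is referenced. The class DoOverloadDemo and the helper Trace are hypothetical names made up for this illustration and are not part of the application above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class DoOverloadDemo
{
    // Hypothetical helper: records which Do handlers fire for a given source.
    public static List<string> Trace(IObservable<int> source)
    {
        var log = new List<string>();
        source
            .Do(
                x => log.Add("next " + x),             // called for every element
                ex => log.Add("error " + ex.Message),  // called if the source fails
                () => log.Add("completed"))            // called if the source completes
            .Subscribe(_ => { }, _ => { });            // swallow the error so the demo keeps running
        return log;
    }

    public static void Main()
    {
        // A source that completes normally.
        var ok = Observable.Range(1, 2);

        // A source that emits one element and then fails.
        var failing = Observable.Range(1, 1)
            .Concat(Observable.Throw<int>(new InvalidOperationException("boom")));

        Console.WriteLine(string.Join(", ", Trace(ok)));
        Console.WriteLine(string.Join(", ", Trace(failing)));
    }
}
```

Note that the Do handlers react only to notifications from the source. They are not invoked when a subscription is merely disposed, which is what Switch does to the previous inner sequence when a new "Run" click arrives.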

Upvotes: 1
