gethomast

Reputation: 446

Programming usable and maintainable code

I'm new to reactive programming and came across a problem...

My code is looking like this:

    IsBusy = true;
    service.BeginGetClients(param, c => 
    {
        var r = service.EndGetClients(c);
        if(!CheckResult(r))
        {
            service.BeginGetCachedClients(param, c2 =>
                {
                    var r2 = service.EndGetCachedClients(c2);
                    if(CheckResult(r2))
                        ShowMessage("clients valid");
                    else
                        ShowMessage("clients not valid");

                    UpdateClients(r2);

                    service.BeginUpdateClients(r2, c3 =>
                        {
                            var b = service.EndUpdateClients(c3);
                            if(b)
                                ShowMessage("clients updated");
                            else
                                ShowMessage("clients not updated");
                            IsBusy = false;
                        }, null);

                }, null);
        }
        else
        {
            ShowMessage("error on get clients");
            IsBusy = false;
        }
    }, null);

How can this be changed to fluent Rx? I started with this code:

    var invokeClients = Observable.FromAsyncPattern<Param, List<Client>>(service.BeginGetClients, service.EndGetClients);
    var invokeCachedClients = Observable.FromAsyncPattern<Param, List<Client>>(service.BeginGetCachedClients, service.EndGetCachedClients);
    invokeClients(param)
    .Subscribe(r =>
    {
        if(!CheckResult(r))
        {
            invokeCachedClients(param)
            .Subscribe(r2 =>
            {
                // TODO: next op
            });
        }
    });

Any suggestions for improving this code? Maybe another solution? I don't like this cascading code...

Thanks!

Upvotes: 2

Views: 181

Answers (3)

Lee Campbell

Reputation: 10783

As per the answer above

    invokeClients(param)
      .Where(x => !CheckResult(x))
      // invokeCachedClients takes the Param, not the List<Client> result
      .SelectMany(_ => invokeCachedClients(param))
      .Subscribe(x => Console.WriteLine("Do something here"));

which could also be written as

    var query = from client in invokeClients(param)
      where !CheckResult(client)
      from cache in invokeCachedClients(param) // takes the Param, not the result
      select cache;

then you could have the busy flag wrapped in a resource

    Func<IDisposable> busyResource = 
      () =>
      {
        IsBusy = true;
        return Disposable.Create(() => IsBusy = false);
      };

Now you can bring it all together with the Using factory.

    Observable.Using(busyResource, _ => query)
      .Subscribe(x => Console.Write("Do something here"));

The reason to favor the Using method over putting the IsBusy setters in the OnError or OnCompleted handlers is that Using also resets the flag if the subscription is disposed.
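
A minimal sketch of that disposal behavior, assuming the System.Reactive package is referenced. The names BusyDemo and WithBusyFlag are hypothetical helpers invented for this illustration, and Observable.Never stands in for a service call that has not returned yet:

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class BusyDemo
{
    public static bool IsBusy;

    // Hypothetical helper: IsBusy is true for as long as a subscription is active.
    public static IObservable<T> WithBusyFlag<T>(IObservable<T> source)
    {
        return Observable.Using(
            () =>
            {
                IsBusy = true;
                // Disposed on completion, on error, and on unsubscription.
                return Disposable.Create(() => IsBusy = false);
            },
            _ => source);
    }

    public static void Main()
    {
        // A sequence that never produces a value and never terminates.
        var subscription = WithBusyFlag(Observable.Never<int>()).Subscribe(_ => { });
        Console.WriteLine(IsBusy); // True

        // No OnError or OnCompleted is ever raised, yet the flag is still reset.
        subscription.Dispose();
        Console.WriteLine(IsBusy); // False
    }
}
```

With setters placed only in the OnError and OnCompleted handlers, the flag in this scenario would stay true after the subscription is disposed.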

I am sure we could do even better than this, but I am finding it hard to understand what you are actually doing. To be honest, I think this is a better fit for TPL or Data Workflows than Rx, i.e. you are chaining work continuations rather than reacting to a sequence of events.

Upvotes: 1

Ana Betts

Reputation: 74692

    invokeClients(param)
        .Where(x => !CheckResult(x))
        // invokeCachedClients takes the Param, not the List<Client> result
        .Select(_ => invokeCachedClients(param))
        .Do(_ => IsBusy = true)
        .Merge()
        .Do(_ => IsBusy = false)
        .Subscribe(x => Console.WriteLine("Do something here"));

Make sure to have that Subscribe call, or else nothing will run (it would be like never foreach'ing over a LINQ query).

Upvotes: 1

Gideon Engelberth

Reputation: 6155

With things like these, you always want to go inside out. The hardest parts to translate are the pieces of logic between the async calls. If you were feeding the result of one call right into the next, it would be a straightforward from x in async1() from y in async2(x) .... I see two functions that I would break this into:

    public IObservable<string> UpdateCachedClients(object param)
    {
        var getCachedClientsAsync = Observable.FromAsyncPattern<...>(service.BeginGetCachedClients, service.EndGetCachedClients);
        var updateClientsAsync = Observable.FromAsyncPattern<...>(service.BeginUpdateClients, service.EndUpdateClients);

        return Observable.Create<string>(obs =>
        {
            return (from r2 in getCachedClientsAsync(param)
                                .Do(v => 
                                    { if (CheckResult(v))
                                          obs.OnNext("clients valid");
                                      else
                                          obs.OnNext("clients not valid");
                                      //huh? is this done twice
                                      UpdateClients(v); 
                                    })
                    from b in updateClientsAsync(r2)
                    select (b ? "clients updated" : "clients not updated")
                   ).Subscribe(obs);
        });
    }

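    public IObservable<string> UpdateAllClients(object param)
    {
        var getClientsAsync = Observable.FromAsyncPattern<...>(service.BeginGetClients, service.EndGetClients);
        // The second "from" flattens the inner sequence so the result is
        // IObservable<string> rather than IObservable<IObservable<string>>.
        return from r in getClientsAsync(param)
               from message in (CheckResult(r) ? 
                         Observable.Return("error on get clients") :
                         UpdateCachedClients(param))
               select message;
    }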

I used an extra layer of Observable.Create in the first function because it seemed the simplest way to pass a result out and continue with the next call. Once you have these two functions, you should be able to call them like:

    IsBusy = true;
    var disp = UpdateAllClients(param)
                .Subscribe(ShowMessage,
                           ex => IsBusy = false,
                           () => IsBusy = false);

Note that I've set IsBusy to false in both the OnError and OnCompleted, since either is a terminating message for IObservable.

The TPL seems a more natural fit for async methods like this as they only produce one value, but until the next version of the languages with async/await, you will probably end up with similar syntax to either your method or mine if you use Tasks instead of IObservables.
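
For comparison, here is a rough sketch of how the same flow might read with async/await. IClientService, ClientUpdater and FakeClientService are hypothetical names that are not part of the original code. The sketch assumes Task-returning wrappers around the Begin/End pairs already exist (Task.Factory.FromAsync is one way such wrappers could be built), and the bodies of CheckResult, ShowMessage and UpdateClients are stand-ins for the question's own methods:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class Client { }

// Hypothetical Task-based facade; the original service only exposes Begin/End pairs.
public interface IClientService
{
    Task<List<Client>> GetClientsAsync(object param);
    Task<List<Client>> GetCachedClientsAsync(object param);
    Task<bool> UpdateClientsAsync(List<Client> clients);
}

public class ClientUpdater
{
    private readonly IClientService service;
    public readonly List<string> Messages = new List<string>();
    public bool IsBusy { get; private set; }

    public ClientUpdater(IClientService service) { this.service = service; }

    // Stand-ins for the question's CheckResult, ShowMessage and UpdateClients.
    private static bool CheckResult(List<Client> clients) { return clients != null && clients.Count > 0; }
    private void ShowMessage(string message) { Messages.Add(message); }
    private void UpdateClients(List<Client> clients) { }

    public async Task UpdateAllClientsAsync(object param)
    {
        IsBusy = true;
        try
        {
            var r = await service.GetClientsAsync(param);
            if (CheckResult(r))
            {
                // Same branch layout as the question's code.
                ShowMessage("error on get clients");
                return;
            }

            var r2 = await service.GetCachedClientsAsync(param);
            ShowMessage(CheckResult(r2) ? "clients valid" : "clients not valid");
            UpdateClients(r2);

            var updated = await service.UpdateClientsAsync(r2);
            ShowMessage(updated ? "clients updated" : "clients not updated");
        }
        finally
        {
            // Runs on success, on the early return, and on exceptions.
            IsBusy = false;
        }
    }
}

// In-memory fake used only to exercise the flow above.
public class FakeClientService : IClientService
{
    public Task<List<Client>> GetClientsAsync(object param) { return Task.FromResult(new List<Client>()); }
    public Task<List<Client>> GetCachedClientsAsync(object param) { return Task.FromResult(new List<Client> { new Client() }); }
    public Task<bool> UpdateClientsAsync(List<Client> clients) { return Task.FromResult(true); }
}
```

The try/finally block plays the role that the OnError and OnCompleted handlers play in the Rx version, and the nesting of the original callbacks collapses into sequential statements.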

Upvotes: 0
