Reputation: 446
I'm new to reactive programming and came across a problem...
My code is looking like this:
IsBusy = true;
service.BeginGetClients(param, c =>
{
var r = service.EndGetClients(c);
if(!CheckResult(r))
{
service.BeginGetCachedClients(param, c2 =>
{
var r2 = service.EndGetCachedClients(c2);
if(CheckResult(r2))
ShowMessage("clients valid");
else
ShowMessage("clients not valid");
UpdateClients(r2);
service.BeginUpdateClients(r2, c3 =>
{
var b = service.EndUpdateClients(c3);
if(b)
ShowMessage("clients updated");
else
ShowMessage("clients not updated");
IsBusy = false;
}, null);
}, null);
}
else
{
ShowMessage("error on get clients");
IsBusy = false;
}
}, null);
How can be changed to fluent Rx? I started with this code:
var invokeClients = Observable.FromAsyncPattern<Param, List<Client>>(service.BeginGetClients, service.EndGetClients);
var invokeCachedClients = Observable.FromAsyncPattern<Param, List<Client>>(service.BeginGetCachedClients, service.EndGetCachedClients);
invokeClients(param)
.Subscribe(r =>
{
if(!CheckResult(r))
{
invokeCachedClients(param)
.Subscribe(r2 =>
{
// TODO: next op
});
}
});
Any suggestions improving this code? Maybe another solution? I don't like this cascading code...
Thanks!
Upvotes: 2
Views: 181
Reputation: 10783
As per the answer above
invokeClients(param)
.Where(x => !CheckResult(x))
.SelectMany(invokeCachedClients)
.Subscribe(x => Console.WriteLine("Do something here"));
which could also be written as
var query = from client in invokeClients(param)
where !CheckResult(client)
from cache in invokeCachedClients(client)
select cache;
then you could have the busy flag wrapped in a resource
Func<IDisposable> busyResource =
() =>
{
IsBusy = true;
return Disposable.Create(() => IsBusy = false);
};
Now you can bring it altogether with the Using factory.
Observable.Using(busyResource, _=>query)
.Subscribe(x=>Console.Write("Do something here");
The reason you would favor the Using method instead of putting the IsBusy setters int he OnError or OnCompleted is because this will stop it if the subscription is disposed too.
I am sure we could probably do even better than this, but I am finding it hard to understand what it is you are actually doing. To be honest I think this is actually a better fit for TPL or Data Workflows than Rx, ie you are actually chaining work continuations instead of reacting a sequence of events.
Upvotes: 1
Reputation: 74692
invokeClients(param)
.Where(x => !CheckResult(x))
.Select(invokeCachedClients)
.Do(_ => IsBusy = true)
.Merge()
.Do(_ => IsBusy = false)
.Subscribe(x => Console.WriteLine("Do something here"));
Make sure to have that subscribe, or else it won't work (it'd be like not Foreach'ing a LINQ query)
Upvotes: 1
Reputation: 6155
With things like these, you always want to go inside out. The hardest part about translating this is the parts between the async calls. If you were feeding the result of one right into the next, it would be a straight forward from x in async1() from y in async2(x) ...
. I see two functions that I would break this into:
public IObserservable<string> UpdateCachedClients(object param)
{
var getCachedClientsAsync = Observable.FromAsyncPattern<...>(service.BeginGetCachedClients, service.EndGetCachedClients);
var updateClientsAsync = Observable.FromAsyncPattern<...>(service.BeginUpdateClients, service.EndUpdateClients);
return Observable.Create<string>(obs =>
{
return (from r2 in getCachedClientsAsync(param)
.Do(v =>
{ if (CheckResult(v))
obs.OnNext("clients valid");
else
obs.OnNext("clients not valid");
//huh? is this done twice
UpdateClients(v);
})
from b in updateClientsAsync(r2)
select (b ? "clients updated" : "clients not updated")
).Subscribe(obs);
});
}
public IObservable<string> UpdateAllClients(object param)
{
var getClientsAsync = Observable.FromAsyncPattern<...>(service.BeginGetClients, service.EndGetClients);
return from r in getClientsAsync(param)
select (CheckResult(r) ?
Observable.Return("error on get clients") :
UpdateCachedClients(param));
}
I used an extra layer of Observable.Create
in the first function because it seemed the simplest way to pass a result out and continue with the next call. Once you have these two functions, you should be able to call them like:
IsBusy = true;
var disp = UpdateAllClients(param)
.Subscribe(ShowMessage,
ex => IsBusy = false,
() => IsBusy = false);
Note that I've set IsBusy
to false in both the OnError and OnCompleted, since either is a terminating message for IObservable.
The TPL seems a more natural fit for async methods like this as they only produce one value, but until the next version of the languages with async/await, you will probably end up with similar syntax to either your method or mine if you use Tasks instead of IObservables.
Upvotes: 0