Reputation: 2222
I need to execute multiple long-running operations in parallel and would like to report progress in some way. From my initial research, IObservable seems to fit this model. The idea is that I call a method that returns an IObservable<int>, where each int is the reported percent complete. Parallel execution starts as soon as the method returns. The observable must be hot, so that all subscribers see the same progress information at any given point in time; for example, a late subscriber may only learn that the whole execution is complete and there is no more progress to track.
The closest approach I have found is to use Observable.ForkJoin and Observable.Start, but I can't work out how to combine them into a single observable that I can return from a method.
Please share your ideas on how this can be achieved, or whether there is another approach to this problem using Rx for .NET.
Upvotes: 3
Views: 1815
Reputation: 6155
To make a hot observable, I would probably start with a method that uses a BehaviorSubject as both the return value and the way the operations report progress. If you just want the example, skip to the end. The rest of this answer explains the steps.
I will assume for the sake of this answer that your long-running operations do not have their own way to be called asynchronously. If they do, the next step may be a little different. The next thing to do is to send the work to another thread using an IScheduler. You may allow the caller to select where the work happens by adding an overload that takes the scheduler as a parameter (in which case the overload that does not take one picks a default scheduler). There are quite a few overloads of IScheduler.Schedule, several of which are extension methods, so you should look through them to see which is most appropriate for your situation; I'm using the one that takes only an Action here. If you have multiple operations that can all run in parallel, you can call scheduler.Schedule multiple times.
The hardest part of this will probably be determining what the progress is at any given point. If you have multiple operations going on at once, you will probably need to keep track of how many have completed to know what the current progress is. With the information you provided, I can't be more specific than that.
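As a rough illustration of tracking completed operations, here is a minimal sketch. ProgressTracker and ReportOneCompleted are hypothetical names, not part of Rx or of your code, and the sketch assumes every operation counts equally toward the total and that there is at least one operation.

```csharp
using System;
using System.Reactive.Subjects;

// Hypothetical helper: turns "one more operation finished" into a percentage.
sealed class ProgressTracker
{
    private readonly object _gate = new object();
    private readonly BehaviorSubject<int> _progress = new BehaviorSubject<int>(0);
    private readonly int _total;
    private int _completed;

    // totalOperations is assumed to be greater than zero.
    public ProgressTracker(int totalOperations)
    {
        _total = totalOperations;
    }

    public IObservable<int> Progress
    {
        get { return _progress; }
    }

    // Call this from each operation when it finishes, on whatever thread it ran.
    public void ReportOneCompleted()
    {
        // The lock keeps OnNext calls from overlapping when operations
        // finish at the same time on different threads.
        lock (_gate)
        {
            _completed++;
            _progress.OnNext(_completed * 100 / _total);
            if (_completed == _total)
            {
                _progress.OnCompleted();
            }
        }
    }
}
```

Because the subject is a BehaviorSubject, a subscriber that arrives mid-run first receives the latest percentage, and one that arrives after the last operation has finished receives only the completion notification.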
Finally, if your operations are cancellable, you may want to take a CancellationToken as a parameter. You can use this to cancel the operation while it is in the scheduler's queue before it starts. If you write your operation code correctly, it can use the token for cancellation as well.
IObservable<int> DoStuff(/*args*/,
                         CancellationToken cancel,
                         IScheduler scheduler)
{
    //BehaviorSubject needs an initial value; start at 0 percent
    var progress = new BehaviorSubject<int>(0);
    //if you don't take it as a parameter, pick a scheduler
    //IScheduler scheduler = Scheduler.ThreadPool;
    var disp = scheduler.Schedule(() =>
    {
        //do stuff that needs to run on another thread
        //report progress
        progress.OnNext(25);
    });
    var disp2 = scheduler.Schedule(...);
    //if the operation is cancelled before the scheduler has started it,
    //you need to dispose the return from the Schedule calls
    var allOps = new CompositeDisposable(disp, disp2);
    cancel.Register(allOps.Dispose);
    return progress;
}
Upvotes: 3
Reputation: 19117
Here is one approach:
// set up a method to do some work
// and report its own partial progress
Func<string, IObservable<int>> doPartialWork =
    (arg) => Observable.Create<int>(obsvr => {
        // create the flag before scheduling, so that disposing the
        // subscription can stop the loop while it is still running
        var cancel = new BooleanDisposable();
        var scheduled = Scheduler.TaskPool.Schedule(() => {
            var progress = 0;
            while(progress < 10 && !cancel.IsDisposed)
            {
                // do work with arg
                Thread.Sleep(550);
                obsvr.OnNext(1); //report progress
                progress++;
            }
            obsvr.OnCompleted();
        });
        return new CompositeDisposable(scheduled, cancel);
    });
var myArgs = new[]{"Arg1", "Arg2", "Arg3"};
// run all the partial bits of work
// use SelectMany to get a flat stream of
// partial progress notifications
var xsOfPartialProgress =
    myArgs.ToObservable(Scheduler.NewThread)
          .SelectMany(arg => doPartialWork(arg))
          .Replay().RefCount();
// use Scan to get a running aggregation of progress
var xsProgress = xsOfPartialProgress
    .Scan(0d, (prog,nextPartial)
        => prog + (nextPartial/(myArgs.Length*10d)));
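
To see what the Scan step does on its own, here is a small self-contained sketch. ScanDemo, Fractions and totalSteps are made-up names for illustration, and Observable.Repeat stands in for the real stream of partial progress notifications.

```csharp
using System;
using System.Reactive.Linq;

static class ScanDemo
{
    // Each incoming 1 stands for one finished unit of work; Scan turns the
    // stream of units into a running fraction of the total.
    public static IObservable<double> Fractions(int totalSteps)
    {
        return Observable.Repeat(1, totalSteps)
            .Scan(0d, (done, step) => done + step / (double)totalSteps);
    }

    public static void Run()
    {
        Fractions(4).Subscribe(
            fraction => Console.WriteLine(fraction),
            () => Console.WriteLine("all work finished"));
    }
}
```

Multiplying each fraction by 100 would give the percent complete asked for in the question.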
Upvotes: 0