Reputation: 35631
I have a user interaction scenario I'd like to handle with Rx.
The scenario is similar to the canonical "when the user stops typing, do some work" case (usually, searching for what the user has typed so far) (1), but I also need to (2) only receive results from the newest unit of work and (3) cancel pending tasks.
For (1) I use an IObservable for the user events, throttled with .Throttle() to only trigger on pauses between events ("user stops typing"). From that, I .Select(_ => CreateMyTask(...).ToObservable()).
This gives me an IObservable<IObservable<T>> where each of the inner observables wraps a single task.
To get (2) I finally apply .Switch() to only get the results from the newest unit of work.
What about (3), cancelling pending tasks?
If I understand correctly, whenever there's a new inner IObservable<T>, the .Switch() method subscribes to it and unsubscribes from the previous one(s), causing their subscriptions to be disposed via Dispose().
Maybe that can be somehow wired to trigger the task to cancel?
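To make the problem concrete, here is a minimal sketch of the pipeline described above, assuming the System.Reactive package. SearchAsync is a hypothetical stand-in for CreateMyTask(...), and the Subject<string> and timings are illustrative only. Because the task takes no token, Switch can drop its result but cannot stop it from running.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public static class UncancelledDemo
{
    // Hypothetical stand-in for CreateMyTask(...): it takes no token,
    // so nothing can cancel it once it has started.
    public static async Task<string> SearchAsync(string term)
    {
        await Task.Delay(500);
        // This line runs even when Switch has already moved on.
        Console.WriteLine("task finished: " + term);
        return "results for " + term;
    }

    public static void Main()
    {
        var input = new Subject<string>();

        using var subscription = input
            .Throttle(TimeSpan.FromMilliseconds(100))
            .Select(term => SearchAsync(term).ToObservable())
            .Switch()
            .Subscribe(x => Console.WriteLine("received: " + x));

        input.OnNext("r");
        Thread.Sleep(300);   // pause long enough for the throttle to fire
        input.OnNext("rx");  // Switch drops the "r" result, but the task keeps running
        Thread.Sleep(1000);  // keep the process alive until the work completes
    }
}
```

With these timings both tasks should run to completion, while only the result for "rx" reaches the subscriber.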
Upvotes: 7
Views: 2988
Reputation: 39182
You can just use Observable.FromAsync, which will generate tokens that are cancelled when the observer unsubscribes:
input.Throttle(...)
     .Select(_ => Observable.FromAsync(token => CreateMyTask(..., token)))
     .Switch()
     .Subscribe(...);
This will generate a new token for each unit of work and cancel it every time Switch switches to the new one.
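As a rough, self-contained sketch of this approach (assuming the System.Reactive package): SearchAsync below is a hypothetical stand-in for CreateMyTask, and the Subject<string> input and the timings are made up for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public static class SearchDemo
{
    // Hypothetical stand-in for CreateMyTask: a slow search that honours the token.
    public static async Task<string> SearchAsync(string term, CancellationToken token)
    {
        // Task.Delay throws an OperationCanceledException once the token is cancelled.
        await Task.Delay(TimeSpan.FromMilliseconds(500), token);
        return "results for " + term;
    }

    public static void Main()
    {
        var input = new Subject<string>();

        using var subscription = input
            .Throttle(TimeSpan.FromMilliseconds(100))
            // FromAsync creates a fresh token per subscription and cancels it on unsubscribe.
            .Select(term => Observable.FromAsync(token => SearchAsync(term, token)))
            .Switch()
            .Subscribe(x => Console.WriteLine(x));

        input.OnNext("r");
        Thread.Sleep(300);   // pause: the throttle fires and the search for "r" starts
        input.OnNext("rx");  // Switch unsubscribes from "r", which cancels its token
        Thread.Sleep(1000);  // keep the process alive until the "rx" search completes
    }
}
```

With these timings, only the result for "rx" should be printed. The cancellation exception from the first task is not surfaced, because its observer has already been unsubscribed by the time it is thrown.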
Upvotes: 14
Reputation: 117027
Do you have to work with Tasks?
If you're happy to work purely with Observables then you can do this nicely yourself.
Try doing something like this:
var query =
    Observable.Create<int>(o =>
    {
        var cancelling = false;
        var cancel = Disposable.Create(() =>
        {
            cancelling = true;
        });
        var subscription = Observable.Start(() =>
        {
            for (var i = 0; i < 100; i++)
            {
                Thread.Sleep(10); // 1000 ms in total
                if (cancelling)
                {
                    Console.WriteLine("Cancelled on {0}", i);
                    return -1;
                }
            }
            Console.WriteLine("Done");
            return 42;
        }).Subscribe(o);
        return new CompositeDisposable(cancel, subscription);
    });
This observable simulates hard work in the for loop with Thread.Sleep(10), but when the subscription is disposed the loop exits and the work ceases. You can then rely on the standard Rx disposal performed by Switch to cancel the in-progress work.
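Here is a rough sketch of how that plays out with Switch, assuming the System.Reactive package. The Work method, its label parameter, and the Subject<string> input are hypothetical names for illustration. It follows the same flag-based pattern, except that it reads and writes the flag through Volatile, which is my addition to make the cross-thread visibility explicit.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class SwitchDemo
{
    // Flag-based cancellable work; the label only identifies the unit of work in the output.
    public static IObservable<string> Work(string label)
    {
        return Observable.Create<string>(o =>
        {
            var cancelling = false;
            // Disposing the subscription flips the flag that the loop polls.
            var cancel = Disposable.Create(() => Volatile.Write(ref cancelling, true));
            var subscription = Observable.Start(() =>
            {
                for (var i = 0; i < 50; i++)
                {
                    Thread.Sleep(10); // 500 ms in total
                    if (Volatile.Read(ref cancelling))
                    {
                        Console.WriteLine("Cancelled " + label);
                        return label + " (cancelled)";
                    }
                }
                return label + " done";
            }).Subscribe(o);
            return new CompositeDisposable(cancel, subscription);
        });
    }

    public static void Main()
    {
        var input = new Subject<string>();

        using var subscription = input
            .Select(label => Work(label))
            .Switch()
            .Subscribe(x => Console.WriteLine("Result: " + x));

        input.OnNext("first");
        Thread.Sleep(100);
        input.OnNext("second"); // Switch disposes the "first" subscription
        Thread.Sleep(1000);     // keep the process alive until "second" finishes
    }
}
```

With these timings the first unit of work should report that it was cancelled, and only the result of the second should reach the subscriber.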
If you'd like that bundled up in a method, then try this:
// Wrapped in a static class so that it can be called as ObservableEx.Start(...).
public static class ObservableEx
{
    public static IObservable<T> Start<T>(Func<Func<bool>, T> work)
    {
        return Observable.Create<T>(o =>
        {
            var cancelling = false;
            var cancel = Disposable
                .Create(() => cancelling = true);
            var subscription = Observable
                .Start(() => work(() => cancelling))
                .Subscribe(o);
            return new CompositeDisposable(cancel, subscription);
        });
    }
}
And then call it with a function like this:
Func<Func<bool>, int> work = cancelling =>
{
    for (var i = 0; i < 100; i++)
    {
        Thread.Sleep(10); // 1000 ms in total
        if (cancelling())
        {
            Console.WriteLine("Cancelled on {0}", i);
            return -1;
        }
    }
    Console.WriteLine("Done");
    return 42;
};
Here's my code that proved this worked:
var disposable =
    ObservableEx
        .Start(work)
        .Subscribe(x => Console.WriteLine(x));
Thread.Sleep(500);
disposable.Dispose();
I got "Cancelled on 50" (sometimes "Cancelled on 51") as my output.
Upvotes: 3