Reputation: 6096
I have a method returning an IObservable<long>
that I call from an async
method. I want to convert this into a normal List<long>
, but to have that operation be cancelled if my CancellationToken
is signaled.
I would like to do something like this:
List<long> result = await Foo().ToList(myCancellationToken);
What is the correct (and simplest) way to accomplish this? The ToList()
extension method of IObservable<T>
returns an IObservable<List<T>>
and does not take a CancellationToken
parameter.
Upvotes: 4
Views: 2789
Reputation: 32182
Use TakeWhile to terminate the list.
CancellationToken MyToken = ...
var list = await Foo().TakeWhile(v=>!MyToken.IsCancellationRequested).ToList();
If you are worried about the subscription only cancelling when the next item is provided you can have this extensions method.
public static IObservable<T>
TakeWhile<T>
( this IObservable<T> This
, CancellationToken t
)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(t);
return Observable.Create<T>(( IObserver<T> observer ) =>
{
This.Subscribe(observer, cts.Token);
return Disposable.Create(() => cts.Cancel());
});
}
and write
CancellationToken MyToken = ...
var list = await Foo().TakeWhile(MyToken.IsCancellationRequested).ToList();
Using TakeWhile with a cancellation token is more composable than ToTask which just returns the last element.
Upvotes: 4
Reputation: 39192
var list = await Foo().ToList().ToTask(cancellationToken);
This has the advantage of cancelling immediately if the token is cancelled (the other Answer will not cancel until the next time the Foo
observable produces a value).
Upvotes: 7