Reputation: 1752
My understanding is that async void should be avoided and that async () => is just async void in disguise when used with Action.
Hence, using the Rx.NET Finally operator asynchronously with async () => should be avoided, since Finally accepts an Action as its parameter:
IObservable<T>.Finally(async () =>
{
    await SomeCleanUpCodeAsync();
});
However, if this is bad practice, what is the best practice when I need, for instance, to asynchronously close a network connection on OnCompleted, or when my observable ends with OnError?
Upvotes: 4
Views: 2212
Reputation: 43996
Quoting from the Intro to Rx:
The Finally extension method accepts an Action as a parameter. This Action will be invoked if the sequence terminates normally or erroneously, or if the subscription is disposed of.
(emphasis added)
This behavior cannot be replicated by a Finally operator that accepts a Func<Task> parameter, because of how the IObservable<T> interface is defined. Unsubscribing from an observable sequence is achieved by calling the Dispose method of the IDisposable subscription. This method is synchronous, and the whole Rx library is built on top of this interface. So even if you create a DisposeAsync extension method for IDisposables, the built-in Rx operators (for example Select, SelectMany, Where, Take etc.) will be unaware of its existence, and will not invoke it when they unsubscribe from their source sequence. A subscription chain of operators will be automatically unlinked by calling the synchronous Dispose method of the previous link, as always.
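The synchronous nature of unsubscription can be seen in a small sketch. It assumes the System.Reactive NuGet package is referenced; the class and method names (FinallyOnDispose, Run) are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class FinallyOnDispose
{
    // Records the order in which things happen around Dispose().
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        IDisposable subscription = subject
            .Finally(() => log.Add("finally"))
            .Subscribe(x => log.Add($"next {x}"));

        subject.OnNext(1);

        // Dispose() is a plain synchronous call. The Finally action runs
        // inside it, before control returns to the next statement.
        subscription.Dispose();
        log.Add("after dispose");

        return log;
    }
}
```

The "finally" entry lands before "after dispose", so there is no point at which the caller could await asynchronous cleanup started from inside that Action.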
Btw there has been an attempt to implement an asynchronous version of Rx (AsyncRx), that is built on top of the completely new interfaces that are shown below. This library has not been released yet.
public interface IAsyncObserver<in T>
{
    ValueTask OnNextAsync(T value);
    ValueTask OnErrorAsync(Exception error);
    ValueTask OnCompletedAsync();
}

public interface IAsyncObservable<out T>
{
    ValueTask<IAsyncDisposable> SubscribeAsync(IAsyncObserver<T> observer);
}

public interface IAsyncDisposable
{
    public ValueTask DisposeAsync();
}
Upvotes: 2
Reputation: 14370
My understanding is that async void should be avoided and that async () => is just async void in disguise.
This is partially wrong. async () => can match either Func<Task> (good) or Action (bad). The main reason for the good/bad distinction is that an exception thrown in an async void call crashes the process, whereas an exception thrown in an async Task call can be caught.
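To make the distinction concrete, here is a minimal sketch of the same lambda shape bound to each delegate type. The names LambdaBinding, CatchFromFuncTaskAsync and AsAsyncVoid are hypothetical and only serve the illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class LambdaBinding
{
    // Bound to Func<Task>: the lambda compiles to an async Task method,
    // so the caller receives a Task and can catch the exception by awaiting it.
    public static async Task<string> CatchFromFuncTaskAsync()
    {
        Func<Task> cleanup = async () =>
        {
            await Task.Yield();
            throw new InvalidOperationException("cleanup failed");
        };

        try
        {
            await cleanup();
            return "no exception";
        }
        catch (InvalidOperationException e)
        {
            return e.Message;
        }
    }

    // Bound to Action: the same lambda shape compiles to async void.
    // The caller gets nothing to await, so an exception thrown after the
    // first await could not be caught by a try/catch around the call.
    // (This lambda deliberately does not throw.)
    public static Action AsAsyncVoid() => async () => { await Task.Yield(); };
}
```

Which form the compiler picks depends only on the target delegate type, which is why passing async () => to a parameter of type Action silently produces the async void form.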
So we just need to write an AsyncFinally operator that takes a Func<Task> instead of an Action like Observable.Finally does:
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class X
{
    public static IObservable<T> AsyncFinally<T>(this IObservable<T> source, Func<Task> action)
    {
        return source
            .Materialize()
            .SelectMany(async n =>
            {
                switch (n.Kind)
                {
                    // Run the asynchronous action before forwarding a terminal notification.
                    case NotificationKind.OnCompleted:
                    case NotificationKind.OnError:
                        await action();
                        return n;
                    case NotificationKind.OnNext:
                        return n;
                    default:
                        throw new NotImplementedException();
                }
            })
            .Dematerialize();
    }
}
And here's a demonstration of usage:
try
{
    Observable.Interval(TimeSpan.FromMilliseconds(100))
        .Take(10)
        .AsyncFinally(async () =>
        {
            await Task.Delay(1000);
            throw new NotImplementedException();
        })
        .Subscribe(i => Console.WriteLine(i));
}
catch(Exception e)
{
    Console.WriteLine("Exception caught, no problem");
}
If you swap out AsyncFinally for Finally, you'll crash the process.
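Because Subscribe returns before a timer-driven sequence ends, one way to observe a failure from the asynchronous action is the onError callback rather than a surrounding try/catch. The sketch below assumes the System.Reactive package; it repeats a condensed AsyncFinally so that it compiles on its own, and AsyncFinallyDemo and ObserveCleanupFailureAsync are hypothetical names.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class AsyncFinallyDemo
{
    // Condensed version of the AsyncFinally operator, repeated here
    // only so the example is self-contained.
    public static IObservable<T> AsyncFinally<T>(this IObservable<T> source, Func<Task> action) =>
        source
            .Materialize()
            .SelectMany(async n =>
            {
                if (n.Kind != NotificationKind.OnNext)
                {
                    await action();
                }
                return n;
            })
            .Dematerialize();

    // Completes with the message of the exception thrown by the cleanup action,
    // or with "completed" if the sequence ends without an error.
    public static Task<string> ObserveCleanupFailureAsync()
    {
        var result = new TaskCompletionSource<string>();

        Observable.Range(1, 3)
            .AsyncFinally(async () =>
            {
                await Task.Delay(10);
                throw new InvalidOperationException("cleanup failed");
            })
            .Subscribe(
                _ => { },
                e => result.TrySetResult(e.Message),      // the faulted Task surfaces as OnError
                () => result.TrySetResult("completed"));

        return result.Task;
    }
}
```

The exception travels through the observable pipeline as an OnError notification, so it reaches whichever observer is subscribed instead of escaping on an arbitrary thread.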
Upvotes: 3
Reputation: 12687
The method signature for Finally is
public static IObservable<TSource> Finally<TSource>(
    this IObservable<TSource> source,
    Action finallyAction
)
which expects an action, not a Task.
As an addendum, if you want to run something asynchronously, instead of async void, use Task.Factory methods inside the method so the intention is explicit.
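One possible reading of this suggestion is an explicit fire-and-forget inside a synchronous Finally action. The sketch below uses Task.Run instead of Task.Factory.StartNew, because StartNew with an async lambda returns a nested Task<Task>; that substitution, the FinallyFireAndForget name and the onError parameter are assumptions, not part of Rx.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class FireAndForgetFinally
{
    // Hypothetical helper: starts the asynchronous cleanup from a synchronous
    // Finally action and routes any failure to an explicit handler.
    public static IObservable<T> FinallyFireAndForget<T>(
        this IObservable<T> source,
        Func<Task> cleanup,
        Action<Exception> onError)
    {
        return source.Finally(() =>
        {
            // The discard makes it explicit that nobody awaits this task.
            _ = Task.Run(async () =>
            {
                try
                {
                    await cleanup();
                }
                catch (Exception e)
                {
                    onError(e);
                }
            });
        });
    }
}
```

The sequence terminates without waiting for the cleanup, so this only fits cases where nothing downstream depends on the cleanup having finished.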
Upvotes: 2
Reputation: 3904
It is in Rx as it is elsewhere; avoid async void like the plague. In addition to the problems listed in the article, using asynchronous code in the synchronous operators "breaks" Rx.
I'd consider using OnErrorResumeNext() for cleaning up resources asynchronously. OnErrorResumeNext() lets you specify an observable which will run after the first, regardless of the reason it ended:
var myObservable = ...

myObservable
    .Subscribe( /* Business as usual */ );

Observable.OnErrorResumeNext(
        myObservable.Select(_ => Unit.Default),
        Observable.FromAsync(() => SomeCleanUpCodeAsync()))
    .Subscribe();
myObservable would preferably be a ConnectableObservable (e.g. Publish()) to prevent multiple subscriptions.
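Put together with Publish(), the pattern might look like the following sketch. It assumes the System.Reactive package; CleanupAfterSequence and RunAsync are hypothetical names, and the Task.Delay stands in for real asynchronous cleanup.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class CleanupAfterSequence
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        // Publish() shares a single subscription to the source between
        // the business subscriber and the cleanup pipeline.
        var source = Observable.Range(1, 3).Publish();

        source.Subscribe(x => log.Add($"next {x}"));

        // The second sequence starts once the first one has ended,
        // whether it completed or failed.
        Task<Unit> cleanupDone = Observable.OnErrorResumeNext(
                source.Select(_ => Unit.Default),
                Observable.FromAsync(async () =>
                {
                    await Task.Delay(10); // stand-in for asynchronous cleanup
                    log.Add("cleanup");
                }))
            .LastOrDefaultAsync()
            .ToTask();

        source.Connect();   // start the shared source only after both subscriptions exist
        await cleanupDone;  // awaiting here lets the caller wait for the cleanup
        return log;
    }
}
```

Unlike Finally, this runs the cleanup only when the source completes or errors, not when a subscription is disposed early.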
Upvotes: 2