Reputation: 8624
I have created the below extension where I was able to make an IObservable.Subscribe
awaitable, but wrapping the whole subscription.
A big issue in current async programming in C# and the code below is the onNext
, onError
, onCompleted
that are not awaitable.
Is there a way to work around and have async/await code? Any extension method that I missed;
public static Task Observe<T>(this IObservable<T> observable,
Action<T> onNext,
Func<Exception, EventWaitHandle, ExceptionDispatchInfo> onError,
Action<EventWaitHandle, CancellationToken> onCompleted,
CancellationToken cancellationToken
) where T : class
{
var compositeDisposables = new CompositeDisposable();
var waitHandle = new ManualResetEvent(false);
compositeDisposables.Add(waitHandle);
var disposable = observable.Subscribe(
// This should be await onNext
onNext,
// This should be await onError
e => onError(e, waitHandle),
// This should be await onCompleted
() => onCompleted(waitHandle, cancellationToken));
compositeDisposables.Add(disposable);
waitHandle.WaitOne();
compositeDisposables.Dispose();
return Task.CompletedTask;
}
I understand there's a solution regarding onNext async but doesn't cover the onError and onCompleted.
Upvotes: 1
Views: 403
Reputation: 117027
You can already await an observable. Consider this code:
async Task Main()
{
IObservable<int> observable = Observable.Range(0, 10);
int value = await observable;
Console.WriteLine(value);
}
That produces a value of 9
on the console. The last value of the observable.
If you want the first one, just do this:
async Task Main()
{
IObservable<int> observable = Observable.Range(0, 10);
int value = await observable.Take(1);
Console.WriteLine(value);
}
That produces 0
.
If you want to await all of the values then try this:
async Task Main()
{
IObservable<int> observable = Observable.Range(0, 10);
int[] values = await observable.ToArray();
Console.WriteLine(String.Join(", ", values));
}
That produces 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
.
Be mindful that if your observable doesn't produce a value then an exception is thrown. Or it doesn't end then this effectively is code that never is awaited.
async Task Main()
{
IObservable<int> observable =
from n in Observable.Range(0, 10)
from p in Observable.FromAsync(() => OnNext(n))
select n;
int value = await observable;
Console.WriteLine(value);
}
public async Task OnNext<T>(T value)
{
await Task.Delay(TimeSpan.FromSeconds(1.0));
}
Upvotes: 1
Reputation: 8624
Here's where I ended up, it needs production testing but the unit test is working as expected.
public static Task ObserveAsync<T>(this IObservable<T> observable,
Func<T, Task> onNext,
Func<Exception, Task> onError,
Func<Task> onCompleted,
CancellationToken cancellationToken
) where T : class
{
var compositeDisposables = new CompositeDisposable();
var mainWaitHandle = new ManualResetEvent(false);
compositeDisposables.Add(mainWaitHandle);
var disposable = observable.Subscribe(
value =>
{
var waitHandle = new ManualResetEvent(false);
onNext(value).ContinueWith(antecedent =>
{
if (!antecedent.IsCompletedSuccessfully)
{
waitHandle.Set();
throw new ObserveAsyncException(antecedent.Exception?.Flatten().ToString());
}
waitHandle.Set();
}, cancellationToken);
waitHandle.WaitOne();
},
e =>
{
var waitHandle = new ManualResetEvent(false);
onError(e).ContinueWith(antecedent =>
{
if (!antecedent.IsCompletedSuccessfully)
{
waitHandle.Set();
mainWaitHandle.Set();
throw new ObserveAsyncException(antecedent.Exception?.Flatten().ToString());
}
waitHandle.Set();
mainWaitHandle.Set();
}, cancellationToken);
},
() =>
{
var waitHandle = new ManualResetEvent(false);
onCompleted().ContinueWith(antecedent =>
{
if (!antecedent.IsCompletedSuccessfully)
{
waitHandle.Set();
mainWaitHandle.Set();
throw new ObserveAsyncException(antecedent.Exception?.Flatten().ToString());
}
waitHandle.Set();
mainWaitHandle.Set();
}, cancellationToken);
});
compositeDisposables.Add(disposable);
mainWaitHandle.WaitOne();
compositeDisposables.Dispose();
return Task.CompletedTask;
}
Unit test.
public class ObserveAsyncTests
{
private readonly ITestOutputHelper _testOutputHelper;
public ObserveAsyncTests(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
}
[Fact]
public async Task Observe_Async()
{
// Arrange
var values = new[] { "1", "2", "3" };
var subject = new Subject<string>();
// TimeSpan ts = TimeSpan.FromTicks(1696592212);
// Act
Task.Delay(3000).ContinueWith(_ =>
{
foreach (var value in values)
{
subject.OnNext(value);
}
// subject.OnError(new Exception("Testing"));
subject.OnCompleted();
});
await subject.ObserveAsync(async s =>
{
_testOutputHelper.WriteLine(s);
await Task.Delay(500);
},
async ex =>
{
_testOutputHelper.WriteLine($"Error {ex}");
await Task.Delay(500);
},
async () =>
{
_testOutputHelper.WriteLine("Completed");
// return Task.CompletedTask;
await Task.Delay(500);
},
CancellationToken.None);
// Assert
Assert.True(true);
}
}
Upvotes: 0