xplat
xplat

Reputation: 8624

How can I make async await the Reactive Extensions onNext, onError, onCompleted delegates

I have created the below extension where I was able to make an IObservable.Subscribe awaitable, but wrapping the whole subscription.

A big issue in current async programming in C# and the code below is the onNext, onError, onCompleted that are not awaitable.

Is there a way to work around and have async/await code? Any extension method that I missed;

public static Task Observe<T>(this IObservable<T> observable,
        Action<T> onNext,
        Func<Exception, EventWaitHandle, ExceptionDispatchInfo> onError,
        Action<EventWaitHandle, CancellationToken> onCompleted,
        CancellationToken cancellationToken
    ) where T : class
    {
        var compositeDisposables = new CompositeDisposable();
        var waitHandle = new ManualResetEvent(false);
        compositeDisposables.Add(waitHandle);

        var disposable = observable.Subscribe(
            // This should be await onNext
            onNext, 
            // This should be await onError
            e => onError(e, waitHandle),
            // This should be await onCompleted
            () => onCompleted(waitHandle, cancellationToken));
        
        compositeDisposables.Add(disposable);

        waitHandle.WaitOne();
        compositeDisposables.Dispose();

        return Task.CompletedTask;
    }

I understand there's a solution regarding onNext async but doesn't cover the onError and onCompleted.

Upvotes: 1

Views: 403

Answers (2)

Enigmativity
Enigmativity

Reputation: 117027

You can already await an observable. Consider this code:

async Task Main()
{
    IObservable<int> observable = Observable.Range(0, 10);
    
    int value = await observable;
    
    Console.WriteLine(value);
}

That produces a value of 9 on the console. The last value of the observable.

If you want the first one, just do this:

async Task Main()
{
    IObservable<int> observable = Observable.Range(0, 10);
    
    int value = await observable.Take(1);
    
    Console.WriteLine(value);
}

That produces 0.

If you want to await all of the values then try this:

async Task Main()
{
    IObservable<int> observable = Observable.Range(0, 10);
    
    int[] values = await observable.ToArray();
    
    Console.WriteLine(String.Join(", ", values));
}

That produces 0, 1, 2, 3, 4, 5, 6, 7, 8, 9.

Be mindful that if your observable doesn't produce a value then an exception is thrown. Or it doesn't end then this effectively is code that never is awaited.


async Task Main()
{
    IObservable<int> observable =
    from n in Observable.Range(0, 10)
    from p in Observable.FromAsync(() => OnNext(n))
    select n;

    int value = await observable;

    Console.WriteLine(value);
}

public async Task OnNext<T>(T value)
{
    await Task.Delay(TimeSpan.FromSeconds(1.0));
}

Upvotes: 1

xplat
xplat

Reputation: 8624

Here's where I ended up, it needs production testing but the unit test is working as expected.

public static Task ObserveAsync<T>(this IObservable<T> observable,
        Func<T, Task> onNext,
        Func<Exception, Task> onError,
        Func<Task> onCompleted,
        CancellationToken cancellationToken
    ) where T : class
    {
        var compositeDisposables = new CompositeDisposable();
        var mainWaitHandle = new ManualResetEvent(false);
        compositeDisposables.Add(mainWaitHandle);
        
        var disposable = observable.Subscribe(
            value =>
            {
                var waitHandle = new ManualResetEvent(false);
                onNext(value).ContinueWith(antecedent =>
                {
                    if (!antecedent.IsCompletedSuccessfully)
                    {
                        waitHandle.Set();
                        throw new ObserveAsyncException(antecedent.Exception?.Flatten().ToString());
                    }

                    waitHandle.Set();
                }, cancellationToken);
                waitHandle.WaitOne();
            },
            e =>
            {
                var waitHandle = new ManualResetEvent(false);
                onError(e).ContinueWith(antecedent =>
                {
                    if (!antecedent.IsCompletedSuccessfully)
                    {
                        waitHandle.Set();
                        mainWaitHandle.Set();
                        throw new ObserveAsyncException(antecedent.Exception?.Flatten().ToString());
                    }
                    waitHandle.Set();
                    mainWaitHandle.Set();
                }, cancellationToken);
            },
            () =>
            {
                var waitHandle = new ManualResetEvent(false);
                onCompleted().ContinueWith(antecedent =>
                {
                    if (!antecedent.IsCompletedSuccessfully)
                    {
                        waitHandle.Set();
                        mainWaitHandle.Set();
                        throw new ObserveAsyncException(antecedent.Exception?.Flatten().ToString());
                    }
                    waitHandle.Set();
                    mainWaitHandle.Set();
                }, cancellationToken);
            });
        
        compositeDisposables.Add(disposable);

        mainWaitHandle.WaitOne();
        compositeDisposables.Dispose();

        return Task.CompletedTask;
    }

Unit test.

public class ObserveAsyncTests
{
    private readonly ITestOutputHelper _testOutputHelper;

    public ObserveAsyncTests(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    [Fact]
    public async Task Observe_Async()
    {
        // Arrange
        var values = new[] { "1", "2", "3" };
        var subject = new Subject<string>();
        
        // TimeSpan ts = TimeSpan.FromTicks(1696592212);
        
        // Act
        Task.Delay(3000).ContinueWith(_ =>
        {
            foreach (var value in values)
            {  
                subject.OnNext(value);
            }
            
            // subject.OnError(new Exception("Testing"));
            subject.OnCompleted();
        });
        
        await subject.ObserveAsync(async s =>
            {
                _testOutputHelper.WriteLine(s);
                await Task.Delay(500);
            },
            async ex =>
            {
                _testOutputHelper.WriteLine($"Error {ex}");
                await Task.Delay(500);
            },
            async () =>
            {
                _testOutputHelper.WriteLine("Completed");
                // return Task.CompletedTask;
                await Task.Delay(500);
            },
            CancellationToken.None);
        
        // Assert
        Assert.True(true);
    }
}

Upvotes: 0

Related Questions