nothrow
nothrow

Reputation: 16168

How to Subscribe with async method in Rx?

I have following code:

IObservable<Data> _source;

...

_source.Subscribe(StoreToDatabase);

private async Task StoreToDatabase(Data data) {
    await dbstuff(data);
}

However, this does not compile. Is there any way how to observe data asynchronously? I tried async void, it works, but I feel that given solution is not feasible.

I also checked Reactive Extensions Subscribe calling await, but it does not answer my question (I do not care about the SelectMany result.)

Upvotes: 19

Views: 12505

Answers (4)

Andyz Smith
Andyz Smith

Reputation: 708

So you want to run the Store Data Procedure, possibly some other procedure and asynchronously await the completion or partial result. How about Create constructor shown here:

IObservable<Int32> NotifyOfStoringProgress = 
Observable.Create(
(Func<IObserver<Int32>, Task>)
(async (ObserverToFeed) =>
{
    ObserverToFeed.OnNext(-1);    
    Task StoreToDataBase = Task.Run(()=> {;});
    ObserverToFeed.OnNext(0);    
    ;;
    await StoreToDataBase;
    ObserverToFeed.OnNext(1);
    ;;
}));

NotifyOfStoringProgress.Subscribe(onNext: Notification => {;});

Upvotes: 0

Benjol
Benjol

Reputation: 66531

Late answer, but I think that the following extension methods correctly encapsulate what Charles Mager proposed in his answer:

public static IDisposable SubscribeAsync<T>(this IObservable<T> source, 
                         Func<Task> asyncAction, Action<Exception> handler = null)
{
    Func<T,Task<Unit>> wrapped = async t =>
    {
        await asyncAction();
        return Unit.Default;
    };
    if(handler == null)
        return source.SelectMany(wrapped).Subscribe(_ => { });
    else
        return source.SelectMany(wrapped).Subscribe(_ => { }, handler);
}

public static IDisposable SubscribeAsync<T>(this IObservable<T> source, 
                         Func<T,Task> asyncAction, Action<Exception> handler = null)
{
    Func<T, Task<Unit>> wrapped = async t =>
    {
        await asyncAction(t);
        return Unit.Default;
    };
    if(handler == null)
        return source.SelectMany(wrapped).Subscribe(_ => { });
    else
        return source.SelectMany(wrapped).Subscribe(_ => { }, handler);
}

Upvotes: 13

Gusdor
Gusdor

Reputation: 14334

I've been using TPL DataFlow to control back pressure and have used it to solve this problem.

The key part is ITargetBlock<TInput>.AsObserver() - source.

// Set a block to handle each element
ITargetBlock<long> targetBlock = new ActionBlock<long>(async p =>
{
    Console.WriteLine($"Received {p}");
    await Task.Delay(1000);
    Console.WriteLine($"Finished handling {p}");
},
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// Generate an item each second for 10 seconds
var sequence = Observable.Interval(TimeSpan.FromSeconds(1)).Take(10);

// Subscribe with an observer created from the target block.
sequence.Subscribe(targetBlock.AsObserver());

// Await completion of the block
await targetBlock.Completion;

The important part here is that the ActionBlock's bounded capacity is set to 1. This prevents the block from receiving more than one item at a time and will block OnNext if an item is already being processed!

My big surprise here was that it can be safe to call Task.Wait and Task.Result inside your subscription. Obviously, if you have called ObserverOnDispatcher() or similar you will probably hit deadlocks. Be careful!

Upvotes: 1

Charles Mager
Charles Mager

Reputation: 26213

You don't have to care about the SelectMany result. The answer is still the same... though you need your task to have a return type (i.e. Task<T>, not Task).

Unit is essentially equivalent to void, so you can use that:

_source.SelectMany(StoreToDatabase).Subscribe();

private async Task<Unit> StoreToDatabase(Data data)
{
    await dbstuff(data);
    return Unit.Default;
}

This SelectMany overload accepts a Func<TSource, Task<TResult> meaning the resulting sequence will not complete until the task is completed.

Upvotes: 22

Related Questions