I have the following code:
IObservable<Data> _source;
...
_source.Subscribe(StoreToDatabase);
private async Task StoreToDatabase(Data data) {
    await dbstuff(data);
}
However, this does not compile. Is there a way to observe data asynchronously? I tried async void; it works, but I feel that it is not a proper solution.
I also checked "Reactive Extensions Subscribe calling await", but it does not answer my question (I do not care about the SelectMany result).
Upvotes: 19
Views: 12505
So you want to run the store procedure, possibly alongside some other work, and asynchronously await its completion or a partial result. How about the Observable.Create overload shown here:
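IObservable<int> NotifyOfStoringProgress =
    Observable.Create<int>(async ObserverToFeed =>
    {
        ObserverToFeed.OnNext(-1);                    // reported before the store starts
        Task StoreToDataBase = Task.Run(() => { });   // placeholder for the actual database work
        ObserverToFeed.OnNext(0);                     // reported once the store has been started
        // ... other work can run here while the store is in progress ...
        await StoreToDataBase;
        ObserverToFeed.OnNext(1);                     // reported after the store has finished
    });
// Subscribing runs the delegate above; each OnNext value arrives in this callback.
NotifyOfStoringProgress.Subscribe(onNext: Notification => { });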
Upvotes: 0
Late answer, but I think that the following extension methods correctly encapsulate what Charles Mager proposed in his answer:
public static IDisposable SubscribeAsync<T>(this IObservable<T> source,
    Func<Task> asyncAction, Action<Exception> handler = null)
{
    Func<T, Task<Unit>> wrapped = async t =>
    {
        await asyncAction();
        return Unit.Default;
    };
    if (handler == null)
        return source.SelectMany(wrapped).Subscribe(_ => { });
    else
        return source.SelectMany(wrapped).Subscribe(_ => { }, handler);
}
public static IDisposable SubscribeAsync<T>(this IObservable<T> source,
    Func<T, Task> asyncAction, Action<Exception> handler = null)
{
    Func<T, Task<Unit>> wrapped = async t =>
    {
        await asyncAction(t);
        return Unit.Default;
    };
    if (handler == null)
        return source.SelectMany(wrapped).Subscribe(_ => { });
    else
        return source.SelectMany(wrapped).Subscribe(_ => { }, handler);
}
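To illustrate how the handler parameter might be used, here is a rough sketch, assuming the System.Reactive package. SubscribeAsyncDemo, StoreAsync and RunAsync are hypothetical names made up for this example, and the class carries a compact copy of the Func<T, Task> overload only so that it compiles on its own. An exception thrown by the async action should surface through the handler as the sequence's error:

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class SubscribeAsyncDemo
{
    // Compact copy of the Func<T, Task> overload, repeated here so the sketch is self-contained.
    public static IDisposable SubscribeAsync<T>(this IObservable<T> source,
        Func<T, Task> asyncAction, Action<Exception> handler)
    {
        Func<T, Task<Unit>> wrapped = async t =>
        {
            await asyncAction(t);
            return Unit.Default;
        };
        return source.SelectMany(wrapped).Subscribe(_ => { }, handler);
    }

    // Hypothetical stand-in for the database call; it fails on item 2.
    private static async Task StoreAsync(int item)
    {
        await Task.Delay(10);
        if (item == 2)
            throw new InvalidOperationException("store failed");
    }

    // Subscribes, then waits until the error handler has been invoked.
    public static async Task<Exception> RunAsync()
    {
        var failure = new TaskCompletionSource<Exception>();
        using (Observable.Range(1, 3)
            .SubscribeAsync(StoreAsync, ex => failure.TrySetResult(ex)))
        {
            return await failure.Task;
        }
    }
}
```

Without a handler, the plain Subscribe(_ => { }) overload has nowhere to deliver the error, so passing one is probably the safer default for database work.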
Upvotes: 13
I've been using TPL Dataflow to control back pressure and have used it to solve this problem.
The key part is ITargetBlock<TInput>.AsObserver().
// Set a block to handle each element
ITargetBlock<long> targetBlock = new ActionBlock<long>(async p =>
    {
        Console.WriteLine($"Received {p}");
        await Task.Delay(1000);
        Console.WriteLine($"Finished handling {p}");
    },
    new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
// Generate an item each second for 10 seconds
var sequence = Observable.Interval(TimeSpan.FromSeconds(1)).Take(10);
// Subscribe with an observer created from the target block.
sequence.Subscribe(targetBlock.AsObserver());
// Await completion of the block
await targetBlock.Completion;
The important part here is that the ActionBlock's bounded capacity is set to 1. This prevents the block from accepting more than one item at a time, so OnNext blocks while an item is still being processed!
My big surprise here was that it can be safe to call Task.Wait and Task.Result inside your subscription. Obviously, if you have called ObserveOnDispatcher() or similar you will probably hit deadlocks. Be careful!
Upvotes: 1
You don't have to care about the SelectMany result. The answer is still the same... though you need your task to have a return type (i.e. Task<T>, not Task).
Unit is essentially equivalent to void, so you can use that:
_source.SelectMany(StoreToDatabase).Subscribe();
private async Task<Unit> StoreToDatabase(Data data)
{
    await dbstuff(data);
    return Unit.Default;
}
This SelectMany overload accepts a Func<TSource, Task<TResult>>, meaning the resulting sequence will not complete until the task is completed.
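One thing to keep in mind is that SelectMany starts each task as soon as its item arrives, so slow stores can overlap. If the writes have to run one at a time in arrival order, a possible variant is to defer each task with Observable.FromAsync and flatten with Concat. The following is only a sketch, assuming the System.Reactive package; SequentialStoreDemo, StoreAsync and the log list are hypothetical stand-ins for the question's dbstuff:

```csharp
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class SequentialStoreDemo
{
    // Hypothetical stand-in for dbstuff(data); it records when each store starts and ends.
    private static async Task StoreAsync(int item, List<string> log)
    {
        log.Add($"start {item}");
        await Task.Delay(20);
        log.Add($"end {item}");
    }

    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        await Observable.Range(1, 3)
            // FromAsync defers the call: the task starts only when the inner sequence is subscribed.
            .Select(item => Observable.FromAsync(() => StoreAsync(item, log)))
            // Concat subscribes to the next inner sequence only after the previous one completes.
            .Concat()
            // Turn the sequence into a task so the caller can await the end of all stores.
            .ToTask();
        return log;
    }
}
```

The trade-off is throughput: with Concat, a slow store delays every item queued behind it, whereas SelectMany lets the stores run concurrently.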
Upvotes: 22