Reputation: 32104
I have an IObservable that generates a value every second, followed by a Select that runs code that may take some time:
var events = Observable.Interval(TimeSpan.FromSeconds(1));
ssoInfoObservable = events
    .Select(async e =>
    {
        Console.Out.WriteLine("Select : " + e);
        await Task.Delay(4000);
        return e;
    })
    .SelectMany(t => t.ToObservable())
    .Subscribe(l => Console.WriteLine("Subscribe: " + l));
The long-running operation takes 4 seconds in my example. While the code inside Select is running, I do not want another value from the Interval to be generated. How do I accomplish this? Is this possible? Maybe by using a specific IScheduler implementation?
Note that if there's no async code, everything works as expected, as described here.
This question is very similar to one I asked earlier, except for the async/await.
Upvotes: 0
Views: 279
Reputation: 18663
See this sample on creating an async generate function. Yours would be slightly different in that you need a time offset and you don't need the iterate, so it would look more like this:
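// Requires: System, System.Threading.Tasks, System.Reactive.Concurrency, System.Reactive.Linq
public static IObservable<T> GenerateAsync<T>(TimeSpan span,
                                              Func<int, Task<T>> generator,
                                              IScheduler scheduler = null)
{
    scheduler = scheduler ?? Scheduler.Default;
    return Observable.Create<T>(obs =>
    {
        // The next run is scheduled only after the awaited work completes,
        // so invocations of the generator never overlap.
        return scheduler.Schedule(0, span, async (idx, recurse) =>
        {
            try
            {
                obs.OnNext(await generator(idx));
                recurse(idx + 1, span);
            }
            catch (Exception ex)
            {
                // The lambda is async void, so forward failures to the observer.
                obs.OnError(ex);
            }
        });
    });
}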
Usage:
Extensions.GenerateAsync(TimeSpan.FromSeconds(1), idx => /*Async work, return a task*/, scheduler);
As a possible second option, you could port the implementation of switchFirst to C#. SwitchFirst subscribes to the first Observable it receives and ignores subsequent ones until its current subscription completes.
If you took that approach you could have something like:
Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(e => Observable.FromAsync(() => /*Do Async stuff*/))
    .SwitchFirst()
    .Subscribe();
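As far as I know, SwitchFirst is not built into Rx.NET, so the snippet above needs the port to exist. Here is a minimal sketch of what it could look like. The class name SwitchFirstExtensions and the locking strategy are my own assumptions, not taken from the RxJS source, so treat it as a starting point rather than a faithful translation:

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical helper class; the name is illustrative only.
public static class SwitchFirstExtensions
{
    // Subscribes to an inner sequence only when no other inner sequence is
    // active; inner sequences that arrive while one is running are dropped.
    public static IObservable<T> SwitchFirst<T>(this IObservable<IObservable<T>> source)
    {
        return Observable.Create<T>(observer =>
        {
            var gate = new object();
            var hasCurrent = false; // true while an inner sequence is running
            var outerDone = false;  // true once the outer sequence has completed
            var innerSubscription = new SerialDisposable();

            var outerSubscription = source.Subscribe(
                inner =>
                {
                    lock (gate)
                    {
                        if (hasCurrent) return; // busy: ignore this inner sequence
                        hasCurrent = true;
                    }

                    var d = new SingleAssignmentDisposable();
                    innerSubscription.Disposable = d;
                    d.Disposable = inner.Subscribe(
                        value => { lock (gate) observer.OnNext(value); },
                        error => { lock (gate) observer.OnError(error); },
                        () =>
                        {
                            lock (gate)
                            {
                                hasCurrent = false;
                                // Complete only if nothing more can arrive.
                                if (outerDone) observer.OnCompleted();
                            }
                        });
                },
                error => { lock (gate) observer.OnError(error); },
                () =>
                {
                    lock (gate)
                    {
                        outerDone = true;
                        if (!hasCurrent) observer.OnCompleted();
                    }
                });

            return new CompositeDisposable(outerSubscription, innerSubscription);
        });
    }
}
```

With this in place, Interval ticks that arrive while the FromAsync work is still running are discarded rather than queued, so the next unit of work starts on the first tick after the previous one finishes.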
Upvotes: 1