Ronald Wildenberg
Ronald Wildenberg

Reputation: 32104

Throttle IObservable based on whether async handler is still busy

I have an IObservable that generates a value every second, followed by a select that runs code that may take some time:

var events = Observable.Interval(TimeSpan.FromSeconds(1));
ssoInfoObservable = events
    .Select(async e =>
    {
        Console.Out.WriteLine("Select   : " + e);
        await Task.Delay(4000);
        return e;
    })
    .SelectMany(t => t.ToObservable())
    .Subscribe(l => Console.WriteLine("Subscribe: " + l));

The long-running operation takes 4 seconds in my example. While the code inside Select is running, I do not want another value from the Interval to be generated. How do I accomplish this? Is this possible? Maybe use a specific IScheduler implementation?

Note that if there's no async code, everything works as expected as described here.

This question is very similar to one I asked earlier, except for the async/await.

Upvotes: 0

Views: 279

Answers (1)

paulpdaniels
paulpdaniels

Reputation: 18663

See this sample on creating an async generate function. Your would be slightly different in that you need a time offset and you don't need the iterate so it would look more like this:

    public static IObservable<T> GenerateAsync<T>(TimeSpan span, 
                                                  Func<int, Task<T>> generator, 
                                                  IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;

        return Observable.Create<T>(obs =>
        {
            return scheduler.Schedule(0, span, async (idx, recurse) =>
            {
                obs.OnNext(await generator(idx));
                recurse(idx + 1, span);
            });

        });
    }

Usage:

  Extensions.GenerateAsync(TimeSpan.FromSeconds(1), idx => /*Async work, return a task*/, scheduler);

As a possible second option you could look to port the implementation of switchFirst into C#. SwitchFirst will subscribe to the first Observable it receives and ignore subsequent ones until its current subscription completes.

If you took that approach you could have something like:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(e => Observable.FromAsync(() => /*Do Async stuff*/)
          .SwitchFirst()
          .Subscribe();

Upvotes: 1

Related Questions