Reputation: 29981
I'm trying to integrate some TPL async code into a larger Rx chain using Observable.FromAsync, like in this small example:
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace rxtest
{
    class Program
    {
        static void Main(string[] args)
        {
            MainAsync().Wait();
        }

        static async Task MainAsync()
        {
            await Observable.Generate(new Random(), x => true,
                                      x => x, x => x.Next(250, 500))
                .SelectMany((x, idx) => Observable.FromAsync(async ct =>
                {
                    Console.WriteLine("start: " + idx.ToString());
                    await Task.Delay(x, ct);
                    Console.WriteLine("finish: " + idx.ToString());
                    return idx;
                }))
                .Take(10)
                .LastOrDefaultAsync();
        }
    }
}
However, I've noticed that this seems to start all the async tasks concurrently rather than running them one at a time, which causes the app's memory usage to balloon. The SelectMany appears to behave no differently than a Merge.
Here, I see output like this:
start: 0
start: 1
start: 2
...
I'd like to see:
start: 0
finish: 0
start: 1
finish: 1
start: 2
finish: 2
...
How can I achieve this?
Upvotes: 5
Views: 1301
Reputation: 29776
Change the SelectMany to a Select with a Concat:
static async Task MainAsync()
{
    await Observable.Generate(new Random(), x => true,
                              x => x, x => x.Next(250, 500))
        .Take(10)
        .Select((x, idx) => Observable.FromAsync(async ct =>
        {
            Console.WriteLine("start: " + idx.ToString());
            await Task.Delay(x, ct);
            Console.WriteLine("finish: " + idx.ToString());
            return idx;
        }))
        .Concat()
        .LastOrDefaultAsync();
}
EDIT: I moved the Take(10) up the chain because Generate won't block, so limiting the source early stops it from running away.
The Select projects each event into a stream representing an async task that will start on subscription. Concat accepts a stream of streams and subscribes to each successive sub-stream only when the previous one has completed, concatenating all the streams into a single flat stream.
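To see the ordering without relying on console timing, here is a minimal sketch of the same Select + Concat pattern that records events in a list. The class ConcatSketch, the helper RunSequentialAsync, and the fixed 10 ms delay are made up for illustration and are not part of the original code; Observable.Range stands in for the Generate source.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class ConcatSketch
{
    // Hypothetical helper: runs `count` fake jobs one at a time and returns the event log.
    public static async Task<List<string>> RunSequentialAsync(int count)
    {
        var log = new List<string>();

        await Observable.Range(0, count)
            // Select only builds cold observables; no task has started yet.
            .Select(idx => Observable.FromAsync(async ct =>
            {
                log.Add("start: " + idx);
                await Task.Delay(10, ct); // assumed stand-in for real async work
                log.Add("finish: " + idx);
                return idx;
            }))
            // Concat subscribes to the next inner observable only after
            // the previous one has completed, so the jobs never overlap.
            .Concat()
            .LastOrDefaultAsync();

        return log;
    }

    static async Task Main()
    {
        foreach (var line in await RunSequentialAsync(3))
            Console.WriteLine(line);
    }
}
```

With this sketch, each "start" entry should be followed immediately by its matching "finish" entry before the next job begins, which is the interleaving the question asks for.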
Upvotes: 10