Dan

Reputation: 5972

Using Rx to limit to X concurrent Tasks

I'm very new to Rx, and I'm trying to use it to run at most X subscribing tasks concurrently. The data actually comes from a database, so I have to poll it. I realise that the idea behind Rx is push rather than pull, so polling doesn't fit that well, but conceptually the data going into the database is a stream of events that I want to subscribe to and act on.

The main issue is that the LimitedConcurrencyLevelTaskScheduler doesn't seem to limit the number of tasks: far more than the 8 I've specified run concurrently.

I'm also unsure which of the two solutions below is better (or whether both are wrong).

Here's one approach I've tried, using Observable.Timer:

public static void Main()
{
    var taskFactory = new TaskFactory (new LimitedConcurrencyLevelTaskScheduler (8));
    var scheduler = new TaskPoolScheduler (taskFactory);

    Observable.Timer (TimeSpan.FromMilliseconds (10), scheduler)
        .SelectMany (x => Observable.FromAsync (GetItemsSource))
        .Repeat ()
        .ObserveOn (scheduler)
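        // Note: Observable.FromAsync is lazy, and the observable created here is never
        // subscribed to, so DoSomethingAsync is never actually invoked. (y is the
        // CancellationToken that FromAsync would pass in.)
        .Subscribe (x => Observable.FromAsync(y => DoSomethingAsync (x.ToList())));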

    Console.ReadKey ();
}

private static async Task<IEnumerable<Guid>> GetItemsSource()
{
    return await _myRepo.GetMoreAsync(10);
}

private static async Task DoSomethingAsync(IEnumerable<Guid> items)
{
    // Do something with the data
}
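A minimal sketch of why the Subscribe call above may not do what it appears to: Observable.FromAsync is deferred, so it only invokes its function when something subscribes to the resulting observable. This assumes the System.Reactive package; FromAsyncLaziness, _calls and the parameterless DoSomethingAsync are hypothetical names used only for illustration.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class FromAsyncLaziness
{
    private static int _calls;

    // Hypothetical stand-in for DoSomethingAsync that just counts invocations.
    private static async Task DoSomethingAsync()
    {
        _calls++;
        await Task.Yield();
    }

    public static (int BeforeSubscribe, int AfterSubscribe) Run()
    {
        _calls = 0;

        // Creating the observable does not start the task.
        IObservable<Unit> work = Observable.FromAsync(() => DoSomethingAsync());
        int before = _calls;

        // Wait() subscribes and blocks until the single Unit value arrives.
        work.Wait();

        return (before, _calls);
    }
}
```

In the question's first snippet, each Subscribe callback creates such an observable and then discards it, so the work never starts. Subscribing with an async lambda, as the answer below does, or composing the FromAsync call into the pipeline (for example with SelectMany) avoids this.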

I've also tried doing it this way instead ...

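public static void Main()
{
    // Assumed to be the same scheduler as in the first example; it isn't defined in this snippet.
    var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(8));
    var scheduler = new TaskPoolScheduler(taskFactory);

    GetItemsSource()
        .ObserveOn(scheduler)
        // Each item is a single Guid, so wrap it to match DoSomethingAsync's parameter type.
        .Select(async x => await DoSomethingAsync(new[] { x }))
        .Subscribe();

    Console.ReadKey();
}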

public static IObservable<Guid> GetItemsSource()
{
    return Observable.Create<Guid>(
        async obs =>
        {
            while (true)
            {
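                var item = (await _myRepo.GetMoreAsync(1)).FirstOrDefault();

                // Guid is a value type, so FirstOrDefault returns Guid.Empty (not null) when nothing is found.
                if (item != Guid.Empty)
                {
                    obs.OnNext(item);
                }

                await Task.Delay(TimeSpan.FromMilliseconds(10));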
            }
        });
}

private static async Task DoSomethingAsync(IEnumerable<Guid> items)
{
    // Do something with the data
}

These are deliberately simple examples, without error handling or cancellation support.
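For the cancellation part, Rx's Observable.Create has an overload that takes a Func<IObserver<T>, CancellationToken, Task>; the token is cancelled when the subscription is disposed, which suits a polling loop like the one above. A minimal sketch, assuming the System.Reactive package; PollingSource, Poll and the fetchAsync delegate (standing in for _myRepo.GetMoreAsync) are hypothetical names, not an existing API.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

static class PollingSource
{
    // fetchAsync is a hypothetical stand-in for _myRepo.GetMoreAsync.
    public static IObservable<Guid> Poll(
        Func<CancellationToken, Task<IReadOnlyList<Guid>>> fetchAsync,
        TimeSpan interval)
    {
        // This Create overload passes a token that is cancelled when the subscriber disposes.
        return Observable.Create<Guid>(async (observer, token) =>
        {
            while (!token.IsCancellationRequested)
            {
                // An exception here faults the task and reaches the subscriber as OnError.
                var batch = await fetchAsync(token);
                foreach (var id in batch)
                {
                    observer.OnNext(id);
                }

                // Throws OperationCanceledException once the subscription is disposed;
                // Rx does not forward it to a subscriber that has already disposed.
                await Task.Delay(interval, token);
            }
        });
    }
}
```

Operators that end a subscription, such as Take, dispose the upstream subscription and therefore stop the polling loop as well.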

Both seem to work, but neither limits concurrency to 8 tasks.

As I say, I'm very new to Rx and am probably missing a lot of fundamentals. I certainly plan to do a lot of reading to fully understand Rx, as it seems very powerful, but for now I want to get something working quickly.

UPDATE

Following on from Enigmativity's answer and comments, here's some code that logs the concurrent task counts:

void Main()
{
    var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(8));
    var scheduler = new TaskPoolScheduler(taskFactory);

    using (
        (
            from n in Observable.Interval(TimeSpan.FromMilliseconds(10), scheduler)
            from g in Observable.FromAsync(GetItemsSource, scheduler)
            from u in Observable.FromAsync(() => DoSomethingAsync(g), scheduler)
            select u)
        .ObserveOn(scheduler)
        .Subscribe())
    {
        Console.ReadLine();
    }
}

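// Plain int fields updated with Interlocked: ++ and -- on a volatile int are not atomic,
// so concurrent updates could be lost and the logged counts would be unreliable.
private static int _numIn = 0;
private static int _numOut = 0;

public static async Task<IEnumerable<Guid>> GetItemsSource()
{
    try
    {
        var numIn = Interlocked.Increment(ref _numIn);

        // Dump() is LINQPad's output extension method.
        $"Concurrent tasks (in): {numIn}".Dump();

        // Simulate async API call
        await Task.Delay(TimeSpan.FromMilliseconds(10));

        return new List<Guid> { Guid.NewGuid() };
    }
    finally
    {
        Interlocked.Decrement(ref _numIn);
    }
}

private static async Task DoSomethingAsync(IEnumerable<Guid> deliveryIds)
{
    try
    {
        Interlocked.Increment(ref _numOut);

        // Simulate async calls required to process the event
        await Task.Delay(TimeSpan.FromMilliseconds(1000));

        $"Concurrent tasks (out): {Volatile.Read(ref _numOut)}".Dump();
    }
    finally
    {
        Interlocked.Decrement(ref _numOut);
    }
}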

This shows around 64 concurrent tasks running.

Update 2

It does look like this is because the subscriber is async. If I test with a non-async subscriber, it works fine. Unfortunately I need an async subscriber, as it has to call other async methods.
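A sketch of the likely cause, assuming LimitedConcurrencyLevelTaskScheduler behaves like other concurrency-limiting schedulers: a task scheduler limits how many delegates execute at the same time, not how many async operations are in flight. An async delegate hands its thread back to the scheduler at its first incomplete await, freeing its slot, so the next operation can start while the first is still waiting. Because LimitedConcurrencyLevelTaskScheduler is a sample class rather than part of the BCL, this sketch swaps in ConcurrentExclusiveSchedulerPair's ConcurrentScheduler, which also caps concurrently executing delegates; AsyncSchedulerLimitDemo and its parameters are hypothetical.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class AsyncSchedulerLimitDemo
{
    // Returns the peak number of async operations in flight at once.
    public static int Run(int maxConcurrency, int operations)
    {
        // Stand-in for LimitedConcurrencyLevelTaskScheduler: a BCL scheduler that
        // executes at most maxConcurrency delegates at the same time.
        var limited = new ConcurrentExclusiveSchedulerPair(
            TaskScheduler.Default, maxConcurrency).ConcurrentScheduler;
        var factory = new TaskFactory(limited);

        var gate = new object();
        int current = 0, peak = 0;

        var tasks = Enumerable.Range(0, operations)
            .Select(_ => factory.StartNew(async () =>
            {
                lock (gate) { current++; peak = Math.Max(peak, current); }
                try
                {
                    // The delegate yields here, freeing its scheduler slot,
                    // even though this logical operation is still in progress.
                    await Task.Delay(200);
                }
                finally
                {
                    lock (gate) { current--; }
                }
            }).Unwrap())
            .ToArray();

        Task.WaitAll(tasks);
        return peak;
    }
}
```

Run(2, 32) typically reports a peak well above 2, because every operation reaches Task.Delay and yields before any of them finishes.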

It looks like I can limit concurrency another way, like this:

GetItemsSource2()
    .Select(x => Observable.FromAsync(() => DoSomethingAsync(x)))
    .Merge(64)
    .Subscribe();

So this uses Merge(maxConcurrent) instead of the LimitedConcurrencyLevelTaskScheduler to limit concurrency.
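A minimal sketch of the Merge approach, assuming the System.Reactive package: Merge(maxConcurrent) subscribes to at most maxConcurrent inner observables at a time, and because Observable.FromAsync only starts its task on subscription, at most that many DoSomethingAsync calls are in flight. MergeLimitDemo, the integer items and the Task.Delay body are hypothetical stand-ins for GetItemsSource2 and the real work.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class MergeLimitDemo
{
    // Returns the peak number of DoSomethingAsync calls in flight at once.
    public static int Run(int maxConcurrent, int items)
    {
        var gate = new object();
        int current = 0, peak = 0;

        // Hypothetical stand-in for the question's DoSomethingAsync.
        async Task DoSomethingAsync(int item)
        {
            lock (gate) { current++; peak = Math.Max(peak, current); }
            try { await Task.Delay(50); }
            finally { lock (gate) { current--; } }
        }

        Observable.Range(0, items)
            // FromAsync is lazy: each task starts only when Merge subscribes to it.
            .Select(i => Observable.FromAsync(() => DoSomethingAsync(i)))
            // Merge subscribes to at most maxConcurrent inner observables at a time.
            .Merge(maxConcurrent)
            // Collect into one list so Wait() always receives exactly one value.
            .ToList()
            // Block until every item has been processed.
            .Wait();

        return peak;
    }
}
```

Unlike the scheduler approach, this limit counts whole async operations, including the time they spend awaiting.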

Upvotes: 0

Views: 1118

Answers (1)

Enigmativity

Reputation: 117064

I got your code to work by doing this:

void Main()
{
    var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(8));
    var scheduler = new TaskPoolScheduler(taskFactory);

    using (Observable.Timer(TimeSpan.FromMilliseconds(10), scheduler)
        .SelectMany(x => Observable.FromAsync(GetItemsSource))
        .Repeat()
        .ObserveOn(scheduler)
        .Subscribe(async x => await DoSomethingAsync(x.ToList())))
    {
        Console.ReadLine();
    }
}

private static async Task<IEnumerable<Guid>> GetItemsSource()
{
    return await Task.Run(() => Enumerable.Range(0, 10).Select(x => Guid.NewGuid()).ToArray());
}

private static async Task DoSomethingAsync(IEnumerable<Guid> items)
{
    await Task.Run(() => Console.WriteLine(String.Join("|", items.Select(x => x.ToString()))));
}

I also modified the QueueTask method of LimitedConcurrencyLevelTaskScheduler to include a trace line for the number of delegates queued or running:

protected sealed override void QueueTask(Task task)
{
    // Add the task to the list of tasks to be processed.  If there aren't enough 
    // delegates currently queued or running to process tasks, schedule another. 
    lock (_tasks)
    {
        Console.WriteLine(_delegatesQueuedOrRunning);
        _tasks.AddLast(task);
        if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
        {
            ++_delegatesQueuedOrRunning;
            NotifyThreadPoolOfPendingWork();
        }
    }
}

When I ran your code I got this output:

0
1
1
1
874695ca-e9a8-4688-a4d2-d7d2446e7924|e3cbadf1-c5cb-4339-ab59-33d873db98f2|8dd710f5-0b21-4b20-a547-6ccfe7c29af4|42739c5a-4602-4f76-8aed-40f1c0ffccdc|c599e879-06ca-459f-b27e-95ac15dc4cc1|562b5f7c-dcb2-47d8-aa4a-503499654139|b78e5fc5-d152-4380-9799-2713c6f71c19|e47669a3-a399-4891-91b6-3a28b52a941a|f6483f2f-f8d7-47e8-9f88-bf9bc5f61f3a|c8e75203-bc55-4e00-9f8b-ecf248f81454
2
2
1
2
3
92d6f76e-6a3d-475d-8c1c-bd3e59aaf2a1|1b2bd0d6-c439-4b3c-a1f1-9af1f5132afc|c140e0ec-6741-4310-9edf-547fdf390b01|5b29dcd3-21c9-46a2-ae98-cd7a7b1a003c|5a808def-a09f-41a7-acfc-d11cfbbb4faa|28f47d0c-7762-4949-ae33-427c82756874|13087b1c-c4eb-4f0f-bf5f-665a2298ac13|50c00907-668d-44f3-9e2a-c790348fb715|34f8602a-18ef-45b6-b069-0fb30718a45b|46d6616d-0e89-49cd-8905-dff72bb63add
2
1
1
2
7c6cbe5a-43d1-4aad-8eee-33a0d7f44276|2164e3e0-4e8d-4a57-99b7-0aa5e42281da|e2cf26ed-501f-4032-9761-53a301e16bb9|a3e9171a-f490-4135-a930-017dc706293e|b9f43f5c-d652-4205-b857-724699f178c5|5d8e6149-f9ad-424d-9ee2-07afa344ab80|418d2526-adf6-4c26-ac84-636400fce547|3b8c3b14-9e91-4bb8-9cfa-c1d36f12ccc0|be3c8c84-5112-4c85-ba7f-d1b41a2ec03a|bdc34ccb-a3d9-45fa-bf54-d42bcd791081
2
1
1
2
3d5e0d0a-ddb3-4595-b960-4d2050d4fa1a|1b5a39c9-652c-4872-8d1c-6e1212cd4043|fd9aff7b-9c77-47e4-a4c6-2ede38472b92|ff6145d3-2dc2-45b6-bc40-2cc1f270572d|5ba0e441-a9f1-4b2a-baca-127cce622993|e2650bb9-f2e4-4a89-8c2c-69859f5f381a|a86a1f4e-7ea7-465c-9730-7ac735c5616e|5cf7135b-fafe-4725-938e-5e7ea55f4c3e|dd26bda7-d86b-4bb9-977f-a29ef5cf6d76|6e77c1c1-4e8d-4b48-b6f4-d54f0ec9d269
1
1

...etc...

Your code never hits more than 3 concurrent tasks.

Now I'd suggest writing your code in a more Rx-ish way. Try this:

using (
(
    from n in Observable.Interval(TimeSpan.FromSeconds(1.0), scheduler)
    from g in Observable.FromAsync(() => GetItemsSource(), scheduler)
    from u in Observable.FromAsync(() => DoSomethingAsync(g), scheduler)
    select u)
    .ObserveOn(scheduler)
    .Subscribe())
{
    Console.ReadLine();
}

It doesn't change the number of tasks used, but it's cleaner.

Upvotes: 1
