Steztric

Reputation: 2942

C# LanguageExt - combine multiple async calls into one grouped call

I have a method that looks up an item asynchronously from a datastore:

class MyThing {}
Task<Try<MyThing>> GetThing(int thingId) {...}

I want to look up multiple items from the datastore, and wrote a new method to do this. I also wrote a helper method that will take multiple Try<T> and combine their results into a single Try<IEnumerable<T>>.


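using System;
using System.Collections.Generic;
using System.Linq;
using LanguageExt;
using static LanguageExt.Prelude;

public static class TryExtensions
{
    // Fails with an AggregateException of every failure if any item failed,
    // otherwise succeeds with all of the values
    public static Try<IEnumerable<T>> Collapse<T>(this IEnumerable<Try<T>> items)
    {
        var failures = items.Fails().ToArray();
        return failures.Any() ?
            Try<IEnumerable<T>>(new AggregateException(failures)) :
            Try(items.Select(i => i.Succ(a => a).Fail(default(T)))); // no failures here, so the default is never used
    }
}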
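For readers without LanguageExt at hand, here is a minimal BCL-only sketch of the all-or-nothing semantics that Collapse is after. `Outcome<T>` is a hypothetical stand-in for LanguageExt's `Try<T>` and is not part of either library. If any item failed, the combined result carries an AggregateException of every failure. Otherwise it carries all the values in their original order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for LanguageExt's Try<T>: holds either a value or an exception
public sealed class Outcome<T>
{
    public T Value { get; }
    public Exception Error { get; }
    public bool IsSuccess => Error == null;

    private Outcome(T value, Exception error) { Value = value; Error = error; }

    public static Outcome<T> Success(T value) => new Outcome<T>(value, null);
    public static Outcome<T> Failure(Exception error) => new Outcome<T>(default(T), error);
}

public static class OutcomeExtensions
{
    // All-or-nothing: any failure makes the whole result a failure that aggregates every error
    public static Outcome<T[]> Collapse<T>(this IEnumerable<Outcome<T>> items)
    {
        var all = items.ToArray(); // enumerate the input only once
        var failures = all.Where(o => !o.IsSuccess).Select(o => o.Error).ToArray();
        return failures.Length > 0
            ? Outcome<T[]>.Failure(new AggregateException(failures))
            : Outcome<T[]>.Success(all.Select(o => o.Value).ToArray());
    }
}
```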

async Task<Try<MyThing[]>> GetThings(IEnumerable<int> ids)
{
    var results = new List<Try<MyThing>>();

    foreach (var id in ids)
    {
        var thing = await GetThing(id);
        results.Add(thing);
    }

    return results.Collapse().Map(p => p.ToArray());
}

Another way to do it would be like this:

async Task<Try<MyThing[]>> GetThings(IEnumerable<int> ids)
{
    var tasks = ids.Select(async id => await GetThing(id)).ToArray();
    await Task.WhenAll(tasks);
    return tasks.Select(t => t.Result).Collapse().Map(p => p.ToArray());
}

The problem with this is that all the tasks will run in parallel and I don't want to hammer my datastore with lots of parallel requests. What I really want is to make my code functional, using monadic principles and features of LanguageExt. Does anyone know how to achieve this?
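The difference between the two versions above can be measured. This sketch uses a hypothetical FakeStore whose GetThingAsync(id) tracks the peak number of calls in flight at once. Task.Delay stands in for the datastore round trip. The WhenAll version starts every call before any of them finishes. The foreach version never has more than one call in flight.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical fake datastore that records the peak number of concurrent calls
public sealed class FakeStore
{
    private int _inFlight;
    private int _peak;
    public int Peak => Volatile.Read(ref _peak);

    public async Task<string> GetThingAsync(int id)
    {
        var now = Interlocked.Increment(ref _inFlight);
        // Raise _peak to `now` if it is the highest concurrency seen so far
        int seen;
        while (now > (seen = Volatile.Read(ref _peak)) &&
               Interlocked.CompareExchange(ref _peak, now, seen) != seen) { }
        await Task.Delay(20); // simulated round trip
        Interlocked.Decrement(ref _inFlight);
        return "thing " + id;
    }
}

public static class Demo
{
    // Like the Task.WhenAll version: every call starts before any is awaited
    public static Task<string[]> AllAtOnce(FakeStore store, IEnumerable<int> ids) =>
        Task.WhenAll(ids.Select(id => store.GetThingAsync(id)));

    // Like the foreach version: each call starts after the previous one finished
    public static async Task<string[]> OneAtATime(FakeStore store, IEnumerable<int> ids)
    {
        var results = new List<string>();
        foreach (var id in ids) results.Add(await store.GetThingAsync(id));
        return results.ToArray();
    }
}
```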


Update

Thanks for the suggestion, @MatthewWatson. This is what it looks like with SemaphoreSlim:

async Task<Try<MyThing[]>> GetThings(IEnumerable<int> ids)
{
    // A count of 1 lets only one GetThing call run at a time
    using (var mutex = new SemaphoreSlim(1))
    {
        var tasks = ids.Select(async id =>
        {
            await mutex.WaitAsync();
            try { return await GetThing(id); }
            finally { mutex.Release(); }
        }).ToArray();
        var results = await Task.WhenAll(tasks);
        return results.Collapse().Map(p => p.ToArray());
    }
}

The problem is that this is still not very monadic/functional, and it ends up with more lines of code than my original foreach version.
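A SemaphoreSlim initialised to 1 serialises the calls. The same pattern, pulled into a reusable helper, can also allow a small, bounded degree of parallelism. This is a sketch: SelectThrottledAsync and its maxConcurrency parameter are hypothetical names, not LanguageExt or BCL APIs. Results come back in input order because Task.WhenAll preserves the order of the tasks it is given.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleExtensions
{
    // Hypothetical helper: applies selector to every item, with at most
    // maxConcurrency calls in flight at once. Output order matches input order.
    public static async Task<TOut[]> SelectThrottledAsync<TIn, TOut>(
        this IEnumerable<TIn> items, Func<TIn, Task<TOut>> selector, int maxConcurrency)
    {
        if (maxConcurrency < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrency));
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = items.Select(async item =>
            {
                await gate.WaitAsync(); // wait for a free slot
                try { return await selector(item); }
                finally { gate.Release(); }
            }).ToArray(); // every wrapper starts now, but each waits for a slot before calling selector
            return await Task.WhenAll(tasks);
        }
    }
}
```

With maxConcurrency: 1 it behaves like the semaphore version above. A slightly larger value trades some datastore load for throughput.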

Upvotes: 3

Views: 1384

Answers (1)

Amen Ayach

Reputation: 4348

In your "Another way" version you almost achieved your goal when you called:

var tasks = ids.Select(async id => await GetThing(id)).ToArray();

Except that the tasks don't run sequentially, so you end up with many queries hitting your datastore at once. That is caused by materializing the sequence. Here that is .ToArray(), and Task.WhenAll would do the same if handed the lazy sequence directly. Enumerating it calls GetThing for every id, and each call starts its task straight away. So if you can "tolerate" one foreach to run the tasks sequentially, you can do this:

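public static class TaskExtensions
{
    // With a lazy sequence (e.g. an un-materialized Select), each MoveNext() creates
    // the next task only after the previous one has been awaited
    public static async Task RunSequentially<T>(this IEnumerable<Task<T>> tasks)
    {
        foreach (var task in tasks) await task;
    }
}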
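The key point is deferred execution. As long as the sequence produced by Select is not materialised, each call happens only when foreach asks for the next element, and that is after the previous task has been awaited. The sketch below checks this with a hypothetical logging fetcher, Demo.Fetch. The event log shows each call starting only after the previous one has ended. RunSequentially is repeated (in a class renamed SequentialTaskExtensions) so that the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SequentialTaskExtensions
{
    // Same body as RunSequentially: MoveNext() on a lazy Select starts the next task
    public static async Task RunSequentially<T>(this IEnumerable<Task<T>> tasks)
    {
        foreach (var task in tasks) await task;
    }
}

public static class Demo
{
    public static readonly List<string> Log = new List<string>();

    // Hypothetical fetcher that logs when each call starts and ends
    public static async Task<int> Fetch(int id)
    {
        Log.Add("start " + id);
        await Task.Delay(10);
        Log.Add("end " + id);
        return id;
    }
}
```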

That said, running a "loop" of queries is generally not good practice, unless you are in a background service or some special scenario. Pushing the work down to the database engine with WHERE thingId IN (...) is usually the better option. Even if you have a large number of thingIds, you can slice them into batches of tens or hundreds to keep each WHERE IN clause small.
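The batching idea can be sketched as follows. `fetchBatch` is a hypothetical delegate standing in for one WHERE thingId IN (...) query over a single batch. `batchSize` caps how many ids go into each IN list. Batches are sent one after another, so the datastore sees only one query at a time.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchExtensions
{
    // Splits source into consecutive batches of at most batchSize items
    public static IEnumerable<IReadOnlyList<T>> Batch<T>(this IEnumerable<T> source, int batchSize)
    {
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));
        var batch = new List<T>(batchSize);
        foreach (var item in source)
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                yield return batch;
                batch = new List<T>(batchSize);
            }
        }
        if (batch.Count > 0) yield return batch;
    }

    // One round trip per batch; each batch is awaited before the next is sent
    public static async Task<List<TOut>> FetchInBatches<TId, TOut>(
        this IEnumerable<TId> ids, int batchSize,
        Func<IReadOnlyList<TId>, Task<IEnumerable<TOut>>> fetchBatch)
    {
        var results = new List<TOut>();
        foreach (var batch in ids.Batch(batchSize))
            results.AddRange(await fetchBatch(batch));
        return results;
    }
}
```

On .NET 6 or later, Enumerable.Chunk splits a sequence the same way, returning arrays.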

Back to RunSequentially: I would like to make it more functional, for example like this:

tasks.ToList().ForEach(async task => await task);

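But sadly this still runs the tasks in parallel, for two reasons. First, .ToList() enumerates the sequence and starts every task up front. Second, List<T>.ForEach takes an Action<T>, so the async lambda becomes an async void method that ForEach cannot wait for.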
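The async void point can be checked directly. In this sketch, ForEachDemo is a hypothetical name, and a TaskCompletionSource that is never completed keeps each await pending. ForEach returns while neither lambda body has finished, which shows it does not wait for them.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ForEachDemo
{
    // Reports how many async lambda bodies had run to completion when ForEach returned
    public static int FinishedWhenForEachReturns()
    {
        var neverCompletes = new TaskCompletionSource<bool>().Task; // keeps each await pending
        var finished = 0;

        // List<T>.ForEach takes an Action<T>, so this async lambda is async void:
        // each call returns at its first pending await and ForEach moves straight on
        new List<Task> { neverCompletes, neverCompletes }.ForEach(async task =>
        {
            await task;
            finished++;
        });

        return finished; // 0: ForEach did not wait for either lambda
    }
}
```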

So the final usage should be:

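async Task<Try<MyThing[]>> GetThings(IEnumerable<int> ids)
{
    var results = new List<Try<MyThing>>();
    // Keep the sequence lazy (no .ToArray()/.ToList()) and record each result as it arrives:
    // enumerating `tasks` a second time would call GetThing again for every id
    var tasks = ids.Select(async id => { var thing = await GetThing(id); results.Add(thing); return thing; });
    await tasks.RunSequentially();
    return results.Collapse().Map(p => p.ToArray());
}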
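If you would rather not capture results in a side list, a sequential "traverse" that returns what it awaited keeps closer to the functional style asked for. SelectSequentiallyAsync is a hypothetical name, not an existing LanguageExt or BCL method. It calls selector for one item, awaits it, then moves on to the next, and it returns the results in input order. It enumerates its input exactly once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SequentialExtensions
{
    // Hypothetical helper: starts each call only after the previous one completed,
    // enumerates `items` exactly once, and returns the results in order
    public static async Task<IReadOnlyList<TOut>> SelectSequentiallyAsync<TIn, TOut>(
        this IEnumerable<TIn> items, Func<TIn, Task<TOut>> selector)
    {
        var results = new List<TOut>();
        foreach (var item in items)
            results.Add(await selector(item));
        return results;
    }
}
```

With such a helper, the body of GetThings could shrink to `return (await ids.SelectSequentiallyAsync(id => GetThing(id))).Collapse().Map(p => p.ToArray());`.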

Another, admittedly overkill, functional solution is to put Lazy tasks in a Queue and drain it recursively!

Instead of GetThing, use a lazy variant, GetLazyThing, that returns Lazy<Task<Try<MyThing>>> by simply wrapping GetThing:

Lazy<Task<Try<MyThing>>> GetLazyThing(int id) => new Lazy<Task<Try<MyThing>>>(() => GetThing(id));
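Lazy helps for two reasons. First, constructing a Lazy<Task<T>> does not run its factory, so materialising a whole array of them starts nothing. Second, the first read of .Value starts the task, and later reads return the same cached Task. Below is a quick check using a hypothetical counting stand-in for GetThing, called GetThingCounted.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class LazyDemo
{
    public static int Started;

    // Hypothetical stand-in for GetThing that counts how often it is invoked
    public static Task<int> GetThingCounted(int id)
    {
        Started++;
        return Task.FromResult(id);
    }

    // Wrapping the call in Lazy defers it until .Value is first read
    public static Lazy<Task<int>> GetLazyThing(int id) =>
        new Lazy<Task<int>>(() => GetThingCounted(id));
}
```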

Now, using a couple of extension methods:

// Extension methods: declare these inside a static class (such as TaskExtensions above)
public static async Task RecRunSequentially<T>(this IEnumerable<Lazy<Task<T>>> tasks)
{
    var queue = tasks.EnqueueAll();
    await RunQueue(queue);
}

public static Queue<T> EnqueueAll<T>(this IEnumerable<T> list)
{
    // Copies the items in order; enqueuing Lazy wrappers does not start any task
    return new Queue<T>(list);
}

public static async Task RunQueue<T>(Queue<Lazy<Task<T>>> queue)
{
    if (queue.Count > 0)
    {
        var task = queue.Dequeue();
        await task.Value; // this unwraps the Lazy object content, starting the task
        await RunQueue(queue);
    }
}

Finally:

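var lazyTasks = ids.Select(id => GetLazyThing(id)).ToArray(); // safe: creating a Lazy doesn't start GetThing
await lazyTasks.RecRunSequentially();
// Now collapse and map as you like; Lazy caches each Task, so reading .Value again doesn't re-run GetThing
var things = lazyTasks.Select(l => l.Value.Result).Collapse().Map(p => p.ToArray());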
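The queue and the recursion can also be replaced by a left fold. This sketch uses a different technique from the queue version, and RunAllSequentially is a hypothetical name. Aggregate chains the tasks: each step first awaits everything before it and only then reads next.Value, so at most one task runs at a time.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FoldExtensions
{
    // Hypothetical helper: folds the Lazy tasks into one chained Task.
    // Aggregate itself runs eagerly, but each step only touches next.Value
    // after `previous` has completed, so the tasks start one at a time.
    public static Task RunAllSequentially<T>(this IEnumerable<Lazy<Task<T>>> lazyTasks) =>
        lazyTasks.Aggregate(Task.CompletedTask, async (previous, next) =>
        {
            await previous;
            await next.Value;
        });
}
```

Like the queue version, this relies on the Lazy wrappers. With plain Task<T> elements, every task would already be running before the fold starts.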

Update

However, if you don't like the fact that EnqueueAll and RunQueue are not "pure", we can take the following approach with the same Lazy trick:

// Like RunSequentially, this extension method must live in a static class
public static async Task AwaitSequentially<T>(this Lazy<Task<T>>[] array, int index = 0)
{
    if (array == null || index < 0 || index >= array.Length) return; // stop once every element has been awaited
    await array[index].Value;
    await AwaitSequentially(array, index + 1); // ++index is not pure :)
}

Now:

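var lazyTasks = ids.Select(id => GetLazyThing(id)).ToArray(); // creating the Lazy wrappers starts nothing
await lazyTasks.AwaitSequentially();
// Now collapse and map as you like, reusing the same lazyTasks array so nothing runs twice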
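Here is a variant of the same recursion that also returns what it awaited, so there is nothing left to re-read afterwards. CollectSequentially is a hypothetical name. It keeps the index-passing style above and builds the result from the head value plus the recursively collected tail, without mutating shared state. Each element adds one level of recursion and one Concat layer, so it suits modest inputs (for example, one batch of ids).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class CollectExtensions
{
    // Hypothetical helper: awaits array[index], then recurses on the rest,
    // returning all results in order
    public static async Task<IEnumerable<T>> CollectSequentially<T>(
        this Lazy<Task<T>>[] array, int index = 0)
    {
        if (array == null || index < 0 || index >= array.Length)
            return Enumerable.Empty<T>();
        var head = await array[index].Value;
        var tail = await array.CollectSequentially(index + 1);
        return new[] { head }.Concat(tail);
    }
}
```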

Upvotes: 2
