dragons
dragons

Reputation: 619

How to execute multiple async calls in parallel efficiently in C#?

I have a list of client id and for each client id I need to get data from cassandra. So I am executing all those client id's in parallel instead of using IN clause query which is not good for performance.

So I came up with below code which execute multiple async calls for each client id and it does the job by getting data out of cassandra but is it the right way to execute multiple async calls in parallel or am I doing something wrong here which can affect my performance?

public async Task<IList<Item>> GetAsync(IList<int> clientIds, int processId, int proc, Kyte kt)
{
    var clientMaps = await ProcessCassQueries(clientIds, (ct, batch) => mapper.SingleOrDefaultAsync<ItemMapPoco>(itemMapStmt, batch), "GetPIMValue");

    if (clientMaps == null || clientMaps.Count <= 0)
    {
        return null;
    }
    // .. do other stuff and return
}

// this executes multiple client ids in parallel - but is it the right way considering performance?
private async Task<List<T>> ProcessCassQueries<T>(IList<int> ids, Func<CancellationToken, int, Task<T>> mapperFunc, string msg) where T : class
{
    var requestTasks = ids.Select(id => ProcessCassQuery(ct => mapperFunc(ct, id), msg));
    return (await Task.WhenAll(requestTasks)).Where(e => e != null).ToList();
}

// this might not be good
private Task<T> ProcessCassQuery<T>(Func<CancellationToken, Task<T>> requestExecuter, string msg) where T : class
{
    return requestExecuter(CancellationToken.None);
}

I recently started using C# so have limited knowledge around that so maybe my code might be not good in terms of performance. Specially ProcessCassQueries and ProcessCassQuery methods. Anything that can be improved here or can be written in a better way considering it's a prod code?

Update:

Basis on suggestion, using semaphore to limit number of async calls as shown below:

private var semaphore = new SemaphoreSlim(20);

private async Task<List<T>> ProcessCassQueries<T>(IList<int> ids, Func<CancellationToken, int, Task<T>> mapperFunc, string msg) where T : class
{
    var tasks = ids.Select(async id => 
    {
        await semaphore.WaitAsync();
        try
        {
            return await ProcessCassQuery(ct => mapperFunc(ct, id), msg);
        }
        finally
        {
            semaphore.Release();
        }
    });

  return (await Task.WhenAll(tasks)).Where(e => e != null).ToList();
}

Upvotes: 1

Views: 661

Answers (1)

Theodor Zoulias
Theodor Zoulias

Reputation: 43515

What you are doing is correct. You are launching a bunch of tasks all at once, and then await all of them to complete. There is no inefficiency or bottleneck regarding this specific C# code. It is a bit strange that you pass a hardcoded CancellationToken.None in the ProcessCassQuery, but it will not affect the performance. The performance of the whole operation now depends on the behavior of the Cassandra database, when it is bombarded with multiple simultaneous requests. If it is optimized for this kind of usage then everything will be OK. If not, then your current setup doesn't offer the flexibility of configuring the level of concurrency to a value optimal for the specific database engine. For ways to limit the amount of concurrent async I/O operations look here.

As a side note, according to the official guidelines the asynchronous methods ProcessCassQueries and ProcessCassQuery should have the Async suffix.

Upvotes: 2

Related Questions