Brent

Reputation: 209

How to execute an arbitrary number of async tasks in sequential order?

I have this function:

async Task RefreshProfileInfo(List<string> listOfPlayers)

// For each player in listOfPlayers, check the in-memory cache for an entry.
// If we have a cached entry, do nothing.
// If we don't have a cached entry, fetch from the backend via an API call.

This function is called very frequently, like:

await RefreshProfileInfo(playerA, playerB, playerC)

or

await RefreshProfileInfo(playerB, playerC, playerD) 

or

await RefreshProfileInfo(playerE, playerF)

Ideally, if the players do not overlap each other, the calls should not affect each other (requesting PlayerE and PlayerF should not block the request for PlayerA, PlayerB, PlayerC). However, if the players DO overlap each other, the second call should wait for the first (requesting PlayerB, PlayerC, PlayerD, should wait for PlayerA, PlayerB, PlayerC to finish).

However, if that isn't possible, at the very least I'd like all calls to be sequential. (I think they should still be async, so they don't block other unrelated parts of the code).

Currently, each RefreshProfileInfo call runs in parallel with the others, which results in hitting the backend for every requested player (8 requests in this example).

Instead, I want to execute them sequentially, so that only the first call hits the backend, and subsequent calls just hit cache.

What data structure/approach should I use? I'm having trouble figuring out how to "connect" the separate calls to each other. I've been playing around with Task.WhenAll() as well as SemaphoreSlim, but I can't figure out how to use them properly.

Failed attempt

The idea behind my failed attempt was to have a helper class where I could call a function, SequentialRequest(Task), and it would sequentially run all tasks invoked in this manner.

List<Task> waitingTasks = new List<Task>();
object _lock = new object();

public async Task SequentialRequest(Task func)
{
    var waitingTasksCopy = new List<Task>();

    lock (_lock)
    {
        waitingTasksCopy = new List<Task>(waitingTasks);
        waitingTasks.Add(func); // Add this task to the waitingTasks (for future SequentialRequests)
    }

    // Wait for everything before this to finish
    if (waitingTasksCopy.Count > 0)
    {
        await Task.WhenAll(waitingTasksCopy);
    }

    // Run this task
    await func;
}

I thought this would work, but "func" is either run instantly (instead of waiting for earlier tasks to finish), or never run at all, depending on how I call it.

If I call it using this, it runs instantly:

async Task testTask()
{
    await Task.Delay(4000);
}

If I call it using this, it never runs:

Task testTask = new Task(async () =>
{
    await Task.Delay(4000);
});

Upvotes: 0

Views: 299

Answers (2)

Stephen Cleary

Reputation: 456507

Here's why your current attempt doesn't work:

// Run this task
await func;

The comment above is not describing what the code is doing. In the asynchronous world, a Task represents some operation that is already in progress. Tasks are not "run" by using await; await is a way for the current code to "asynchronously wait" for a task to complete. So no function signature taking a Task is going to work; the task is already in progress before it's even passed to that function.
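
As an illustration (a minimal sketch, not from the original answer): an async method's task is already running by the time you can await it, and await only observes its completion.

async Task Demo()
{
    // Task.Delay is already counting down as soon as it is called here...
    Task inProgress = Task.Delay(4000);

    // ...so this await does not "run" anything; it just asynchronously
    // waits for the already-in-flight operation to complete.
    await inProgress;
}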

Your question is actually about caching asynchronous operations. One way to do this is to cache the Task<T> itself. Currently, your cache holds the results (T); you can change your cache to hold the asynchronous operations that retrieve those results (Task<T>). For example, if your current cache type is ConcurrentDictionary<PlayerId, Player>, you could change it to ConcurrentDictionary<PlayerId, Task<Player>>.

With a cache of tasks, when your code checks for a cache entry, it will find an existing entry if the player data is loaded or has started loading, because the Task<T> represents an asynchronous operation that is already in progress (or has already completed).
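
A minimal sketch of that idea (Player, FetchProfileFromBackendAsync, and the class name are placeholders, not from the original code):

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class ProfileCache
{
    // The cache holds the asynchronous operations, not their results.
    private readonly ConcurrentDictionary<string, Task<Player>> _cache =
        new ConcurrentDictionary<string, Task<Player>>();

    public Task<Player> GetProfileAsync(string playerId)
    {
        // Returns the existing task if the profile is loaded or already loading;
        // otherwise starts the backend call and caches the Task<Player> itself.
        return _cache.GetOrAdd(playerId, id => FetchProfileFromBackendAsync(id));
    }

    public Task RefreshProfileInfo(List<string> listOfPlayers)
    {
        // Overlapping calls await the same cached tasks, so each player
        // only hits the backend once.
        return Task.WhenAll(listOfPlayers.Select(GetProfileAsync));
    }

    // Stand-in for the real API call.
    private Task<Player> FetchProfileFromBackendAsync(string playerId)
        => Task.FromResult(new Player());
}

public class Player { }

Note that GetOrAdd may invoke the factory more than once under contention, but only one Task<Player> ends up in the cache, which is usually acceptable here.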

A couple of notes for this approach:

  1. This only works for in-memory caches.
  2. Think about how you want to handle errors. A naive cache of Task<T> will also cache error results, which is usually not desired.

The second point above is the trickier part. When an error happens, you'd probably want some additional logic to remove the errored task from the cache. Bonus points (and additional complexity) if the error handling code prevents an errored task from getting into the cache in the first place.
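
For example, one sketch of the "remove the errored task" option, reusing the hypothetical _cache field from the snippet above:

public async Task<Player> GetProfileAsync(string playerId)
{
    var task = _cache.GetOrAdd(playerId, id => FetchProfileFromBackendAsync(id));
    try
    {
        return await task;
    }
    catch
    {
        // Drop the errored task so a later call can retry.
        // (This simple version may also evict a newer retry started by another caller.)
        _cache.TryRemove(playerId, out _);
        throw;
    }
}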

at the very least I'd like all calls to be sequential

Well, that's much easier. SemaphoreSlim is the asynchronous replacement for lock, so you can use a shared SemaphoreSlim. Call await mySemaphoreSlim.WaitAsync(); at the beginning of RefreshProfileInfo, put the body in a try, and in the finally block at the end of RefreshProfileInfo, call mySemaphoreSlim.Release();. That will limit all calls to RefreshProfileInfo to running sequentially.
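
In code, that pattern looks roughly like this (the field name is mine):

private static readonly SemaphoreSlim _mutex = new SemaphoreSlim(1, 1);

async Task RefreshProfileInfo(List<string> listOfPlayers)
{
    await _mutex.WaitAsync();
    try
    {
        // existing body: check the in-memory cache, fetch misses from the backend
    }
    finally
    {
        _mutex.Release();
    }
}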

Upvotes: 4

JohanP

Reputation: 5472

I had the same issue in one of my projects: multiple threads called a single method, and they all made I/O calls when the value was not found in the cache. What you want to do is add the Task to your cache and then await it; subsequent calls will then just read the result once the task completes.

Example:

private Task RefreshProfile(Player player)
{
    // _cache is of type IMemoryCache (Microsoft.Extensions.Caching.Memory)
    return _cache.GetOrCreate(player, entry =>
    {
        // expire the entry in 30 seconds
        entry.AbsoluteExpiration = DateTimeOffset.UtcNow.AddSeconds(30);
        return ActualRefreshCodeThatReturnsTask(player);
    });
}

Then just await it in your calling code:

await Task.WhenAll(RefreshProfile(playerA), RefreshProfile(playerB), RefreshProfile(playerC));

Upvotes: 0
