Enrico Massone

Reputation: 7338

How to pass an async function to ConcurrentDictionary.GetOrAdd method?

This is a question related to both caching and asynchronous functions. To provide some context, I'll explain a bit about why I'm facing this problem with caching and async functions.

These days I'm refactoring an old piece of code meant to provide some user information given the user id. To make this concrete, imagine a signature like this:

public interface IUserInfoRetriever 
{
  UserInfo GetUserInfo(Guid userId);
}

public class UserInfoRetriever : IUserInfoRetriever 
{
   // implementation is discussed below
}

The class UserInfoRetriever has an instance of ConcurrentDictionary<Guid, UserInfo> which is used as a cache (where the user id is the key).

This in-memory cache is pre-filled the first time the method GetUserInfo is called. The current implementation takes a lock each time GetUserInfo is called, in order to check whether the cache has already been initialized.

After the lock has been released, the GetOrAdd method of the ConcurrentDictionary is called to get the requested user info. The second parameter passed to GetOrAdd (the value factory used in case of a cache miss) is a lambda expression that calls a remote web service able to provide user information. This call is currently done in a blocking fashion (using Task<T>.Result) because GetOrAdd does not accept an async value factory.

Here is some metacode to better understand the scenario:

public class UserInfoRetriever : IUserInfoRetriever 
{
    private readonly ConcurrentDictionary<Guid, UserInfo> userCache = new ConcurrentDictionary<Guid, UserInfo>();
    private bool isCacheInitialized = false;
    private readonly object locker = new object();

    public UserInfo GetUserInfo(Guid userId) 
    {
        this.InitializeCache();

        return this.userCache.GetOrAdd(userId, id => this.GetUserFromWebService(id));
    }

    private void InitializeCache()
    {
        lock(this.locker)
        {
            if (this.isCacheInitialized)
            {
                return;
            }

            // read all the users available in the Users table of a database
            // put each user in the cache by using ConcurrentDictionary<Guid, UserInfo>.TryAdd()

            this.isCacheInitialized = true;
        }
    }

    private UserInfo GetUserFromWebService(Guid userId) 
    {
        // performs a call to a web service in a blocking fashion instead of using async methods of HttpClient class
        // this is due to the signature of ConcurrentDictionary<TKey,TValue>.GetOrAdd not supporting async functions as the second parameter
    }
}

This is clearly a mess: the class is doing too much, and the InitializeCache method can be replaced with the framework class Lazy<T> in order to get rid of both the lock and the isCacheInitialized flag.
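As a rough sketch of that Lazy<T> refactoring (member names here are illustrative, not final code), the whole dictionary can be built lazily on first access, which removes both the lock and the flag:

private readonly Lazy<ConcurrentDictionary<Guid, UserInfo>> userCache;

public UserInfoRetriever()
{
    // the default LazyThreadSafetyMode.ExecutionAndPublication runs the factory at most once
    this.userCache = new Lazy<ConcurrentDictionary<Guid, UserInfo>>(this.LoadAllUsers);
}

private ConcurrentDictionary<Guid, UserInfo> LoadAllUsers()
{
    var cache = new ConcurrentDictionary<Guid, UserInfo>();
    // read all the users available in the Users table and add them with TryAdd
    return cache;
}

public UserInfo GetUserInfo(Guid userId) =>
    this.userCache.Value.GetOrAdd(userId, id => this.GetUserFromWebService(id));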

Now it's time to close the introductory part and move on to the real question. The hard bit in this scenario is the ConcurrentDictionary and its lack of support for async functions in the GetOrAdd method. This is the whole point of my question.

Based on my knowledge there are four possible approaches:

What approach do you suggest for the ConcurrentDictionary? Are there other ways, besides the ones I listed above, to work with the ConcurrentDictionary when the factory method used to provide a missing item is async?

My question is only about managing ConcurrentDictionary's lack of support for async functions. I'm looking for suggestions because there are several possible approaches to solve it. I explained the whole refactoring scenario for the sole purpose of being clearer and providing some context for my question.

UPDATE 24th JULY 2019

For those interested, here is my final implementation:

public sealed class InMemoryUserCache : IUserCache, IDisposable
{
    private readonly IBackEndUsersRepository _userRepository;
    private readonly Lazy<ConcurrentDictionary<Guid, Task<BackEndUserInfo>>> _cache;
    private readonly SemaphoreSlim _initializeCacheLocker;

    public InMemoryUserCache(IBackEndUsersRepository userRepository)
    {
      _userRepository = userRepository ?? throw new ArgumentNullException(nameof(userRepository));
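      // LazyThreadSafetyMode.PublicationOnly: concurrent callers may each run InitializeCache,
      // but only the first dictionary to complete is published; the semaphore below caps that race at 2 threads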
      _cache = new Lazy<ConcurrentDictionary<Guid, Task<BackEndUserInfo>>>(
        InitializeCache,
        LazyThreadSafetyMode.PublicationOnly);
      _initializeCacheLocker = new SemaphoreSlim(2); // allows concurrency but limits to 2 the number of threads that can concurrently run the lazy initialization
    }

    public Task<BackEndUserInfo> GetOrAdd(
      Guid userId,
      Func<Guid, Task<BackEndUserInfo>> userInfoFactory)
    {
      if (userInfoFactory == null)
        throw new ArgumentNullException(nameof(userInfoFactory));

      return _cache.Value.GetOrAdd(userId, ToSafeUserInfoFactory(userInfoFactory));
    }

    private ConcurrentDictionary<Guid, Task<BackEndUserInfo>> InitializeCache()
    {
      _initializeCacheLocker.Wait();

      try
      {
        var cache = new ConcurrentDictionary<Guid, Task<BackEndUserInfo>>();

        foreach (var user in _userRepository.FindAll())
        {
          cache[user.Id] = Task.FromResult(user);
        }

        return cache;
      }
      finally
      {
        _initializeCacheLocker.Release();
      }
    }

    private Func<Guid, Task<BackEndUserInfo>> ToSafeUserInfoFactory(
      Func<Guid, Task<BackEndUserInfo>> userInfoFactory) =>
        userId => TryExecuteUserInfoFactory(userInfoFactory, userId);

    private async Task<BackEndUserInfo> TryExecuteUserInfoFactory(
      Func<Guid, Task<BackEndUserInfo>> userInfoFactory,
      Guid userId)
    {
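      // If the factory's Task faults, evict it from the cache so the next lookup
      // retries instead of returning a permanently failed Task.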
      try
      {
        return await userInfoFactory(userId).ConfigureAwait(false);
      }
      catch (Exception)
      {
        _ = _cache.Value.TryRemove(userId, out var _);
        throw;
      }
    }

    public void Dispose()
    {
      _initializeCacheLocker?.Dispose();
    }
}
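For context, a hedged usage sketch (the caller and FetchUserFromWebServiceAsync are illustrative and not part of the class above): on a cache miss the async factory runs and its Task is stored, so concurrent callers asking for the same user id end up awaiting the same request.

// userCache is an InMemoryUserCache instance; the factory is only invoked on a cache miss
BackEndUserInfo user = await userCache.GetOrAdd(
    userId,
    id => FetchUserFromWebServiceAsync(id)); // e.g. an HttpClient call returning Task<BackEndUserInfo>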

Upvotes: 0

Views: 1730

Answers (1)

Ben Voigt

Reputation: 283684

It seems like a bad idea to support async callbacks in GetOrAdd. Obviously, the callback is doing a non-trivial amount of work or state change, or you'd have used the version where you prepare the new item ahead of time... but still, this is synchronized access to a concurrent collection we're talking about.

What should happen if the same element of the ConcurrentDictionary is accessed before the callback completes? With threading, you could have at most one operation in progress on the ConcurrentDictionary per thread, unless you did something really crazy with re-entrancy. With Tasks and continuation-passing-style, you could end up with unbounded overlapping Dictionary requests, even unbounded recursion, without even trying.

Consider what should happen if a second cache lookup for the same user is made before the first one completes. In your case, both lookups should probably return the data from the one network request. To make that happen, change your ConcurrentDictionary<UserId, User> to a ConcurrentDictionary<UserId, Task<User>> and go ahead and pass your async function as the callback. Its Task will get registered within the Dictionary when the async code blocks, instead of when it completes.... and this allows subsequent lookups to await the same already-in-flight Task. This is your second bullet, but I'm not sure why you reached the conclusion that it handles multiple lookups badly. It won't 100% prevent a duplicate lookup from resulting in independent network requests, but the race window is much smaller than it is for your current implementation.
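A minimal sketch of that change, reusing the names from the question's metacode (the web-service body below is a simulated placeholder, not the real call):

private readonly ConcurrentDictionary<Guid, Task<UserInfo>> userCache =
    new ConcurrentDictionary<Guid, Task<UserInfo>>();

public Task<UserInfo> GetUserInfoAsync(Guid userId)
{
    // GetOrAdd stores the Task<UserInfo> itself, so a second lookup for the same id
    // awaits the already-in-flight request instead of blocking on .Result or starting a new call
    return this.userCache.GetOrAdd(userId, id => this.GetUserFromWebServiceAsync(id));
}

private async Task<UserInfo> GetUserFromWebServiceAsync(Guid userId)
{
    // illustrative placeholder: replace with the real HttpClient call and deserialization
    await Task.Delay(10).ConfigureAwait(false);
    return new UserInfo();
}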

Library behaviors should be designed to make correct operations easy and straightforward operations correct (sometimes called "The Pit of Success"). GetOrAddAsync would make reasoning about the logic considerably more complex, which flies in the face of providing a general-purpose prebuilt thread-safe collection class.

If you have requirements for long-running element initialization on your shared collection, the general-purpose one isn't a good choice, and you'll want to build a thread-safe collection that is optimized for your particular use case.

Upvotes: 2
