MajkeloDev
MajkeloDev

Reputation: 1661

C# Concurent dictionary - lock on value

I'm working on a service which is responsible for logging requests sent to our service. The service is working offline ( is being fired and forget ). We are saving the requests to different databases based on some input parameter(product id). We don't want to save to the database every time someone do a request - we would rather like to build some "batch" to be inserted and execute InsertMany every N amount of time ( let's say 10 seconds ). I've started implementing that and now I'm struggling about 2 things:

  1. Do i need to use ConcurrentDictionary ? It seems like i would achieve the same with normal Dictionary
  2. If answer to above is "no, in your case there are no benefits from ConcurrentDictionary" - is there a way to re-write my code to "properly" use ConcurrentDictionary so i can avoid using lock and ensure that AddOrUpdate won't have "collisions" with clearing the batch ?

Let me paste the snippet and explain further:

    // dictionary where key is ProductId and value is a list of items to insert to that product database
    ConcurrentDictionary<string, List<QuoteDetails>> _productDetails;
    public SaverService(StatelessServiceContext context)
        : base(context)
    {
        _productDetails = new ConcurrentDictionary<string, List<QuoteDetails>>();
    }

    // this function will be fired and forgotten by the external service
    public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
    {
        await Task.Run(() => {
            foreach (var token in requestData.ProductAccessTokens)
            {
                // this function will extract the specific product request ( one request can contain multiple products )
                var details = SplitQuoteByProduct(requestData, responseData, token);
                _productDetails.AddOrUpdate(token, new List<QuoteDetails>() { details }, (productId, list) =>
                {
                    list.Add(details);
                    return list;
                });
            }
        });
    }

    // this function will be executed by a timer every N amount of time
    public void SaveRequestsToDatabase()
    {
        lock (_productDetails)
        {
            foreach (var item in _productDetails)
            {
                // copy curent items and start a task which will process them
                SaveProductRequests(item.Key, item.Value.ToList());
                // clear curent items
                item.Value.Clear();
            }
        }
    }

    public async Task SaveProductRequests(string productId, List<QuoteDetails> productRequests)
    {
        // save received items to database
        /// ...
    }

My main concern is that without lock following scenario occurs:

  1. SaveRequestsToDatabase is fired - and is starting to process the data
  2. Just before calling item.Value.Clear(); in SaveRequestsToDatabase function, the external service fires another SaveRecentRequest function which executes AddOrUpdate with the same key - which will add request to the collection
  3. SaveRequestsToDatabase is finishing and therefore clearing the collection - but initially object added by 2. was not in the collection so was not processed

Upvotes: 2

Views: 1344

Answers (3)

Kevin Gosse
Kevin Gosse

Reputation: 39027

Often, concurrency issues come from not picking the right data structures in first place.

In your case, you have two workflows:

  • n producers, enqueuing events concurrently and continuously
  • 1 consumer, dequeuing and processing the events at given times

Your issue is that you're trying to categorize the events right off the bat, even though it's not needed. Keep the events as a simple stream in the concurrent part, and sort them only in the consumer part since you have no concurrency there.

ConcurrentQueue<(string token, QuoteDetails details)> _productDetails;

public SaverService(StatelessServiceContext context)
    : base(context)
{
    _productDetails = new ConcurrentQueue<(string, QuoteDetails)>();
}

// this function will be fired and forgotten by the external service
public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
{
    await Task.Run(() => {
        foreach (var token in requestData.ProductAccessTokens)
        {
            // this function will extract the specific product request ( one request can contain multiple products )
            var details = SplitQuoteByProduct(requestData, responseData, token);
            _productDetails.Enqueue((token, details));
        }
    });
}

// this function will be executed by a timer every N amount of time
public void SaveRequestsToDatabase()
{
    var products = new List<(string token, QuoteDetails details)>();

    while (_productDetails.TryDequeue(out var item))
    {
        products.Add(item);
    }

    foreach (var group in products.GroupBy(i => i.token, i => i.Details))
    {
        SaveProductRequests(group.Key, group);
    }
}

public async Task SaveProductRequests(string productId, IEnumerable<QuoteDetails> productRequests)
{
    // save received items to database
    /// ...
}

Upvotes: 3

DeadlyEmbrace
DeadlyEmbrace

Reputation: 450

You need to lock the dictionary whenever you add/remove/read to or from it. Your current code will allow the SaveRecentRequest to add items to the dictionary even while you are busy processing items from it. I suggest the following approach

// dictionary where key is ProductId and value is a list of items to insert to that product database
Dictionary<string, List<QuoteDetails>> _productDetails;
public SaverService(StatelessServiceContext context)
    : base(context)
{
    _productDetails = new Dictionary<string, List<QuoteDetails>>();
}

// this function will be fired and forgotten by the external service
public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
{
    await Task.Run(() => {
        foreach (var token in requestData.ProductAccessTokens)
        {
            // this function will extract the specific product request ( one request can contain multiple products )
            var details = SplitQuoteByProduct(requestData, responseData, token);
            lock(_padlock)
            {
                _productDetails.AddOrUpdate(token, new List<QuoteDetails>() { details }, (productId, list) =>
                {
                    list.Add(details);
                    return list;
                });
            }
        }
    });
}

// this function will be executed by a timer every N amount of time
public void SaveRequestsToDatabase()
{
    Dictionary<string, List<QuoteDetails>> offboardingDictionary;
    lock (_padlock)
    {
        offboardingDictionary = _productDetails;
        _productDetails = new  Dictionary<string, List<QuoteDetails>>();
    }

    foreach (var item in offboardingDictionary)
    {

        // copy curent items and start a task which will process them
        SaveProductRequests(item.Key, item.Value.ToList());
        // clear curent items
        item.Value.Clear();
    }
}

public async Task SaveProductRequests(string productId, List<QuoteDetails> productRequests)
{
    // save received items to database
    /// ...
}

private readonly object _padlock = new object();

With this you lock when you add items to the dictionary. To improve the saving performance we add a new reference to our dictionary and then replace original one with a new instance. In this way we minimize the time in the lock so new items coming in can be held in the new dictionary while our saving thread offloads items to the database from the previous dictionary.

I don't think you need a concurrent dictionary for this task, a regular dictionary will do as long as you lock your accessess

Upvotes: 1

mjwills
mjwills

Reputation: 23983

ConcurrentDictionary<string, List<QuoteDetails>> _productDetails;

will not be thread-safe, since List is not thread-safe. While one thread is adding entries to the list, another might be iterating over it. This will eventually fail.

I would suggest using:

ConcurrentDictionary<string, ConcurrentQueue<QuoteDetails>> _productDetails;

or:

ConcurrentDictionary<string, BlockingCollection<QuoteDetails>> _productDetails;

You may also possibly be able to remove the ConcurrentDictionary altogether.

Upvotes: 3

Related Questions