Reputation: 1661
I'm working on a service which is responsible for logging requests sent to our service. The service works offline (calls to it are fire-and-forget).
We save the requests to different databases based on an input parameter (product id). We don't want to write to the database every time someone makes a request. Instead, we would like to build a "batch" and execute InsertMany every N amount of time (let's say 10 seconds). I've started implementing that and now I'm struggling with 2 things:
1. Do I need to use ConcurrentDictionary? It seems like I would achieve the same with a normal Dictionary.
2. If the answer to the above is "no, in this case you gain nothing from ConcurrentDictionary" - is there a way to rewrite my code to "properly" use ConcurrentDictionary so I can avoid using lock and ensure that AddOrUpdate won't have "collisions" with clearing the batch?
Let me paste the snippet and explain further:
// dictionary where key is ProductId and value is a list of items to insert to that product database
ConcurrentDictionary<string, List<QuoteDetails>> _productDetails;

public SaverService(StatelessServiceContext context)
    : base(context)
{
    _productDetails = new ConcurrentDictionary<string, List<QuoteDetails>>();
}

// this function will be fired and forgotten by the external service
public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
{
    await Task.Run(() => {
        foreach (var token in requestData.ProductAccessTokens)
        {
            // this function will extract the specific product request ( one request can contain multiple products )
            var details = SplitQuoteByProduct(requestData, responseData, token);
            _productDetails.AddOrUpdate(token, new List<QuoteDetails>() { details }, (productId, list) =>
            {
                list.Add(details);
                return list;
            });
        }
    });
}

// this function will be executed by a timer every N amount of time
public void SaveRequestsToDatabase()
{
    lock (_productDetails)
    {
        foreach (var item in _productDetails)
        {
            // copy current items and start a task which will process them
            SaveProductRequests(item.Key, item.Value.ToList());
            // clear current items
            item.Value.Clear();
        }
    }
}

public async Task SaveProductRequests(string productId, List<QuoteDetails> productRequests)
{
    // save received items to database
    /// ...
}
My main concern is that without lock the following scenario occurs:
1. SaveRequestsToDatabase is fired and starts to process the data
2. Before item.Value.Clear(); is called in the SaveRequestsToDatabase function, the external service fires another SaveRecentRequest function which executes AddOrUpdate with the same key - which will add the request to the collection
3. SaveRequestsToDatabase is finishing and therefore clearing the collection - but the object added in step 2 was not in the collection initially, so it was not processed
Upvotes: 2
Views: 1344
Reputation: 39027
Often, concurrency issues come from not picking the right data structures in the first place.
In your case, you have two workflows:
- n producers that enqueue events continuously and in parallel (SaveRecentRequest)
- 1 consumer that dequeues and processes the events (SaveRequestsToDatabase, run by the timer)
Your issue is that you're trying to categorize the events right off the bat, even though it's not needed. Keep the events as a simple stream in the concurrent part, and sort them only in the consumer part, since you have no concurrency there.
ConcurrentQueue<(string token, QuoteDetails details)> _productDetails;

public SaverService(StatelessServiceContext context)
    : base(context)
{
    _productDetails = new ConcurrentQueue<(string, QuoteDetails)>();
}

// this function will be fired and forgotten by the external service
public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
{
    await Task.Run(() => {
        foreach (var token in requestData.ProductAccessTokens)
        {
            // this function will extract the specific product request ( one request can contain multiple products )
            var details = SplitQuoteByProduct(requestData, responseData, token);
            _productDetails.Enqueue((token, details));
        }
    });
}

// this function will be executed by a timer every N amount of time
public void SaveRequestsToDatabase()
{
    // drain everything that is queued right now; items enqueued later wait for the next tick
    var products = new List<(string token, QuoteDetails details)>();
    while (_productDetails.TryDequeue(out var item))
    {
        products.Add(item);
    }

    // group by product only here, on the single consumer thread
    foreach (var group in products.GroupBy(i => i.token, i => i.details))
    {
        SaveProductRequests(group.Key, group);
    }
}

public async Task SaveProductRequests(string productId, IEnumerable<QuoteDetails> productRequests)
{
    // save received items to database
    /// ...
}
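One thing the snippet above leaves open is that the tasks returned by SaveProductRequests are never awaited, so a failed insert would go unnoticed. The following is a minimal, self-contained sketch of the same drain-then-group idea with the saves awaited together. BatchingSaver, Record, FlushAsync, the Payload property and the in-memory SavedBatches sink are all hypothetical names made up for illustration; a real service would call its database where the sketch only records the batch size.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the QuoteDetails type from the question.
public sealed class QuoteDetails
{
    public string Payload { get; set; }
}

public class BatchingSaver
{
    private readonly ConcurrentQueue<(string Token, QuoteDetails Details)> _pending =
        new ConcurrentQueue<(string Token, QuoteDetails Details)>();

    private readonly object _gate = new object();

    // Hypothetical sink: remembers each saved batch instead of hitting a database.
    public List<(string ProductId, int ItemCount)> SavedBatches { get; } =
        new List<(string ProductId, int ItemCount)>();

    // Producer side: safe to call from many threads at once.
    public void Record(string token, QuoteDetails details)
    {
        _pending.Enqueue((token, details));
    }

    // Consumer side: meant to be called from a single timer callback.
    public async Task FlushAsync()
    {
        // Drain what is queued right now; later items wait for the next flush.
        var drained = new List<(string Token, QuoteDetails Details)>();
        while (_pending.TryDequeue(out var item))
        {
            drained.Add(item);
        }

        // Start one save per product, then wait for all of them so failures surface here.
        var saves = drained
            .GroupBy(i => i.Token, i => i.Details)
            .Select(g => SaveProductRequestsAsync(g.Key, g.ToList()))
            .ToList();
        await Task.WhenAll(saves);
    }

    private Task SaveProductRequestsAsync(string productId, List<QuoteDetails> batch)
    {
        // A real implementation would run its InsertMany here.
        lock (_gate)
        {
            SavedBatches.Add((productId, batch.Count));
        }
        return Task.CompletedTask;
    }
}
```

With this shape the timer callback can await FlushAsync (or log its exception), and the producers never contend with the consumer beyond the queue itself.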
Upvotes: 3
Reputation: 450
You need to lock the dictionary whenever you add to, remove from, or read from it. Your current code allows SaveRecentRequest to add items to the dictionary even while you are busy processing items from it. I suggest the following approach:
// dictionary where key is ProductId and value is a list of items to insert to that product database
Dictionary<string, List<QuoteDetails>> _productDetails;

public SaverService(StatelessServiceContext context)
    : base(context)
{
    _productDetails = new Dictionary<string, List<QuoteDetails>>();
}

// this function will be fired and forgotten by the external service
public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
{
    await Task.Run(() => {
        foreach (var token in requestData.ProductAccessTokens)
        {
            // this function will extract the specific product request ( one request can contain multiple products )
            var details = SplitQuoteByProduct(requestData, responseData, token);
            lock (_padlock)
            {
                // Dictionary<TKey, TValue> has no AddOrUpdate, so get or create the list under the lock
                if (!_productDetails.TryGetValue(token, out var list))
                {
                    list = new List<QuoteDetails>();
                    _productDetails[token] = list;
                }
                list.Add(details);
            }
        }
    });
}

// this function will be executed by a timer every N amount of time
public void SaveRequestsToDatabase()
{
    Dictionary<string, List<QuoteDetails>> offboardingDictionary;
    lock (_padlock)
    {
        // swap in a fresh dictionary; producers only ever see the new one from here on
        offboardingDictionary = _productDetails;
        _productDetails = new Dictionary<string, List<QuoteDetails>>();
    }
    foreach (var item in offboardingDictionary)
    {
        // copy current items and start a task which will process them
        SaveProductRequests(item.Key, item.Value.ToList());
        // clear current items
        item.Value.Clear();
    }
}

public async Task SaveProductRequests(string productId, List<QuoteDetails> productRequests)
{
    // save received items to database
    /// ...
}

private readonly object _padlock = new object();
With this, you lock whenever you add items to the dictionary. To improve the saving performance, we take a reference to the current dictionary and then replace the original with a new instance. This minimizes the time spent in the lock: new items coming in are held in the new dictionary while the saving thread offloads items to the database from the previous one.
I don't think you need a concurrent dictionary for this task; a regular dictionary will do as long as you lock your accesses.
Upvotes: 1
Reputation: 23983
ConcurrentDictionary<string, List<QuoteDetails>> _productDetails;
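will not be thread-safe, since List is not thread-safe. ConcurrentDictionary only protects the dictionary itself; the update delegate passed to AddOrUpdate runs outside its internal locks, so list.Add can run on several threads at once. While one thread is adding entries to the list, another might be iterating over it. This will eventually fail.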
I would suggest using:
ConcurrentDictionary<string, ConcurrentQueue<QuoteDetails>> _productDetails;
or:
ConcurrentDictionary<string, BlockingCollection<QuoteDetails>> _productDetails;
You may also be able to remove the ConcurrentDictionary altogether.
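As a rough illustration of the first suggestion, here is a minimal sketch of how a ConcurrentDictionary of ConcurrentQueue values could be filled and drained without an explicit lock. PerProductBuffer, Add, Drain and the Payload property are hypothetical names introduced only for this example, and QuoteDetails is a stand-in for the type from the question. The sketch relies on the queues never being removed from the dictionary, so an item enqueued during a drain is either picked up by that drain or left for the next one.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical stand-in for the QuoteDetails type from the question.
public sealed class QuoteDetails
{
    public string Payload { get; set; }
}

public class PerProductBuffer
{
    private readonly ConcurrentDictionary<string, ConcurrentQueue<QuoteDetails>> _byProduct =
        new ConcurrentDictionary<string, ConcurrentQueue<QuoteDetails>>();

    // Producer side: GetOrAdd returns the single queue stored for the key,
    // even if the factory happens to run more than once under contention.
    public void Add(string productId, QuoteDetails details)
    {
        _byProduct.GetOrAdd(productId, _ => new ConcurrentQueue<QuoteDetails>())
                  .Enqueue(details);
    }

    // Consumer side: empties every queue and returns the non-empty batches.
    // Enumerating a ConcurrentDictionary while other threads add keys is allowed.
    public Dictionary<string, List<QuoteDetails>> Drain()
    {
        var result = new Dictionary<string, List<QuoteDetails>>();
        foreach (var pair in _byProduct)
        {
            var batch = new List<QuoteDetails>();
            while (pair.Value.TryDequeue(out var item))
            {
                batch.Add(item);
            }
            if (batch.Count > 0)
            {
                result[pair.Key] = batch;
            }
        }
        return result;
    }
}
```

The timer callback would then call Drain and hand each returned list to the database insert for that product.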
Upvotes: 3