I am implementing a simple message consumer in C# (.NET) that reads key/value pair messages off a queue. I need to maintain a dictionary of key -> [values seen so far] (this is the MemoryCache _memoryCache in the code below), because periodically I want to write it to a DB, appending the buffered values to the ones already stored in the DB for each key.
How should I go about that? In the following code I maintain a ConcurrentDictionary of keys, and every 30 minutes I loop over the keys to append to the DB, but I am afraid of race conditions (the app has several instances, each running on several cores).
public class MyMessageConsumer : BaseMessageConsumer
{
    private readonly MemoryCache _memoryCache; // key -> [values]
    private ConcurrentDictionary<string, bool> _memoryCacheKeys; // keys currently held in _memoryCache
    private DateTime _lastUpdate;

    // not expanding here on the constructors etc that set _lastUpdate etc.

    internal async Task<MessageConsumeResult> UpdateAsync(string cacheKey, string value)
    {
        await UpdateMemoryCache(cacheKey, value);

        // Every 30 minutes, flush everything buffered so far to the DB.
        if (DateTime.Now > _lastUpdate.AddMinutes(30))
        {
            _lastUpdate = DateTime.Now;
            foreach (var (key, _) in _memoryCacheKeys)
            {
                if (_memoryCache.TryGetValue(key, out HashSet<string> values))
                {
                    WriteToSomeDB(); // placeholder for the DB append
                    _memoryCache.Remove(RemoveKey(key));
                }
            }
        }
        return MessageConsumeResult.Completed();
    }

    private async Task UpdateMemoryCache(string cacheKey, string value)
    {
        if (_memoryCache.TryGetValue(cacheKey, out HashSet<string> previousValues))
        {
            if (!previousValues.Contains(value))
            {
                previousValues.Add(value);
                _memoryCache.Set(AddKey(cacheKey), previousValues, TimeSpan.FromHours(48));
            }
        }
        else
        {
            _memoryCache.Set(AddKey(cacheKey), new HashSet<string> { value }, TimeSpan.FromHours(48));
        }
    }

    // Tracks the key so the periodic flush can enumerate it.
    private string AddKey(string key)
    {
        _memoryCacheKeys.TryAdd(key, true);
        return key;
    }

    // Stops tracking the key once its values have been written to the DB.
    private string RemoveKey(string key)
    {
        _memoryCacheKeys.TryRemove(key, out _);
        return key;
    }
}
I am not too familiar with concurrency, but I assume I should introduce a lock when updating the MemoryCache (both when adding a new value and when removing an entry after it has been written to the DB). Ideally I would like to just dump the contents of the MemoryCache when it is about to expire, but MemoryCache does not seem to provide such a feature, hence the workaround I came up with.
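To make the locking idea above concrete, here is a minimal sketch of one way it could look, using only the base class library. The names `KeyedValueBuffer`, `Add`, and `Drain` are hypothetical and not part of the code above. The sketch also assumes a plain `Dictionary` behind a single lock in place of the `MemoryCache` plus key-tracking dictionary, so it drops the 48-hour expiration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not from the original code): buffers key -> distinct values
// behind one lock, and lets a flusher take the whole buffer in a single step.
public sealed class KeyedValueBuffer
{
    private readonly object _gate = new object();
    private Dictionary<string, HashSet<string>> _pending =
        new Dictionary<string, HashSet<string>>();

    // Records a value for a key. Returns true if the value was not already buffered.
    public bool Add(string key, string value)
    {
        lock (_gate)
        {
            if (!_pending.TryGetValue(key, out var values))
            {
                values = new HashSet<string>();
                _pending[key] = values;
            }
            return values.Add(value);
        }
    }

    // Returns everything buffered so far and replaces it with an empty buffer.
    // Because the swap happens under the same lock as Add, a value is either in the
    // returned snapshot or in the new buffer, never lost in between.
    public Dictionary<string, HashSet<string>> Drain()
    {
        lock (_gate)
        {
            var snapshot = _pending;
            _pending = new Dictionary<string, HashSet<string>>();
            return snapshot;
        }
    }
}
```

With something like this, the consumer would call `Add(cacheKey, value)` for each message and, once the 30 minutes have elapsed, call `Drain()` and write the returned snapshot to the DB outside the lock. Note that this only protects threads inside one process: each app instance would still hold its own buffer, so the append on the DB side would need to be safe against concurrent writers as well, and the "has 30 minutes passed" check would itself need guarding so that two threads do not both start a flush.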
Thanks so much for your insights, and have a nice day!