Reputation: 153
My standard approach to handling multithreading and caching has been to use the "Double-checked locking" pattern. In situations where the data retrieval can take a long time this results in subsequent threads waiting while the first thread refreshes the cache. If the throughput of requests is of a higher priority than the freshness of the data, I'd like to be able to continue serving the stale cache data to the subsequent threads while the cache is refreshed.
I'm using the ObjectCache within System.Runtime.Caching. Items placed in the cache have a flag indicating whether the data is stale. When an item expires and is removed from the cache, I use the RemoveCallback mechanism to re-enter the item with the stale flag set.
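For reference, the stale-flag re-entry described above could look roughly like this. This is a minimal sketch, not my exact code: CacheObject, StoreFresh and the five-minute lifetime are placeholder names/values, while CacheItemPolicy, RemovedCallback and CacheEntryRemovedReason are the real System.Runtime.Caching API.

```csharp
using System;
using System.Runtime.Caching;

// Wrapper stored in the cache; IsStale marks entries that have expired
// but can still be served while a refresh is underway.
class CacheObject
{
    public object Item { get; set; }
    public bool IsStale { get; set; }
}

class Repository
{
    static ObjectCache Cache = MemoryCache.Default;

    static void StoreFresh(string key, object data)
    {
        var policy = new CacheItemPolicy
        {
            AbsoluteExpiration = DateTimeOffset.Now.AddMinutes(5),
            // On expiry, re-enter the item flagged as stale instead of losing it.
            RemovedCallback = args =>
            {
                if (args.RemovedReason == CacheEntryRemovedReason.Expired)
                {
                    var stale = (CacheObject)args.CacheItem.Value;
                    stale.IsStale = true;
                    // Re-add without an expiration; it stays until replaced by fresh data.
                    Cache.Set(key, stale, new CacheItemPolicy());
                }
            }
        };
        Cache.Set(key, new CacheObject { Item = data, IsStale = false }, policy);
    }
}
```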
The code for handling access to the cache is as follows:
class Repository {
    static ObjectCache Cache = MemoryCache.Default;
    static readonly SemaphoreSlim RefreshCacheSemaphore = new SemaphoreSlim(1);
    static volatile bool DataIsBeingRefreshed;

    public async Task<object> GetData() {
        const string cacheKey = "Key";
        var cacheObject = Cache.Get(cacheKey) as CacheObject;
        if (cacheObject != null && (!cacheObject.IsStale || DataIsBeingRefreshed)) {
            return cacheObject.Item;
        }
        await RefreshCacheSemaphore.WaitAsync();
        try {
            // Check again that the cache item is still stale.
            cacheObject = Cache.Get(cacheKey) as CacheObject;
            if (cacheObject != null && !cacheObject.IsStale) {
                return cacheObject.Item;
            }
            DataIsBeingRefreshed = true;
            // Get data from database.
            // Store new data in cache.
            DataIsBeingRefreshed = false;
            // Return new data.
        } finally {
            RefreshCacheSemaphore.Release();
        }
    }
}
The problem with this is that depending on the time between calls, threads will either successfully serve stale data or get stuck waiting to enter the code protected by the semaphore. Ideally I don't want any threads waiting while the cache is refreshed.
Alternatively I could change the method to be:
public async Task<object> GetData() {
    const string cacheKey = "Key";
    var cacheObject = Cache.Get(cacheKey) as CacheObject;
    if (cacheObject != null && (!cacheObject.IsStale || DataIsBeingRefreshed)) {
        return cacheObject.Item;
    }
    // New semaphore.
    await GetStaleDataSemaphore.WaitAsync();
    try {
        cacheObject = Cache.Get(cacheKey) as CacheObject;
        if (cacheObject != null && DataIsBeingRefreshed) {
            return cacheObject.Item;
        }
        DataIsBeingRefreshed = true;
    } finally {
        GetStaleDataSemaphore.Release();
    }
    await RefreshCacheSemaphore.WaitAsync();
    try {
        // Check again that the cache item is still stale.
        cacheObject = Cache.Get(cacheKey) as CacheObject;
        if (cacheObject != null && !cacheObject.IsStale) {
            return cacheObject.Item;
        }
        // Get data from database.
        // Store new data in cache.
        DataIsBeingRefreshed = false;
        // Return new data.
    } finally {
        RefreshCacheSemaphore.Release();
    }
}
This should reduce the number of threads waiting to refresh the cache; however, I don't want to introduce more locking mechanisms if I'm missing some established pattern that would leave no threads stuck waiting.
Am I on the right lines or is there an established pattern for handling this?
Upvotes: 2
Views: 990
Reputation: 7681
I can't wrap my head around a scenario that automatically evicts items from the cache while refreshing them at the same time.
In the following cache implementation, items are never removed from the cache, but you could probably extend this approach to support eviction.
The idea is that you can return a Task both for the loading operation, when the value is being loaded for the first time (there is no way to return quickly, as no value exists yet), and for the case when a value is already available.
In the latter case the value is simply wrapped in a Task with Task.FromResult.
class Cache<TKey, TValue>
{
    private readonly ConcurrentDictionary<TKey, Item> d = new ConcurrentDictionary<TKey, Item>();

    private class Item
    {
        public Item(Func<Task<TValue>> loadingTask, TimeSpan ttl, CancellationToken cancellationToken)
        {
            Ttl = ttl;
            LoadingTask = loadingTask;
            // Assign the token before starting HandleLoaded, which reads it.
            CancellationToken = cancellationToken;
            ServiceTask = HandleLoaded();
        }

        CancellationToken CancellationToken { get; }
        Func<Task<TValue>> LoadingTask { get; }
        public Task<TValue> ServiceTask { get; private set; }
        private TimeSpan Ttl { get; }

        async Task<TValue> HandleLoaded()
        {
            var value = await LoadingTask();
            ServiceTask = Task.FromResult(value);
            // Kick off background reloading; callers keep getting the last loaded value.
            Task.Run(() => ReloadOnExpiration(), CancellationToken);
            return value;
        }

        async void ReloadOnExpiration()
        {
            try
            {
                while (!CancellationToken.IsCancellationRequested)
                {
                    await Task.Delay(Ttl, CancellationToken);
                    var value = await LoadingTask();
                    ServiceTask = Task.FromResult(value);
                }
            }
            catch (OperationCanceledException)
            {
                // Stop reloading when the token is cancelled during the delay.
            }
        }
    }

    public async Task<TValue> GetOrCreate(TKey key, Func<TKey, CancellationToken, Task<TValue>> createNew, CancellationToken cancellationToken, TimeSpan ttl)
    {
        var item = d.GetOrAdd(key, k => new Item(() => createNew(key, cancellationToken), ttl, cancellationToken));
        return await item.ServiceTask;
    }
}
Here is how it could be used:
async Task Work(Cache<string, string> cache, CancellationToken cancellation)
{
    var item = await cache.GetOrCreate("apple", async (k, ct) =>
    {
        await Task.Delay(10, ct); // simulate loading time
        return $"{k} {Guid.NewGuid()}";
    }, cancellation, TimeSpan.FromMilliseconds(30));

    Console.WriteLine(item);
}
Upvotes: 1