Reputation: 49
I'm looking for an approach to locking that, by default, makes sure that all calls to a single API are run mutually exclusive using distributed locking. However, at the same time I need the option to instead lock larger blocks of code (critical procedures) containing several calls to that API. Those calls should still be run mutually exclusive. In those cases the approach should be re-entrant, so that each call isn't blocked because the block of code it is in already holds the lock. It should also support re-entrancy if there are several methods nested that lock sections of code.
Examples of use:
// Should have lock registered by default (f.ex. in HttpMessageHandler)
await _deviceClient.PerformAction();
async Task CriticalProcedure()
{
// Should only use one lock that is reused in nested code (re-entrant)
await using (await _reentrantLockProvider.AcquireLockAsync())
{
await _deviceClient.TriggerAction();
await SharedCriticalProcedure();
}
// Should only dispose lock at this point
}
async Task SharedCriticalProcedure()
{
await using (await _customLockProvider.AcquireLockAsync())
{
await _deviceClient.HardReset();
await _deviceClient.Refresh();
}
}
// Should be forced to run sequentially even though they are not awaited (mutex)
var task1 = _deviceClient.PerformAction1();
var task2 = _deviceClient.PerformAction2();
await Task.WhenAll(task1, task2);
Background:
My team is working on a WebAPI that is responsible for making calls to hardware devices. When an endpoint in our API is called, we get a header that identifies the hardware device in question, used in startup to configure the baseUrl of our HttpClients, and we make one or more calls to that API. We have the following limitations:
I have created the following solution which I believe works. However, it seems clumsy and overengineered, mostly because HttpMessageHandlers have unpredictable lifetimes, not scoped to request, so I needed to use TraceIdentifier
and a dictionary to enable re-entry during the request lifecycle.
// In startup
service.AddSingleton<IReentrantLockProvider, ReentrantLockProvider>();
services.
.AddHttpClient(IDeviceClient)
.AddTypedClient(client => RestService.For<IDeviceClient>(client, refitSettings))
.ConfigureHttpClient((provider, client) => ConfigureHardwareBaseUrl())
.AddHttpMessageHandler<HardwareMutexMessageHandler>();
public class HardwareMutexMessageHandler : DelegatingHandler
{
private readonly IReentrantLockProvider _reentrantPanelLockProvider;
private readonly IHttpContextAccessor _httpContextAccessor;
private readonly ConcurrentDictionary<string, object> _locks;
public HardwareMutexMessageHandler(IReentrantLockProvider reentrantPanelLockProvider, IHttpContextAccessor httpContextAccessor)
{
_reentrantPanelLockProvider = reentrantPanelLockProvider;
_httpContextAccessor = httpContextAccessor;
}
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
await using (await _reentrantPanelLockProvider.AcquireLockAsync(cancellationToken))
{
var hardwareId = _httpContextAccessor.HttpContext.Request.Headers["HardwareId"];
var mutex = _locks.GetOrAdd(hardwareId, _ => new());
// This is only used to handle cases where developer chooses to batch calls or forgets to await a call
lock (mutex)
{
return base.SendAsync(request, cancellationToken).Result;
}
}
}
}
public class ReentrantLockProvider : IReentrantLockProvider
{
private readonly IDistributedLockProvider _distributedLockProvider;
private readonly IHttpContextAccessor _httpContextAccessor;
private readonly ConcurrentDictionary<string, ReferenceCountedDisposable> _lockDictionary;
private readonly object _lockVar = new();
public ReentrantLockProvider(IDistributedLockProvider distributedLockProvider, IHttpContextAccessor httpContextAccessor)
{
_distributedLockProvider = distributedLockProvider;
_httpContextAccessor = httpContextAccessor;
_lockDictionary = new ConcurrentDictionary<string, ReferenceCountedDisposable>();
}
public async Task<IAsyncDisposable> AcquireLockAsync(CancellationToken cancellationToken = default)
{
var hardwareId = _httpContextAccessor.HttpContext.Request.Headers["HardwareId"];
var requestId = _httpContextAccessor.HttpContext.TraceIdentifier;
lock (_lockVar)
{
if (_lockDictionary.TryGetValue(requestContext.CorrelationId, out referenceCountedLock))
{
referenceCountedLock.RegisterReference();
return referenceCountedLock;
}
acquireLockTask = _distributedLockProvider.AcquireLockAsync(hardwareId, timeout: null, cancellationToken);
referenceCountedLock = new ReferenceCountedDisposable(async () =>
await RemoveLock(acquireLockTask.Result, requestContext.CorrelationId)
);
_lockDictionary.TryAdd(requestContext.CorrelationId, referenceCountedLock);
}
}
private async Task RemoveLock(IDistributedSynchronizationHandle acquiredLock, string correlationId)
{
ValueTask disposeAsyncTask;
lock (_lockVar)
{
disposeAsyncTask = acquiredLock.DisposeAsync();
_ = _lockDictionary.TryRemove(correlationId, out _);
}
await disposeAsyncTask;
}
}
public class ReferenceCountedDisposable : IAsyncDisposable
{
private readonly Func<Task> _asyncDispose;
private int _refCount;
public ReferenceCountedDisposable(Func<Task> asyncDispose)
{
_asyncDispose = asyncDispose;
_refCount = 1;
}
public void RegisterReference()
{
Interlocked.Increment(ref _refCount);
}
public async ValueTask DisposeAsync()
{
var references = Interlocked.Decrement(ref _refCount);
if (references == 0)
{
await _asyncDispose();
}
else if (references < 0)
{
throw new InvalidOperationException("Can't dispose multiple times");
}
else
{
GC.SuppressFinalize(this);
}
}
}
Upvotes: 0
Views: 311