user9124444

Resource concurrency, allow access to one or multiple threads per given resource

Here is my problem.

I have an API controller with an API endpoint inside of it (the resource).

 api/myResource/{id}/do-something

I'm working on a middleware that limits access to this resource based on some business rules. Inside the middleware I match the incoming request, parse the URI, and then either allow access (let the pipeline continue) or return a 412 status code when the limit of allowed concurrent threads has been reached for the given resource, e.g.:

 api/myResource/1/do-something /// should allow 2 concurrent accesses.
 api/myResource/2/do-something /// should allow 10 concurrent accesses.
 api/myResource/3/do-something /// should allow 1 concurrent access.

For that I've started implementing a solution, shown below.

internal class AsyncLock<TKey>
{
    private static readonly ConcurrentDictionary<TKey, SemaphoreSlim> _safeSemaphores
        = new ConcurrentDictionary<TKey, SemaphoreSlim>();

    internal async Task<SemaphoreSlim> TryLockAsync(TKey key, int maxConcurrentCount)
    {
        if (!_safeSemaphores.TryGetValue(key, out SemaphoreSlim semaphore))
        {
            semaphore = new SemaphoreSlim(maxConcurrentCount, maxConcurrentCount);

            _safeSemaphores.TryAdd(key, semaphore);
        }

        await semaphore.WaitAsync();

        return semaphore;
    }

    internal SemaphoreSlim TryLock(TKey key, int maxConcurrentCount)
    {
        if (!_safeSemaphores.TryGetValue(key, out SemaphoreSlim semaphore))
        {
            semaphore = new SemaphoreSlim(maxConcurrentCount, maxConcurrentCount);

            _safeSemaphores.TryAdd(key, semaphore);
        }

        semaphore.Wait();

        return semaphore;
    }
}

This is how it is used. It allows 2 concurrent accesses here; in practice the limit will not be a hardcoded 2 but will be determined earlier in the pipeline.

AsyncLock<string> _lock = new AsyncLock<string>();
SemaphoreSlim semaphore = await _lock .TryLockAsync(key, 2);

if (semaphore.CurrentCount != 0)
{
    context.Response.StatusCode = 200;
    //await _next(context);
    semaphore.Release();
}
else
{
    context.Response.StatusCode = 412;
}

The behavior is unpredictable. I'm testing with 4 threads: sometimes it works as expected, sometimes they all return 200, and other times they all get stuck. The outcome differs on every run.

I would really appreciate some help figuring this one out.

Upvotes: 0

Views: 1036

Answers (3)

user9124444

I came up with this solution; it seems to work as expected.

internal sealed class AsyncLock<TKey>
{
    public readonly Dictionary<TKey, SemaphoreSlim> _semaphores = new Dictionary<TKey, SemaphoreSlim>();

    internal IDisposable Lock(TKey key, int maxDegreeOfParallelism)
    {
        bool acquired = GetOrAdd(key, maxDegreeOfParallelism).Wait(0);

        return acquired
            ? new Releaser(key, this)
            : null;
    }

    internal async Task<IDisposable> LockAsync(TKey key, int maxDegreeOfParallelism = 1)
    {
        bool acquired = await GetOrAdd(key, maxDegreeOfParallelism).WaitAsync(0);

        return acquired
            ? new Releaser(key, this)
            : null;
    }

    private SemaphoreSlim GetOrAdd(TKey key, int maxConcurrencyCount = 1)
    {
        lock (_semaphores)
        {
            if (!_semaphores.TryGetValue(key, out SemaphoreSlim semaphore))
            {
                _semaphores[key] = semaphore = new SemaphoreSlim(maxConcurrencyCount, maxConcurrencyCount);
            }

            return semaphore;
        }
    }

    private sealed class Releaser : IDisposable
    {
        private AsyncLock<TKey> _parent;

        public void Dispose()
        {
            lock (_parent._semaphores)
            {
                if (_parent._semaphores.TryGetValue(Key, out SemaphoreSlim semaphore)) semaphore.Release();
            }
        }

        public TKey Key { get; }

        public Releaser(TKey key, AsyncLock<TKey> parent)
        {
            Key = key;
            _parent = parent;
        }
    }
}

To use it:

AsyncLock<string> _asyncLock = new AsyncLock<string>();

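// The parameter is named maxDegreeOfParallelism in Lock(), so the named argument must match.
IDisposable disposable = _asyncLock.Lock(key, maxDegreeOfParallelism: 2);

if (disposable is null)
{
    // thread skipped: the limit for this key is already reached
}
else
{
    // thread entered: do the work, then give the slot back
    disposable.Dispose();
}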
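
One case the usage above leaves open is an exception thrown between acquiring the slot and calling Dispose, which would leave the slot taken for good. The following is only a minimal sketch of a variant that pairs with a using statement. KeyedLimiter, TryEnter and Slot are hypothetical names introduced for illustration and are not part of the class above. It assumes the limit for a key is fixed the first time that key is seen.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper (illustration only): one SemaphoreSlim per key,
// acquired without blocking.
public sealed class KeyedLimiter<TKey>
{
    private readonly ConcurrentDictionary<TKey, SemaphoreSlim> _semaphores
        = new ConcurrentDictionary<TKey, SemaphoreSlim>();

    // Returns a handle when a slot is free, or null when the limit is reached.
    // maxConcurrency only takes effect the first time a key is seen.
    public IDisposable TryEnter(TKey key, int maxConcurrency)
    {
        // The factory may run more than once under contention, but only one
        // semaphore is ever stored and returned for a given key.
        SemaphoreSlim semaphore = _semaphores.GetOrAdd(
            key, _ => new SemaphoreSlim(maxConcurrency, maxConcurrency));

        // Wait(0) never blocks: it returns false at once if no slot is free.
        return semaphore.Wait(0) ? new Slot(semaphore) : null;
    }

    private sealed class Slot : IDisposable
    {
        private SemaphoreSlim _semaphore;

        public Slot(SemaphoreSlim semaphore) => _semaphore = semaphore;

        // Interlocked.Exchange makes a second Dispose call a no-op,
        // so a slot can never be released twice.
        public void Dispose() => Interlocked.Exchange(ref _semaphore, null)?.Release();
    }
}
```

With this shape the caller can write using (IDisposable slot = limiter.TryEnter(key, 2)) { ... } and check slot is null inside the block. A using statement skips Dispose when the resource is null, and otherwise releases the slot even if the body throws.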

Upvotes: 1

Theodor Zoulias

Reputation: 43400

Here is a KeyedNamedSemaphore class that you could use. It stores one named Semaphore per key. After the key is created, the associated semaphore remains in the dictionary until the KeyedNamedSemaphore instance is disposed.

public class KeyedNamedSemaphore<TKey> : IDisposable
{
    private readonly ConcurrentDictionary<TKey, Semaphore> _perKey;
    private readonly string _prefix;

    public KeyedNamedSemaphore(string prefix = null,
        IEqualityComparer<TKey> keyComparer = null)
    {
        _perKey = new ConcurrentDictionary<TKey, Semaphore>(keyComparer);
        _prefix = prefix ?? $@"Global\{System.Reflection.Assembly
            .GetExecutingAssembly().GetName().Name}-";
    }

    public bool TryLock(TKey key, int maximumConcurrency, out Semaphore semaphore)
    {
        if (!_perKey.TryGetValue(key, out semaphore))
        {
            var newSemaphore = new Semaphore(maximumConcurrency, maximumConcurrency,
                $"{_prefix}{key}");
            semaphore = _perKey.GetOrAdd(key, newSemaphore);
            if (semaphore != newSemaphore) newSemaphore.Dispose();
        }
        var acquired = semaphore.WaitOne(0);
        if (!acquired) semaphore = null;
        return acquired;
    }

    public void Dispose()
    {
        foreach (var key in _perKey.Keys)
            if (_perKey.TryRemove(key, out var semaphore)) semaphore.Dispose();
    }
}

Usage example:

var locker = new KeyedNamedSemaphore<string>();
if (locker.TryLock("api/myResource/1/do-something", 10, out var semaphore))
{
    try
    {
        await DoSomethingAsync();
    }
    finally { semaphore.Release(); }
}
else
{
    await DoSomethingElseAsync();
}

Upvotes: 0

John Wu

Reputation: 52210

Seems simplest to use Monitor.TryEnter.

object _lock = new object();


void RunIfNotLocked()
{
    bool lockAcquired = false;

    Monitor.TryEnter(_lock, ref lockAcquired);
    if (!lockAcquired)
    {
        //Skip
        return;
    }
    try
    {
        DoSomething();
    }
    finally
    {         
        Monitor.Exit(_lock);
    }
}
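
A monitor admits only one thread at a time, so the code above covers the single-access case. For the per-resource limits in the question (2, 10, 1), the same skip-if-busy pattern could be sketched with a counted SemaphoreSlim and Wait(0). This is an illustrative sketch, not a drop-in replacement; TryEnterDemo and RunIfSlotFree are hypothetical names.

```csharp
using System;
using System.Threading;

public static class TryEnterDemo
{
    // Runs the action only if one of the semaphore's slots is free.
    // Never blocks: Wait(0) returns false immediately when the limit is reached.
    public static bool RunIfSlotFree(SemaphoreSlim semaphore, Action action)
    {
        if (!semaphore.Wait(0))
        {
            return false; // limit reached, skip
        }

        try
        {
            action();
            return true;
        }
        finally
        {
            // Always give the slot back, even if the action throws.
            semaphore.Release();
        }
    }
}
```

The caller would create one semaphore per resource, for example new SemaphoreSlim(2, 2) for a resource that allows 2 concurrent accesses, and pass it in on every request for that resource.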

Upvotes: 3
