Reputation: 1569
In the system I am working on, I have a REST API that accepts multiple parallel requests. In the background, there is a Thrift connection to another tool that allows at most 5 threads. To protect access to that connection, I am using a SemaphoreSlim with a configurable timeout. This is my code:
private static readonly SemaphoreSlim _lockGuard = new SemaphoreSlim(1, 5);
private static int _waitingThreads = 0;

public async Task<Models.OperationStatus> PushItemWithResultAsync<TPushData>(TPushData data, CancellationToken ct)
    where TPushData : Thrift.Protocol.TBase
{
    Interlocked.Increment(ref _waitingThreads);
    // Use a semaphore to prevent parallel calls.
    if (await _lockGuard.WaitAsync(_millisecondsThriftTimeout, ct))
    {
        _logger.LogDebug($"Push data using {_waitingThreads} threads, but {_lockGuard.CurrentCount} within the semaphore");
        Models.OperationStatus result;
        try
        {
            // Push the data to the single Thrift connection.
            result = await PushItemWithResultAsyncLogic(data, ct).ConfigureAwait(false);
        }
        finally
        {
            _lockGuard.Release();
            Interlocked.Decrement(ref _waitingThreads);
        }
        return result;
    }
    else
    {
        Interlocked.Decrement(ref _waitingThreads);
        return GetFailureOperationStatus();
    }
}
I am a bit confused about how the semaphore's timeout interacts with the allowed number of threads.
I first started with new SemaphoreSlim(1, 1); to allow only a single connection. The code behaves as expected: only one call to PushItemWithResultAsyncLogic runs at a time, and if too many calls come in, the extra ones time out.
Now I want at most 5 PushItemWithResultAsyncLogic calls running in parallel, so I raised the maximum allowed threads in the SemaphoreSlim to 5. I still expect timeouts when a lot of external requests come in.
When 10 parallel calls are received, the timeout of 500 ms is still enough to handle all of them. _waitingThreads rises up to 10, but _lockGuard.CurrentCount is always 0; I was expecting a value moving between 1 and 5. Most importantly, I don't see PushItemWithResultAsyncLogic being called up to 5 times in parallel.
What is wrong here?
EDIT: short answer: the initialCount should be set to 5, as explained below. With new SemaphoreSlim(5, 5), I do get parallel requests AND the timeout doing its job.
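For reference, this is the corrected declaration (the rest of the method stays unchanged):

```csharp
// initialCount = maxCount = 5: up to 5 callers can hold the semaphore
// at once, and WaitAsync(timeout) still rejects the overflow.
private static readonly SemaphoreSlim _lockGuard = new SemaphoreSlim(5, 5);
```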
Upvotes: 0
Views: 6011
Reputation: 21
Set the initialCount constructor parameter to the same value as maxCount. CurrentCount goes from initialCount towards 0, so if initialCount is 1, the first thread sets CurrentCount to zero and prevents other threads from running.
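A minimal sketch illustrating this (the values in the comments assume a fresh semaphore with no other callers):

```csharp
using System;
using System.Threading;

class CurrentCountDemo
{
    static void Main()
    {
        var sem = new SemaphoreSlim(2, 2);   // initialCount = 2, maxCount = 2
        Console.WriteLine(sem.CurrentCount); // 2: both slots free
        sem.Wait();
        sem.Wait();
        Console.WriteLine(sem.CurrentCount); // 0: further Wait() calls block
        sem.Release();
        Console.WriteLine(sem.CurrentCount); // 1: one slot free again
    }
}
```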
Upvotes: 2
Reputation: 4543
Tried to simulate your scenario with the console app below and added some comments. The Wait method of the semaphore is not primarily there to define a timeout; it blocks a thread from entering until a slot is free. A distinction also needs to be made between allowing a given number of requests to wait and rejecting the others. In the simulation below, 5 is the maximum number of requests that can be processed in parallel; the others are immediately rejected until a processing slot is free again.
As mentioned in the comments, this will not throttle. If you want to change the maximum number of requests processed in parallel, you need to 1. increase the maxCount in the Semaphore constructor and 2. call Release(maxCount) to reset the semaphore. Or simply call the constructor with initialCount = maxCount, for ex. SemaphoreSlim(5, 5).
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Semaphore
{
    public class Program
    {
        private static readonly SemaphoreSlim _lockGuard = new SemaphoreSlim(0, 5);
        private static int totalProcessed; // Stats only.

        public static void Main()
        {
            // Reset semaphore by allowing max 5 requests to enter the concurrent section.
            _lockGuard.Release(5);
            var requests = new Task[20];
            for (var i = 0; i < 20; i++)
            {
                Console.WriteLine("Sending request: " + i);
                var i1 = i;
                requests[i] = Task.Run(() => ProcessRequest(i1));
                // This is the interval between each request.
                // A bigger value means more requests will be processed.
                Thread.Sleep(20);
            }
            Task.WaitAll(requests);
            Console.WriteLine("---");
            Console.WriteLine("Processed requests: " + totalProcessed);
            Console.WriteLine("Rejected requests: " + (20 - totalProcessed));
            Console.ReadLine();
        }

        public static async Task ProcessRequest(int i)
        {
            // If the semaphore is already full, reject the request.
            // This means requests won't get queued.
            if (_lockGuard.CurrentCount == 0)
            {
                Console.WriteLine("ERROR: CurrentCount == 0 for " + i);
                return;
            }
            try
            {
                // If this request was allowed to enter the semaphore, wait until a processing slot is free.
                // This decrements CurrentCount.
                _lockGuard.Wait();
                // Once a previous request completed (Release()), process the next waiting one.
                await Task.Run(() =>
                {
                    Console.WriteLine("Processing request " + i);
                    Thread.Sleep(300); // Simulate request processing time.
                });
                // Increment atomically: up to 5 requests complete in parallel.
                Interlocked.Increment(ref totalProcessed);
            }
            finally
            {
                // Processing completed: release one slot in the semaphore.
                _lockGuard.Release();
                Console.WriteLine("Releasing " + i);
            }
        }
    }
}
Note
As Kit said in the comments, you don't need a semaphore to achieve what I understand you're trying to achieve. I assume there are no shared resources in the PushItemWithResultAsyncLogic
code, so what you effectively need is a counter and/or some request-queue management if you want to allow some requests to wait.
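A minimal sketch of that counter idea (the names here are illustrative, not from the original code): reject a request immediately once 5 are in flight, without any semaphore.

```csharp
using System.Threading;
using System.Threading.Tasks;

class CounterThrottleSketch
{
    private const int MaxParallel = 5;
    private static int _active;

    // Returns false when the request is rejected because 5 are already running.
    public static async Task<bool> TryProcessAsync()
    {
        if (Interlocked.Increment(ref _active) > MaxParallel)
        {
            Interlocked.Decrement(ref _active);
            return false; // Rejected: no queuing, no waiting.
        }
        try
        {
            await Task.Delay(100); // Simulate the actual processing work.
            return true;
        }
        finally
        {
            Interlocked.Decrement(ref _active);
        }
    }
}
```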
EDIT
To process as many requests as possible and have them wait up to a timeout, there is no need to check CurrentCount. Simply call Wait(x milliseconds): either the request is processed with the lock acquired, or Wait returns false and the request is rejected.
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Semaphore
{
    public class Program
    {
        private static readonly SemaphoreSlim _lockGuard = new SemaphoreSlim(5, 5);
        private static readonly Random rnd = new Random((int)DateTimeOffset.Now.Ticks);
        private static int totalProcessed; // Stats

        public static void Main()
        {
            var requests = new Task[20];
            for (var i = 0; i < 20; i++)
            {
                var i1 = i;
                requests[i] = Task.Run(() => ProcessRequest(i1));
                Thread.Sleep(100);
            }
            Task.WaitAll(requests);
            Console.WriteLine("----------------------------");
            Console.WriteLine("Processed requests: " + totalProcessed);
            Console.WriteLine("Rejected requests: " + (20 - totalProcessed));
            Console.WriteLine("----------------------------");
            Console.ReadLine();
        }

        public static async Task ProcessRequest(int i)
        {
            Console.WriteLine(i + " Wait");
            // If no slot is free by the end of the wait, reject.
            if (!await _lockGuard.WaitAsync(500))
            {
                Console.WriteLine(i + " Reject");
                return;
            }
            try
            {
                // Once a previous request completed (Release()), process the next waiting one.
                await Task.Run(() =>
                {
                    Console.WriteLine(i + " Process");
                    Thread.Sleep(rnd.Next(600, 800)); // Simulate random request processing time.
                });
                // Increment atomically: multiple requests complete in parallel.
                Interlocked.Increment(ref totalProcessed);
            }
            finally
            {
                // Processing completed: release one slot in the semaphore.
                _lockGuard.Release();
            }
        }
    }
}
Upvotes: 2