Reputation: 252
In our project, we have a few services that make requests to a 3rd party API, using a key.
This API has a shared rate limit across all endpoints (meaning a request to one endpoint requires a 2-second cooldown before we can call any other endpoint).
We've handled this using timed background jobs, making requests to only one of the endpoints at any time.
After some architectural redesign, we've come to a spot where we no longer rely as much on the timed background jobs, and now the HTTP requests can no longer be moderated, since multiple service instances are making requests to the API concurrently.
So, in our current example:
We have a few HttpClients set up for all the API endpoints we need, e.g.:
services.AddHttpClient<Endpoint1Service>(client =>
{
    client.BaseAddress = new Uri(configOptions.Services.Endpoint1.Url);
});
services.AddHttpClient<Endpoint2Service>(client =>
{
    client.BaseAddress = new Uri(configOptions.Services.Endpoint2.Url);
});
Endpoint1Service and Endpoint2Service were previously accessed only by background job services:
public async Task DoJob()
{
    var items = await _repository.GetItems();
    foreach (var item in items)
    {
        var processedResult = await _endpoint1Service.DoRequest(item);
        await Task.Delay(2000);
        //...
    }
    // save all results
}
But now these "endpoint" services are accessed concurrently, and a new instance is created every time, so there is no way to moderate the request rate.
One possible solution would be some sort of singleton request buffer that is injected into every service using this API and moderates the outgoing requests to a given rate. The problem I see with this is that it seems dangerous to hold requests in an in-memory buffer, in case something goes wrong.
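A minimal sketch of what such a singleton gate might look like (the `ApiRateGate` name, the fixed 2-second spacing, and the registration/usage lines are assumptions for illustration, not code we have):

```csharp
// Hypothetical sketch: a singleton gate that spaces all outgoing API calls
// by a fixed interval, shared by every service that uses the API key.
public sealed class ApiRateGate
{
    private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1);
    private readonly TimeSpan _interval = TimeSpan.FromSeconds(2); // shared cooldown
    private DateTime _lastRequestUtc = DateTime.MinValue;

    public async Task<T> RunAsync<T>(Func<Task<T>> request)
    {
        await _lock.WaitAsync(); // one caller at a time; others queue here
        try
        {
            var wait = _lastRequestUtc + _interval - DateTime.UtcNow;
            if (wait > TimeSpan.Zero)
                await Task.Delay(wait); // honor the cooldown before the next call
            return await request();
        }
        finally
        {
            _lastRequestUtc = DateTime.UtcNow;
            _lock.Release();
        }
    }
}

// Registered once so every endpoint service shares the same gate:
// services.AddSingleton<ApiRateGate>();
// Usage inside an endpoint service (assumed field _gate):
// var response = await _gate.RunAsync(() => _httpClient.GetAsync(requestUri));
```

Note that the queued callers here only wait on the semaphore rather than being stored in a buffer, so a crash loses at most the in-flight request.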
Is this a direction I should be looking towards, or is there anything else I can try?
Upvotes: 3
Views: 726
Reputation: 505
Hope this helps: I created the following for similar scenarios. Its objective is concurrency-throttled multithreading. It also gives you a convenient wrapper over your request-processing pipeline, and it can additionally enforce a maximum number of concurrent requests per client (if you want to use that).
Create one instance per endpoint service and set its thread count to 1 if you want a throttle of 1, or to 4 if you want 4 concurrent requests to the given endpoint.
The two implementations are interchangeable. If used in a web server context, the former is probably better, as it offloads to the background thread pool instead of using foreground threads.
Example usage: in your case, set _maxWorkerThreads to 1 if you want to rate-limit at 1 concurrent request, or to 4 if you want to rate-limit at 4 concurrent requests.
//Example usage in a WebAPI controller
class Example
{
    private static ThreadedWorkItemProcessor<DummyRequest, DummyResponse, int, WorkItemPriority> ThreadedProcessorExample = new ThreadedWorkItemProcessor<DummyRequest, DummyResponse, int, WorkItemPriority>(
        _maxWorkItemLimitPerClient: 100 // Maximum number of concurrent requests in the processing queue per client. Set to int.MaxValue to disable concurrent request caps
        , _maxWorkerThreads: 16 // Maximum number of threads to scale up to
        , _threadStartupPerWorkItems: 4 // Consider starting a new processing thread every X requests
        , _threadStartupMinQueueSize: 4 // Do NOT start a new processing thread if the work item queue is below this size
        , _idleWorkerThreadExitSeconds: 10 // Idle threads will exit after X seconds
        , _abandonedResponseExpirySeconds: 60 // Expire processed work items after X seconds (maybe the client terminated or the web request thread died)
        , _processRequestMethod: ProcessRequestMethod // Your "do work" method for processing the request
        , _logErrorMethod: Handler_LogError
        , _logMessageMethod: Handler_LogMessage
        );

    public async Task<DummyResponse> GetResponse([FromBody] DummyRequest _request)
    {
        int clientID = 1; // Replace with the client ID from your authentication mechanism if using per-client request caps; otherwise just hardcode to 0 or any constant
        WorkItemPriority _priority = WorkItemPriority.Medium; // Assign the priority based on whatever prioritization rules apply
        int RequestID = ThreadedProcessorExample.ScheduleWorkItem(_priority, _request, clientID);
        if (RequestID < 0)
        {
            // Client has exceeded the maximum number of concurrent requests, or the application pool is shutting down
            // Return a suitable error message here
            return new DummyResponse() { ErrorMessage = @"Maximum number of concurrent requests exceeded or service is restarting. Please retry request later." };
        }

        // If you need the result (like in a WebAPI controller), wait for it as below.
        // Otherwise, if this is a backend processing sink with no client waiting for a response, you are done here: just return.
        KeyValuePair<bool, ThreadWorkItem<DummyRequest, DummyResponse, int>> workItemResult;
        workItemResult = await ThreadedProcessorExample.TryGetProcessedWorkItemAsync(RequestID,
            _timeoutMS: 1000, // Timeout of 1 second
            _taskWaitType: ThreadProcessorAsyncTaskWaitType.Delay_Specific,
            _delayMS: 10);
        if (!workItemResult.Key)
        {
            // Processing timeout, or the application pool is shutting down
            // Return a suitable error message here
            return new DummyResponse() { ErrorMessage = @"Internal system timeout or service is restarting. Please retry request later." };
        }
        return workItemResult.Value.Response;
    }

    public static DummyResponse ProcessRequestMethod(DummyRequest request)
    {
        // Process the request and return the response
        return new DummyResponse() { orderID = request.orderID };
    }

    public static void Handler_LogError(Exception ex)
    {
        // Log unhandled exceptions here
    }

    public static void Handler_LogMessage(string Message)
    {
        // Log messages here
    }
}
Upvotes: 1