Reputation: 1383
I'm trying to create a way to handle spikes of events in the Event Hub. My current PoC solution is to fire and forget tasks as I'm consuming events, instead of awaiting them, and then throttle the number of parallel tasks with a semaphore to avoid resource starvation.
Utility that throttles things:
public class ThrottledParallelTaskFactory
{
    ...
    public Task StartNew(Func<Task> func)
    {
        _logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphoreSlim.CurrentCount, _limit);
        _semaphoreSlim.Wait(_timeout);
        _ = Task.Run(func)
            .ContinueWith(t =>
            {
                if (t.Status is TaskStatus.Faulted or TaskStatus.Canceled or TaskStatus.RanToCompletion)
                {
                    _semaphoreSlim.Release();
                    _logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphoreSlim.CurrentCount, _limit);
                }

                if (t.Status is TaskStatus.Canceled or TaskStatus.Faulted)
                {
                    _logger?.LogError(t.Exception, "Parallel task failed");
                }
            });
        return Task.CompletedTask;
    }
}
My EventProcessorClient.ProcessEventAsync delegate:
private Task ProcessEvent(ProcessEventArgs arg)
{
    var sw = Stopwatch.StartNew();
    try
    {
        _throttledParallelTaskFactory.StartNew(async () => await Task.Delay(1000));
    }
    catch (Exception e)
    {
        _logger.LogError(e, "Failed to process event");
    }
    _logger.LogDebug($"Took {sw.ElapsedMilliseconds} ms");
    return Task.CompletedTask;
}
After running this setup for a while, I noticed that my throttler's semaphore maxes out at 2-3 tasks running in parallel, even though my configured limit is 15. That suggests my handler takes 333-500 ms to finish, but a Stopwatch inside the handler says the whole handler takes 0 ms to execute. I later added timestamp logging of when the handler starts and ends to confirm this; the handler itself does take 0-1 ms, but there's a mystery 300-600 ms gap between invocations. NOTE: for these tests the client is processing a backlog of millions of events, not live data, which could otherwise cause similar delays between events.
Does EventProcessorClient by any chance checkpoint internally after every single event? 300-500 ms per event seems massive to me.
I have tried both the default cache event/prefetch counts and increased ones, without much difference.
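For reference, the cache/prefetch settings mentioned above are configured through EventProcessorClientOptions (from Azure.Messaging.EventHubs.Processor). A sketch of what that looks like; the connection values and the chosen numbers are placeholders, not from the original post:

```csharp
// Sketch: raising the cached-event and prefetch counts above their defaults.
// blobContainerClient, eventHubConnectionString and eventHubName are assumed
// to be defined elsewhere.
var options = new EventProcessorClientOptions
{
    CacheEventCount = 500, // default is 100
    PrefetchCount = 1000   // default is 300
};

var processor = new EventProcessorClient(
    blobContainerClient,
    EventHubConsumerClient.DefaultConsumerGroupName,
    eventHubConnectionString,
    eventHubName,
    options);
```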
Edit:
It ended up being a networking issue unrelated to the implementation.
Upvotes: 0
Views: 409
Reputation: 29720
You are not measuring the right thing, and you are using async/await and Task incorrectly.
private Task ProcessEvent(ProcessEventArgs arg)
{
    var sw = Stopwatch.StartNew();
    try
    {
        _throttledParallelTaskFactory.StartNew(async () => await Task.Delay(1000));
    }
    catch (Exception e)
    {
        _logger.LogError(e, "Failed to process event");
    }
    _logger.LogDebug($"Took {sw.ElapsedMilliseconds} ms");
    return Task.CompletedTask;
}
In the above code the call to _throttledParallelTaskFactory.StartNew is not awaited, so the stopwatch has nothing to measure. Furthermore, since the call is not awaited, any exception it throws won't be caught.
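To illustrate that last point, here is a minimal stand-alone sketch (not from the original post): an exception thrown inside an unawaited task never reaches the caller's catch block:

```csharp
using System;
using System.Threading.Tasks;

static class FireAndForgetDemo
{
    // Returns a task that is already faulted.
    static Task Fail() => Task.FromException(new InvalidOperationException("boom"));

    static void Caller()
    {
        try
        {
            _ = Fail(); // not awaited: the faulted task is discarded
        }
        catch (InvalidOperationException)
        {
            // Never reached; the exception stays on the discarded Task.
            Console.WriteLine("caught");
        }
        Console.WriteLine("caller finished"); // this is all that prints
    }
}
```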
You should move the exception handling and time measurement into the StartNew method, like this:
private Task ProcessEvent(ProcessEventArgs arg)
{
    _throttledParallelTaskFactory.StartNew(() => Task.Delay(1000));
    return Task.CompletedTask;
}
public class ThrottledParallelTaskFactory
{
    public async Task StartNew(Func<Task> func)
    {
        var sw = Stopwatch.StartNew();
        _logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphoreSlim.CurrentCount, _limit);
        await _semaphoreSlim.WaitAsync(_timeout);
        try
        {
            await func.Invoke();
        }
        catch (Exception e)
        {
            _logger?.LogError(e, "Parallel task failed");
        }
        finally
        {
            _semaphoreSlim.Release();
            _logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphoreSlim.CurrentCount, _limit);
            _logger.LogDebug($"Took {sw.ElapsedMilliseconds} ms");
        }
    }
}
See how we got rid of the call to ContinueWith? Also, since func already returns a Task, there is no need to wrap it in a call to Task.Run.
Does by any chance EventProcessorClient checkpoint internally after every single event?
No, it does not. You have to do checkpointing manually.
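For example, a common pattern (a sketch, not from the original post; the field name and the threshold of 100 are illustrative) is to checkpoint periodically from the handler via ProcessEventArgs.UpdateCheckpointAsync, rather than on every event, to amortize the cost of the storage round trip:

```csharp
// Sketch: manual checkpointing every 100 events. "_eventsSinceCheckpoint"
// is a field on the hosting class, introduced here for illustration.
private int _eventsSinceCheckpoint;

private async Task ProcessEvent(ProcessEventArgs args)
{
    if (!args.HasEvent)
    {
        return;
    }

    // ... handle args.Data here ...

    if (++_eventsSinceCheckpoint >= 100)
    {
        _eventsSinceCheckpoint = 0;
        await args.UpdateCheckpointAsync(args.CancellationToken);
    }
}
```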
Upvotes: 2