Reputation: 95
My use case is a long-running server-side gRPC streaming method. I want the server method to stop executing if the client does not send a request within a given period, say 5 minutes, and I want to reset this timeout every time I receive a request.
public override async Task StreamingMethodName(IAsyncStreamReader<RequestProtoDTO> requestStream, IServerStreamWriter<ResponseProtoDTO> responseStream, ServerCallContext context)
{
    var cts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken, WATCHDOG_CTS_GOES_HERE);
    while (await requestStream.MoveNext(cts.Token))
    {
        // my code for processing goes here
        WATCHDOG_KICKING_GOES_HERE
    }
}
Does anything like this exist, or do I have to build my own CancellationTokenSource? I cannot be the first or only one who needs this, but unfortunately the only search term I have is the buzzword "watchdog".
The point is that on mobile clients, the HTTP/2 connection sometimes gets disconnected without the server noticing. I want those resources to be freed without needing a reverse proxy to do this job for me.
Upvotes: 0
Views: 66
Reputation: 22073
You can create a new CancellationTokenSource with a timeout and link its token with the call's token:
// create a cancellationtoken with a timeout
var watchdogCts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
// combine them
var cts = CancellationTokenSource.CreateLinkedTokenSource(
context.CancellationToken, watchdogCts.Token);
// pass the combined token.
while(await requestStream.MoveNext(cts.Token))
You probably need a try/catch around your while loop to catch the TaskCanceledException (or its base class, OperationCanceledException).
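For illustration, the try/catch could look roughly like the sketch below. To keep it runnable without a gRPC server, `moveNext` is a hypothetical stand-in for `requestStream.MoveNext`, and `callToken` stands in for `context.CancellationToken`; the class name, method name and return strings are made up for the example. Note that this version still fires once after a fixed time and is not reset by incoming requests.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutLoopSketch
{
    // moveNext is a hypothetical stand-in for requestStream.MoveNext,
    // callToken for context.CancellationToken.
    public static async Task<string> ReadUntilTimeoutAsync(
        Func<CancellationToken, Task<bool>> moveNext,
        TimeSpan timeout,
        CancellationToken callToken)
    {
        // Token source that cancels itself once the timeout has elapsed.
        using var watchdogCts = new CancellationTokenSource(timeout);
        // Combined token: cancelled by the call itself or by the timeout.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(
            callToken, watchdogCts.Token);
        try
        {
            while (await moveNext(cts.Token))
            {
                // processing of the current request goes here
            }
            return "completed";
        }
        // TaskCanceledException derives from OperationCanceledException.
        // The filter only swallows cancellations caused by the timeout;
        // a cancellation of the call itself still propagates.
        catch (OperationCanceledException) when (watchdogCts.IsCancellationRequested)
        {
            return "timed out";
        }
    }
}
```

In a real service method you would pass `ct => requestStream.MoveNext(ct)` and `context.CancellationToken`.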
UPDATE
I think I understand what you're asking. I've made an example that triggers a cancellation token after a certain amount of time, with a Reset option:
/// <summary>
/// A watchdog timer that cancels a linked CancellationToken if not reset within a given timeout.
/// Implements IDisposable to allow for proper cleanup.
/// </summary>
public sealed class WatchdogCounter : IDisposable
{
    private readonly CancellationTokenSource _watchdogCts; // The CancellationTokenSource that will be canceled when the timeout expires
    private readonly object _lock = new(); // Lock object for thread safety
    private DateTime _lastReset = DateTime.UtcNow; // Stores the last reset timestamp; initialized to now
    private readonly ManualResetEvent _terminating = new(false); // Used to signal termination of the watchdog task

    /// <summary>
    /// Exposes the CancellationToken that will be canceled when the watchdog expires.
    /// </summary>
    public CancellationToken Token => _watchdogCts.Token;

    /// <summary>
    /// Initializes a new instance of the WatchdogCounter class.
    /// </summary>
    /// <param name="timeout">The duration before the watchdog triggers cancellation.</param>
    /// <param name="parentToken">A parent CancellationToken that can also trigger cancellation.</param>
    public WatchdogCounter(TimeSpan timeout, CancellationToken parentToken)
    {
        _watchdogCts = CancellationTokenSource.CreateLinkedTokenSource(parentToken); // Links to an external cancellation token
        _lastReset = DateTime.UtcNow; // Initialize last reset time to now
        StartWatchdog(timeout); // Start the watchdog timer
    }

    private void StartWatchdog(TimeSpan timeout)
    {
        _ = Task.Run(async () =>
        {
            try
            {
                DateTime lastReset;
                while (!_terminating.WaitOne(0)) // Check if termination has been requested
                {
                    // Wait for a short period before checking again
                    await Task.Delay(500, _watchdogCts.Token);

                    // Get the latest lastReset value in a thread-safe way
                    lock (_lock)
                        lastReset = _lastReset;

                    // If the timeout has been exceeded, cancel the token
                    if (lastReset.Add(timeout) < DateTime.UtcNow)
                    {
                        _watchdogCts.Cancel();
                        break;
                    }
                }
            }
            catch (TaskCanceledException) { }
        });
    }

    /// <summary>
    /// Resets the watchdog timer, postponing cancellation.
    /// </summary>
    public void Reset()
    {
        lock (_lock)
            _lastReset = DateTime.UtcNow; // Update the last reset time to the current time
    }

    /// <summary>
    /// Disposes of the watchdog by signaling termination.
    /// </summary>
    public void Dispose()
    {
        _terminating.Set(); // Signal the watchdog task to stop
    }
}
Usage:
public override async Task StreamingMethodName(IAsyncStreamReader<RequestProtoDTO> requestStream, IServerStreamWriter<ResponseProtoDTO> responseStream, ServerCallContext context)
{
    using (var watchDog = new WatchdogCounter(TimeSpan.FromSeconds(5), context.CancellationToken))
    {
        // watchDog.Token is already linked to context.CancellationToken
        while (await requestStream.MoveNext(watchDog.Token))
        {
            // my code for processing goes here
            watchDog.Reset();
        }
    }
}
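The timing is not exact, because the Task.Delay is not interrupted when the timeout expires: the watchdog only checks every 500 ms, so cancellation can fire up to roughly 500 ms late. Choose the polling interval according to how precise you need the timeout to be.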
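If the polling interval is a concern, a possible alternative is to skip the background task and let the token source do the timing: calling `CancellationTokenSource.CancelAfter` again restarts the countdown as long as the source has not been cancelled yet. The following is only a sketch under the same assumptions as the question; `moveNext` is a hypothetical stand-in for `requestStream.MoveNext`, `callToken` for `context.CancellationToken`, and the class and method names are invented for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancelAfterWatchdogSketch
{
    // Returns how many requests were handled before the stream ended
    // or the watchdog expired. moveNext is a hypothetical stand-in for
    // requestStream.MoveNext, callToken for context.CancellationToken.
    public static async Task<int> ProcessAsync(
        Func<CancellationToken, Task<bool>> moveNext,
        TimeSpan timeout,
        CancellationToken callToken)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(callToken);
        cts.CancelAfter(timeout); // arm the watchdog
        var handled = 0;
        try
        {
            while (await moveNext(cts.Token))
            {
                // processing of the current request goes here
                handled++;
                cts.CancelAfter(timeout); // kick the watchdog: restart the countdown
            }
        }
        catch (OperationCanceledException) when (!callToken.IsCancellationRequested)
        {
            // The watchdog expired; a cancellation of the call itself is not caught here.
        }
        return handled;
    }
}
```

Because the kick happens after processing, the time spent handling a request counts against the timeout, just as in the usage example with `watchDog.Reset()`.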
I hope I'm on the right track now.
Upvotes: 1