Reputation: 919
I am creating a .NET Core worker service that is intended to run 10 different long-running functions, which parse various CSV files and spreadsheets.
Each function needs to run at its own scheduled times every day. If a function is still running when it is due to run again, the new run must 'await' the current one to finish before it starts.
I am new to async/await. Could anyone suggest an approach that would allow all 10 functions to run in parallel, but never two runs of the same function at once?
Thanks in advance!
EDIT:
These parsers take anywhere from 5 minutes to 5 hours to run.
Each function has its own requirements for exactly when it runs, daily or even hourly.
If a function is still running when the same function comes up to run again, the new run should be skipped until the next scheduled time, and skipped again then if needed.
Upvotes: 3
Views: 879
Reputation: 43515
Here is a CronosTimer class, similar in shape to the System.Timers.Timer class, that fires the Elapsed event on dates and times specified with a Cron expression. The event is fired in a non-overlapping manner. The CronosTimer has a dependency on the Cronos library by Sergey Odinokov. This library is a TimeSpan calculator, not a scheduler. Caveat: in its current version (0.7.1), the Cronos library is capped to the year 2099.
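using System;
using System.Diagnostics; // Debug.Assert
using System.Threading;
using System.Threading.Tasks;
using Cronos;
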
/// <summary>
/// Generates non-overlapping events according to a Cron expression.
/// </summary>
public class CronosTimer : IAsyncDisposable
{
    private readonly System.Threading.Timer _timer; // Used also as the locker.
    private readonly CronExpression _cronExpression;
    private readonly CancellationTokenSource _cts;
    private Func<CancellationToken, Task> _handler;
    private Task _activeTask;
    private bool _disposed;
    private static readonly TimeSpan _minDelay = TimeSpan.FromMilliseconds(500);

    public CronosTimer(string expression, CronFormat format = CronFormat.Standard)
    {
        _cronExpression = CronExpression.Parse(expression, format);
        _cts = new();
        _timer = new(async _ =>
        {
            Task task;
            lock (_timer)
            {
                if (_disposed) return;
                if (_activeTask is not null) return;
                if (_handler is null) return;
                Func<CancellationToken, Task> handler = _handler;
                CancellationToken token = _cts.Token;
                _activeTask = task = Task.Run(() => handler(token));
            }
            try { await task.ConfigureAwait(false); }
            catch (OperationCanceledException) when (_cts.IsCancellationRequested) { }
            finally
            {
                lock (_timer)
                {
                    Debug.Assert(ReferenceEquals(_activeTask, task));
                    _activeTask = null;
                    if (!_disposed && _handler is not null) ScheduleTimer();
                }
            }
        });
    }

    private void ScheduleTimer()
    {
        Debug.Assert(Monitor.IsEntered(_timer));
        Debug.Assert(!_disposed);
        Debug.Assert(_handler is not null);
        DateTime utcNow = DateTime.UtcNow;
        DateTime? utcNext = _cronExpression.GetNextOccurrence(utcNow + _minDelay);
        if (utcNext is null)
            throw new InvalidOperationException("Unreachable date.");
        TimeSpan delay = utcNext.Value - utcNow;
        Debug.Assert(delay > _minDelay);
        _timer.Change(delay, Timeout.InfiniteTimeSpan);
    }

    /// <summary>
    /// Occurs when the next occurrence of the Cron expression has been reached,
    /// provided that the previous asynchronous operation has completed.
    /// The CancellationToken argument is canceled when the timer is disposed.
    /// </summary>
    public event Func<CancellationToken, Task> Elapsed
    {
        add
        {
            if (value is null) return;
            lock (_timer)
            {
                if (_disposed) return;
                if (_handler is not null) throw new InvalidOperationException(
                    "More than one handlers are not supported.");
                _handler = value;
                if (_activeTask is null) ScheduleTimer();
            }
        }
        remove
        {
            if (value is null) return;
            lock (_timer)
            {
                if (_disposed) return;
                if (!ReferenceEquals(_handler, value)) return;
                _handler = null;
                _timer.Change(Timeout.Infinite, Timeout.Infinite);
            }
        }
    }

    /// <summary>
    /// Returns a ValueTask that completes when all work associated with the timer
    /// has ceased.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        Task task;
        lock (_timer)
        {
            if (_disposed) return;
            _disposed = true;
            _handler = null;
            task = _activeTask;
        }
        await _timer.DisposeAsync().ConfigureAwait(false);
        _cts.Cancel();
        if (task is not null)
            try { await task.ConfigureAwait(false); } catch { }
        _cts.Dispose();
    }
}
Usage example:
CronosTimer timer = new("30 6,14,22 * * MON-FRI");
timer.Elapsed += async _ =>
{
    try
    {
        await LongRunningAsync();
    }
    catch (Exception ex)
    {
        // ILogger.LogError requires a message in addition to the exception.
        _logger.LogError(ex, "LongRunningAsync failed.");
    }
};
In this example the LongRunningAsync function will run at 6:30, 14:30 and 22:30 on every working day of the week.
Detailed documentation about the format of the Cron expressions can be found in the documentation of the Cronos library.
For simplicity, the Elapsed event supports only one handler at a time. Subscribing twice with += without unsubscribing with -= results in an exception.
Upvotes: 2
Reputation: 483
How about using a WaitHandle?
// Set initial state to signaled so the first worker can enter
AutoResetEvent done = new AutoResetEvent(true);
public async Task DoWork()
{
    // Wait at most 1 ms to acquire the handle; skip this run if it is taken
    if (!done.WaitOne(1)) return;
    try
    {
        // do some stuff here
    }
    finally
    {
        // Release the handle to other threads, even if the work throws
        done.Set();
    }
}
This guarantees that only one thread will be doing the work at a time. For more information, see the documentation for AutoResetEvent.
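
Since the work in the question is async, the same skip-if-busy idea can also be sketched with SemaphoreSlim, which can be acquired without blocking a thread. This is only a minimal sketch under assumptions: the SkipIfBusyGate class and its TryRunAsync method are hypothetical names invented for illustration, not part of any library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs a job only if the previous run of the same job
// has finished; otherwise the new run is skipped.
public sealed class SkipIfBusyGate
{
    // One permit: at most one run can hold the gate at a time.
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);

    // Returns true if the job ran, false if it was skipped because a run
    // was already in progress.
    public async Task<bool> TryRunAsync(Func<Task> job)
    {
        // Zero timeout: take the permit if it is free, never wait for it.
        if (!await _gate.WaitAsync(0).ConfigureAwait(false)) return false;
        try
        {
            await job().ConfigureAwait(false);
            return true;
        }
        finally
        {
            // Always give the permit back, even if the job throws.
            _gate.Release();
        }
    }
}
```

With one gate per function, each scheduled trigger would call TryRunAsync on that function's gate, so different functions can run in parallel while a second run of the same function is skipped.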
Upvotes: 1
Reputation: 919
Curious about thoughts on this approach.
Worker class:
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    //If(NOT SCHEDULED TIME RETURN) //QUARTZ JOB SCHEDULING HERE???
    // Each method below loops until cancellation, so start each one once and
    // await them together. Calling them un-awaited inside a loop would start
    // a new polling loop on every iteration.
    await Task.WhenAll(
        _tasksManager.LongProccess1Async(stoppingToken),
        _tasksManager.LongProccess2Async(stoppingToken));
}
Task Manager Class:
private Task<long>? _taskLongProccess1;
//private long _taskParseDelimitedFiles;
public async Task LongProccess1Async(CancellationToken stoppingToken)
{
    var pc = _parserConfigs.Find(p => p.Name == "longProccess1");
    while (!stoppingToken.IsCancellationRequested)
    {
        if (_taskLongProccess1 == null)
            _taskLongProccess1 = Task.Run(() => _delimitedParser.LongProccess1(pc.FilePath, pc.Delimiter, pc.ConnectionString, pc.Schema, pc.BulkSize));
        if (!_taskLongProccess1.IsCompleted)
        {
            // Still queued or running: check again after the configured delay
            await Task.Delay(pc.Delay, stoppingToken);
        }
        else
        {
            //ONCE DONE LOG AND NULL TASK
            if (_taskLongProccess1.Status == TaskStatus.RanToCompletion)
                LoggingFunctions.addToLog(_logger, $"Total execution time for task:LongProccess1 = {_taskLongProccess1.Result}", LoggingHelpers.InformationCode);
            // Also reset after a fault or cancellation so the loop does not spin
            _taskLongProccess1 = null;
        }
    }
}

private Task<long>? _taskLongProccess2;
//private long _taskParseDelimitedFiles;
public async Task LongProccess2Async(CancellationToken stoppingToken)
{
    var pc = _parserConfigs.Find(p => p.Name == "longProccess2");
    while (!stoppingToken.IsCancellationRequested)
    {
        if (_taskLongProccess2 == null)
            _taskLongProccess2 = Task.Run(() => _delimitedParser.LongProccess2(pc.FilePath, pc.Delimiter, pc.ConnectionString, pc.Schema, pc.BulkSize));
        if (!_taskLongProccess2.IsCompleted)
        {
            // Still queued or running: check again after the configured delay
            await Task.Delay(pc.Delay, stoppingToken);
        }
        else
        {
            //ONCE DONE LOG AND NULL TASK
            if (_taskLongProccess2.Status == TaskStatus.RanToCompletion)
                LoggingFunctions.addToLog(_logger, $"Total execution time for task:LongProccess2 = {_taskLongProccess2.Result}", LoggingHelpers.InformationCode);
            // Also reset after a fault or cancellation so the loop does not spin
            _taskLongProccess2 = null;
        }
    }
}
Upvotes: -1
Reputation: 21062
Although I use async/await a lot, that does not mean I use it everywhere. In this case I would use timers (single trigger, non-repeating), and at the end of each function (or of a wrapper around it) I would set the timer again. This guarantees that executions of the same function will not overlap.
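
The idea above could be sketched roughly as follows, assuming System.Threading.Timer as the timer. The RearmingTimer class, its job delegate and its nextDelay delegate are hypothetical names introduced only for this illustration; how the next delay is computed (fixed interval, Cron expression, etc.) is left to the caller.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper: a one-shot timer that is armed again only after the
// job has finished, so two runs of the same job can never overlap.
public sealed class RearmingTimer : IDisposable
{
    private readonly Timer _timer;
    private readonly Func<Task> _job;
    private readonly Func<DateTime, TimeSpan> _nextDelay;

    public RearmingTimer(Func<Task> job, Func<DateTime, TimeSpan> nextDelay)
    {
        _job = job;
        _nextDelay = nextDelay;
        // Create the timer disabled, then arm it once the field is assigned.
        _timer = new Timer(OnTick, null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
        _timer.Change(_nextDelay(DateTime.UtcNow), Timeout.InfiniteTimeSpan);
    }

    private async void OnTick(object? state)
    {
        try { await _job().ConfigureAwait(false); }
        catch (Exception ex) { Console.Error.WriteLine(ex); } // Keep the schedule alive after a failure.
        finally
        {
            // Infinite period: the timer fires once, then waits to be re-armed.
            try { _timer.Change(_nextDelay(DateTime.UtcNow), Timeout.InfiniteTimeSpan); }
            catch (ObjectDisposedException) { } // Disposed while the job was running.
        }
    }

    public void Dispose() => _timer.Dispose();
}
```

One such timer per function keeps the functions independent of each other. Note that with this design a trigger that falls due while the job is still running is simply never raised, which matches the "skip until the next scheduled time" requirement in the question.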
Upvotes: 0