Reputation: 3482
I'm creating a dispatcher class, which is itself a long-running task that the user can cancel at any time. This task polls the database to see whether there is any work to be done, and runs up to X (currently 5) child tasks at once.
As far as I can tell it's working great, but I have a few questions and concerns about the code. More or less, since I couldn't find another example of this: am I doing it right? Are there things that I could improve?
Q: Is this the best way to do this? In StartDownloadProcess (which starts the child task) I create the CancellationTokenSource, add it to the dictionary, and then start the task. I've added a continuation that removes the item from the dictionary when processing completes, so that the Cancel method doesn't try to cancel it.
In the child task I pass the cancellation token to the method that actually does the work. That method then checks the token periodically to see whether it needs to abort. Is this correct?
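A minimal sketch of the periodic token check described above. The `Downloader` body here is a hypothetical stand-in (the real class is not shown in the question), and the chunk count and sleep times are made up for illustration. It uses `ThrowIfCancellationRequested` instead of returning early, which is one common option rather than the only correct one.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the real Downloader; only the token handling matters here.
class Downloader
{
    private readonly int requestKey;
    private readonly CancellationToken token;

    public Downloader(int requestKey, CancellationToken token)
    {
        this.requestKey = requestKey;
        this.token = token;
    }

    public void Run()
    {
        for (int chunk = 0; chunk < 20; chunk++)
        {
            // Throws OperationCanceledException once cancellation has been requested.
            token.ThrowIfCancellationRequested();

            Thread.Sleep(50); // simulate downloading one chunk
            Console.WriteLine("Request {0}: chunk {1} done", requestKey, chunk);
        }
    }
}

class Program
{
    static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            var downloader = new Downloader(42, cts.Token);

            // Passing the same token to StartNew lets the task end as Canceled
            // (rather than Faulted) when Run throws for that token.
            Task worker = Task.Factory.StartNew(() => downloader.Run(), cts.Token);

            Thread.Sleep(120); // let a couple of chunks complete
            cts.Cancel();

            try
            {
                worker.Wait();
            }
            catch (AggregateException ex)
            {
                // Swallow only cancellation; any other exception is rethrown.
                ex.Handle(e => e is OperationCanceledException);
            }

            Console.WriteLine("Worker status: {0}", worker.Status);
        }
    }
}
```

Throwing, rather than returning silently, means a continuation or a caller that waits on the task can tell a cancelled run apart from one that finished normally by looking at the task's status.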
In the Cancel method I create a copy of the keys in the dictionary, iterate over it, try to remove each item from the dictionary, and issue a Cancel request on each source that was removed.
Q: Is this the best way to do this? Do I need to wait to see whether the task was actually cancelled? Can I?
Q: Should I be disposing of the CTS?
Note: In StartDownloadProcess I'm using a while (true) loop to simulate work; it iterates until j > requestKey or until the task is cancelled. In the real code there would be no while loop. It would simply start the new task and run the actual download process.
--
    /// <summary>
    /// Primary dispatcher token source
    /// </summary>
    CancellationTokenSource primaryTokenSource;

    /// <summary>
    /// A collection of worker token sources, used to cancel worker tasks and to keep track of
    /// how many there are.
    /// </summary>
    ConcurrentDictionary<int, CancellationTokenSource> workerTokens = new ConcurrentDictionary<int, CancellationTokenSource>();

    /// <summary>
    /// Runs this instance.
    /// </summary>
    public void Run() {
        // Only one dispatcher can be running
        if (IsRunning)
            return;

        // Create a new token source
        primaryTokenSource = new CancellationTokenSource();
        // Create the cancellation token to pass into the Task
        CancellationToken token = primaryTokenSource.Token;
        // Set flag on
        IsRunning = true;

        // Fire off the dispatcher
        Task.Factory.StartNew(
            () => {
                // Loop forever
                while (true) {
                    // Only start a new task if fewer than 5 are already running
                    if (workerTokens.Count < 5) {
                        // Check to see if we've been cancelled
                        if (token.IsCancellationRequested)
                            return;

                        // Check to see if there are pending requests
                        int? requestKey = null;
                        // Query database (removed)
                        requestKey = new Random().Next(1550);

                        // If we got a request, start processing it
                        if (requestKey != null) {
                            // Check to see if we've been cancelled before running the child task
                            if (token.IsCancellationRequested)
                                return;
                            // Start the child downloader task
                            StartDownloadProcess(requestKey.Value);
                        }
                    } else {
                        // Do nothing, we've reached our max tasks
                        Console.WriteLine("MAX TASKS RUNNING, NOT STARTING NEW");
                    }

                    // Sleep for the allotted time
                    Thread.Sleep(Properties.Settings.Default.PollingInterval);
                }
            }, token)
            // Turn running flag off
            .ContinueWith((t) => IsRunning = false)
            // Notify that we've finished
            .ContinueWith(OnDispatcherStopped);
    }

    /// <summary>
    /// Starts the download process.
    /// </summary>
    /// <param name="requestKey">The request key.</param>
    private void StartDownloadProcess(int requestKey) {
        CancellationTokenSource workerTokenSource = new CancellationTokenSource();
        CancellationToken token = workerTokenSource.Token;

        // Add the token source to the queue
        workerTokens.GetOrAdd(requestKey, workerTokenSource);

        // Start the child downloader task
        Task.Factory.StartNew(
            () => {
                int j = 0;
                while (true) {
                    if (token.IsCancellationRequested) {
                        Console.WriteLine("Sub-Task Cancelled {0}", requestKey);
                        return;
                    }

                    // Create a new downloader, pass it the RequestKey and token
                    //var downloader = new Downloader(requestKey, token);
                    //downloader.Run();

                    // Simulate work
                    Thread.Sleep(250);
                    Console.WriteLine("SUB-Task {0} is RUNNING! - #{1}", requestKey, j);

                    // Simulate - automatically end task when j > requestkey
                    if (j++ > requestKey) {
                        Console.WriteLine("SUB TASK {0} IS ENDING!", requestKey);
                        return;
                    }
                }
            },
            token
        ).ContinueWith((t) => {
            // If we ended naturally, the cancellationtoken will need to be removed from the dictionary
            CancellationTokenSource source = null;
            workerTokens.TryRemove(requestKey, out source);
        });
    }

    /// <summary>
    /// Cancels this instance.
    /// </summary>
    public void Cancel() {
        // Cancel the primary task first so no new child tasks are created
        if (primaryTokenSource != null)
            primaryTokenSource.Cancel();

        // Iterate over running cancellation sources and terminate them
        foreach (var item in workerTokens.Keys.ToList()) {
            CancellationTokenSource source = null;
            if (workerTokens.TryRemove(item, out source)) {
                source.Cancel();
            }
        }
    }

Additionally (not shown in the example above), several events can also be raised from within the tasks. Those events all look like the following:
    public event EventHandler DispatcherStarted;

    private void OnDispatcherStarted() {
        EventHandler handler = DispatcherStarted;
        if (handler != null)
            Task.Factory.StartNew(() => handler(this, EventArgs.Empty), CancellationToken.None, TaskCreationOptions.None, taskScheduler).Wait();
    }

In the Run() method, at various points it would call OnDispatcher*() to raise the events so that the caller could subscribe and be notified. The tasks that those events create would run on the primary thread.
Upvotes: 1
Views: 2205
Reputation: 532
I quickly looked through the code and have a few comments:
Upvotes: 1