Reputation: 51
In my application I use a separate thread for every active transfer. I would like to limit the number of active transfers and put any further incoming ones in a queue.
For now, my solution is to check whether there is room for another active transfer; if there is not, I put the request in a queue list. Every time I get a completed event, I pick the next request from the queue. (I suspect this solution is rather dirty.)
Is there a nice way to schedule the active threads?
Is using a semaphore a nice (and neat) choice?
Upvotes: 1
Views: 102
Reputation: 109567
You could use a BlockingCollection to manage the queue, for example:
    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    namespace Demo
    {
        public class Program
        {
            // Thread-safe queue shared by the producer and all consumers.
            private readonly BlockingCollection<int> _queue = new BlockingCollection<int>();

            private void run()
            {
                // The number of consumers caps how many items are processed at once.
                const int CONSUMER_COUNT = 8;

                Task[] tasks = new Task[CONSUMER_COUNT];

                for (int i = 0; i < CONSUMER_COUNT; ++i)
                {
                    int id = i; // Copy the loop variable so each lambda captures its own value.
                    tasks[i] = Task.Run(() => process(id));
                }

                Console.WriteLine("Press <return> to start adding to the queue.");
                Console.ReadLine();

                for (int i = 0; i < 100; ++i)
                {
                    Console.WriteLine("Adding item #{0}", i);
                    _queue.Add(i);
                }

                Console.WriteLine("Press <return> to close the queue.");
                Console.ReadLine();

                // Signal that no more items will be added, so consumers can exit once the queue drains.
                _queue.CompleteAdding();

                Console.WriteLine("Waiting for all tasks to exit.");
                Task.WaitAll(tasks);

                Console.WriteLine("Finished waiting for all tasks. Press <return> to exit.");
                Console.ReadLine();
            }

            private void process(int id)
            {
                Console.WriteLine("Process {0} is starting.", id);

                // Blocks while the queue is empty; ends after CompleteAdding() once the queue is drained.
                foreach (var item in _queue.GetConsumingEnumerable())
                {
                    Console.WriteLine("Process {0} is processing item# {1}", id, item);
                    Thread.Sleep(200); // Simulate long processing time.
                }

                Console.WriteLine("Process {0} is stopping.", id);
            }

            private static void Main()
            {
                new Program().run();
            }
        }
    }
Note that the crucial thing here is that GetConsumingEnumerable() returns an enumerable which blocks while there are no items in the queue, but which exits once there are no items left and the producer has called CompleteAdding(). This makes it quite easy to control when the worker threads exit. Because only CONSUMER_COUNT tasks consume from the queue, at most that many items are processed at the same time.
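To see the blocking and exit behaviour in isolation, here is a minimal sketch with a single consumer. The names ConsumingDemo and Drain are made up for this illustration; only BlockingCollection, GetConsumingEnumerable() and CompleteAdding() come from the example above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ConsumingDemo
{
    // Hypothetical helper: collects every item taken from the queue, in the order consumed.
    // The loop blocks while the queue is empty and ends only after CompleteAdding()
    // has been called and the queue has been drained.
    public static List<int> Drain(BlockingCollection<int> queue)
    {
        var seen = new List<int>();
        foreach (var item in queue.GetConsumingEnumerable())
        {
            seen.Add(item);
        }
        return seen;
    }

    public static void Main()
    {
        var queue = new BlockingCollection<int>();

        // Start the consumer first; it simply waits until items arrive.
        Task<List<int>> consumer = Task.Run(() => Drain(queue));

        queue.Add(1);
        queue.Add(2);
        queue.Add(3);

        // Without this call the consumer would wait forever for more items.
        queue.CompleteAdding();

        // With one producer and one consumer the default FIFO order is preserved.
        Console.WriteLine(string.Join(", ", consumer.Result)); // prints "1, 2, 3"
    }
}
```

For the transfer scenario, each queued item would be a transfer request instead of an int, and the number of consumer tasks would be the maximum number of simultaneous transfers.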
Upvotes: 3