Reputation: 8180
My aim is to avoid using threadpool threads for CPU bound work, thus avoiding a situation where IIS stops responding to new requests.
Can you see any problems with the code below? Is this a safe/clean approach? Can you offer any improvements?
// Pending jobs, shared by every caller. QueueJob and ConsumeQueue also lock on this
// instance so that changes to the queue and to threadCount are seen together.
private static ConcurrentQueue<Job> Jobs = new ConcurrentQueue<Job>();
// Count of running consumer threads: QueueJob increments it when it starts a thread
// (only when it is 0), and ConsumeQueue decrements it just before returning.
static int threadCount = 0;
/// <summary>
/// Adds <paramref name="job"/> to the shared queue and, when no consumer thread is
/// currently registered in <c>threadCount</c>, starts one running <c>ConsumeQueue</c>.
/// </summary>
/// <param name="job">The job to enqueue.</param>
private void QueueJob(Job job)
{
    // The enqueue and the "is a consumer running?" decision happen under the same lock
    // that ConsumeQueue takes before it decides to exit.
    lock (Jobs)
    {
        Jobs.Enqueue(job);

        // A consumer is already registered; nothing more to do here.
        if (threadCount != 0)
        {
            return;
        }

        Interlocked.Increment(ref threadCount);
        Thread consumer = new Thread(ConsumeQueue);
        consumer.Start();
    }
}
/// <summary>
/// Consumer loop: repeatedly removes the next job from the shared queue and passes it to
/// <c>DoCPUBoundWork</c>. When the queue is found empty, the method decrements
/// <c>threadCount</c> and returns, ending the thread.
/// </summary>
private void ConsumeQueue()
{
    while (true)
    {
        Job job;

        lock (Jobs)
        {
            // One TryDequeue both takes the next job and detects "queue is empty",
            // replacing the separate LINQ Any() probe and the later dequeue. Doing it
            // under the lock QueueJob uses means the exit decision and the threadCount
            // update cannot interleave with an enqueue.
            if (!Jobs.TryDequeue(out job))
            {
                Interlocked.Decrement(ref threadCount);
                return;
            }
        }

        // Run the job outside the lock so QueueJob callers are not blocked while it executes.
        DoCPUBoundWork(job);
    }
}
Upvotes: 1
Views: 948
Reputation: 551
There is a possibility that your worker thread terminates just before a new job is enqueued:
// Excerpt of the consumer's exit check: while holding the Jobs lock, if the queue has no
// items the thread decrements threadCount and returns.
// NOTE(review): `ResizeJobs` presumably refers to the question's `Jobs` queue - confirm.
lock (Jobs)
{
if (!ResizeJobs.Any())
{
Interlocked.Decrement(ref threadCount);
return;
}
}
After this, another caller will execute Jobs.Enqueue(job);
I don't think you need to terminate the worker thread at all. It should stay alive and wait for work in a sleeping (blocked) state.
Upvotes: 1
Reputation: 120538
Here's a basic Queue that should satisfy your needs:
/// <summary>
/// Minimal single-consumer work queue. Actions passed to <see cref="AddJob"/> are run one
/// at a time, in the order they were added, on a dedicated thread started by the constructor.
/// </summary>
/// <remarks>
/// The class is sealed so the full IDisposable pattern is not needed.
/// <see cref="Dispose"/> cancels the worker loop; actions still waiting in the queue at
/// that point are not guaranteed to run.
/// </remarks>
sealed class Q : IDisposable
{
    // Signals both the worker loop and AddJob that the queue has been shut down.
    private readonly CancellationTokenSource cts = new CancellationTokenSource();

    // FIFO storage; Take blocks the worker thread until an action is available.
    private readonly BlockingCollection<Action> queue =
        new BlockingCollection<Action>(new ConcurrentQueue<Action>());

    /// <summary>Creates the queue and starts its worker thread.</summary>
    public Q()
    {
        new Thread(RunQueue).Start();
    }

    /// <summary>
    /// Worker loop: takes actions from the queue and invokes them until cancellation is
    /// requested. An exception thrown by an action is not caught here and propagates out
    /// of the worker thread.
    /// </summary>
    private void RunQueue()
    {
        while (!cts.IsCancellationRequested)
        {
            Action action;
            try
            {
                // Blocks until something is available, so no manual synchronization
                // is needed; wakes with OperationCanceledException on Dispose.
                action = queue.Take(cts.Token);
            }
            catch (OperationCanceledException)
            {
                break;
            }

            action();
        }
    }

    /// <summary>Queues <paramref name="action"/> for execution on the worker thread.</summary>
    /// <param name="action">The work to run; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The queue has already been disposed.</exception>
    public void AddJob(Action action)
    {
        // Reject null here, on the caller's thread, instead of letting it surface later as
        // a NullReferenceException on the worker thread.
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        try
        {
            queue.Add(action, cts.Token);
        }
        catch (OperationCanceledException e)
        {
            throw new ObjectDisposedException("Q is disposed", e);
        }
    }

    /// <summary>
    /// Requests cancellation, which ends the worker loop once the action currently
    /// running (if any) returns. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (!cts.IsCancellationRequested)
        {
            cts.Cancel();
        }
    }
}
To be used as follows:
// Creating the queue starts its worker thread; each AddJob call then hands one action to it.
// The actions run one at a time on that thread, in the order they were added.
// Call actionQueue.Dispose() once the queue is no longer needed. Dispose cancels the worker
// loop, so actions still waiting in the queue at that moment may not run.
Q actionQueue=new Q();
actionQueue.AddJob(() => Console.WriteLine("action1"));
actionQueue.AddJob(() => Console.WriteLine("action2"));
actionQueue.AddJob(() => Console.WriteLine("action3"));
Upvotes: 2