Tom
Tom

Reputation: 8180

Strategy for avoiding threadpool starvation while performing cpu bound jobs in a queued fashion

My aim is to avoid using threadpool threads for CPU bound work, thus avoiding a situation where IIS stops responding to new requests.

Can you see any problems with the code below? Is this a safe/clean approach? Can you offer any improvements?

    // Pending jobs. The queue instance is also the object QueueJob and ConsumeQueue
    // lock on, so the field is readonly: reassigning it would silently change what
    // is being locked.
    private static readonly ConcurrentQueue<Job> Jobs = new ConcurrentQueue<Job>();

    // Number of consumer threads currently running. It is incremented in QueueJob
    // only when it is 0 and decremented when the consumer exits, both while
    // holding the lock on Jobs.
    private static int threadCount = 0;

    /// <summary>
    /// Adds <paramref name="job"/> to the shared queue and, when no consumer
    /// thread is currently running, starts one to drain the queue.
    /// </summary>
    /// <param name="job">The job to enqueue.</param>
    private void QueueJob(Job job)
    {
        lock (Jobs)
        {
            Jobs.Enqueue(job);

            // A consumer is already running; it will pick this job up.
            if (threadCount != 0)
            {
                return;
            }

            // Claim the consumer slot before the thread starts so that later
            // callers do not spawn a second consumer.
            Interlocked.Increment(ref threadCount);
            var consumer = new Thread(ConsumeQueue);
            consumer.Start();
        }
    }
    /// <summary>
    /// Consumer loop: runs queued jobs one at a time until the queue is empty,
    /// then releases the consumer slot and lets the thread end.
    /// </summary>
    private void ConsumeQueue()
    {
        while (true)
        {
            Job job;

            lock (Jobs)
            {
                // Dequeue and the "nothing left, exit" decision happen under the same
                // lock QueueJob holds while it enqueues and inspects threadCount, so a
                // job can never be enqueued between the emptiness check and the
                // decrement without QueueJob then seeing threadCount == 0.
                if (!Jobs.TryDequeue(out job))
                {
                    Interlocked.Decrement(ref threadCount);
                    return;
                }
            }

            // Run the job outside the lock so producers are not blocked while
            // the CPU-bound work executes.
            DoCPUBoundWork(job);
        }
    }

Upvotes: 1

Views: 948

Answers (2)

Andrew
Andrew

Reputation: 551

There is a possibility that your thread terminates before the job is enqueued:

// Excerpt of the exit check from the question's ConsumeQueue loop.
// NOTE(review): the question's code tests `Jobs` here; `ResizeJobs` is presumably
// the same queue under an earlier name — confirm.
lock (Jobs)
{ 
     if (!ResizeJobs.Any())
     {
         // Nothing queued: release the consumer slot and end the worker thread.
         Interlocked.Decrement(ref threadCount);
         return;
     }
}

After this, another thread will execute Jobs.Enqueue(job);

I don't think you need to terminate the worker thread. It should wait for work in a sleeping (blocked) state.

Upvotes: 1

spender
spender

Reputation: 120538

Here's a basic Queue that should satisfy your needs:

/// <summary>
/// A minimal job queue: actions passed to <see cref="AddJob"/> are executed one
/// at a time, in the order they were added, on a single dedicated thread that
/// is created in the constructor.
/// </summary>
/// <remarks>
/// Sealed so we don't have to implement the full IDisposable pattern.
/// The worker thread is started without setting IsBackground, so it keeps
/// running until <see cref="Dispose"/> is called. Jobs that have not started
/// when the queue is disposed are not run.
/// </remarks>
sealed class Q : IDisposable
{
    private readonly CancellationTokenSource cts = new CancellationTokenSource();
    private readonly BlockingCollection<Action> queue =
        new BlockingCollection<Action>(new ConcurrentQueue<Action>());

    /// <summary>Creates the queue and starts its worker thread.</summary>
    public Q()
    {
        new Thread(RunQueue).Start();
    }

    /// <summary>
    /// Worker loop: takes the next action and runs it, until cancellation is
    /// requested through <see cref="Dispose"/>.
    /// </summary>
    private void RunQueue()
    {
        while (!cts.IsCancellationRequested)
        {
            Action action;
            try
            {
                // Blocks until something is available, so we don't need to
                // deal with messy synchronization.
                action = queue.Take(cts.Token);
            }
            catch (OperationCanceledException)
            {
                // Dispose was called while we were waiting for work.
                break;
            }

            action();
        }
    }

    /// <summary>Queues <paramref name="action"/> to run on the worker thread.</summary>
    /// <param name="action">The work to run; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The queue has been disposed.</exception>
    public void AddJob(Action action)
    {
        // Reject null here, on the caller's thread. Otherwise the null delegate
        // would be invoked later on the worker thread and throw there.
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        try
        {
            queue.Add(action, cts.Token);
        }
        catch (OperationCanceledException e)
        {
            throw new ObjectDisposedException("Q is disposed", e);
        }
    }

    /// <summary>
    /// Requests cancellation so the worker thread stops taking jobs and exits.
    /// Calling it more than once is harmless.
    /// </summary>
    public void Dispose()
    {
        if (!cts.IsCancellationRequested)
        {
            cts.Cancel();
        }
    }
}

To be used as follows:

// Create the queue, then hand it three jobs; they are added in this order.
var actionQueue = new Q();
foreach (var message in new[] { "action1", "action2", "action3" })
{
    // Per-iteration copy so each lambda prints its own message.
    var text = message;
    actionQueue.AddJob(() => Console.WriteLine(text));
}

Upvotes: 2

Related Questions