My2ndLovE
My2ndLovE

Reputation: 397

Creating a Thread Queue

I need to handle user request one by one(similar like a queue job)

This is what i have:

Thread checkQueue;
Boolean IsComplete = true;

protected void Start()
{
    checkQueue = new Thread(CheckQueueState);
    checkQueue.Start();    
}

private void CheckQueueState()
    {
        while (true)
        {
            if (checkIsComplete)
            {
                ContinueDoSomething();

                checkQueue.Abort();
                break;
            }
            System.Threading.Thread.Sleep(1000);
        }
    }

protected void ContinueDoSomething()
{
    IsComplete = false;
    ...
    ...
    IsComplete = true; //when done, set it to true
}

Everytime when there is a new request from user, system will call Start() function and check whether the previous job is complete, if yes then will proceed to next job.

But I am not sure whether it is correct to do in this way.

Any improvement or any better way to do this?

Upvotes: 1

Views: 377

Answers (3)

Enigmativity
Enigmativity

Reputation: 117175

Take a look at the Reactive Extensions from Microsoft. This library contains a set of schedulers available that follow the semantics you require.

The best to fit your needs is the EventLoopScheduler. It will queue up actions and perform them one after another. If it completes an action and there are more items in the queue it will sequentially process the actions on the same thread until the queue is empty and then it disposes the thread. When a new action is queued it creates a new thread. It is very efficient because of this.

The code is super simple and looks like this:

var scheduler = new System.Reactive.Concurrency.EventLoopScheduler();

scheduler.Schedule(() => { /* action here */ });

If you need to have every queued action performed on a new thread then use it like this:

var scheduler = new System.Reactive.Concurrency.NewThreadScheduler();

scheduler.Schedule(() => { /* action here */ });

Very simple.

Upvotes: 0

Kirill Shlenskiy
Kirill Shlenskiy

Reputation: 9587

I like usr's suggestion regarding using TPL Dataflow. If you have the ability to add external dependencies to your project (TPL Dataflow is not distributed as part of the .NET framework), then it provides a clean solution to your problem.

If, however, you're stuck with what the framework has to offer, you should have a look at BlockingCollection<T>, which works nicely with the producer-consumer pattern that you're trying to implement.

I've thrown together a quick .NET 4.0 example to illustrate how it can be used in your scenario. It is not very lean because it has a lot of calls to Console.WriteLine(). However, if you take out all the clutter it's extremely simple.

At the center of it is a BlockingCollection<Action>, which gets Action delegates added to it from any thread, and a thread specifically dedicated to dequeuing and executing those Actions sequentially in the exact order in which they were added.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace SimpleProducerConsumer
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Main thread id is {0}.", Thread.CurrentThread.ManagedThreadId);

            using (var blockingCollection = new BlockingCollection<Action>())
            {
                // Start our processing loop.
                var actionLoop = new Thread(() =>
                {
                    Console.WriteLine(
                        "Starting action loop on thread {0} (dedicated action loop thread).",
                        Thread.CurrentThread.ManagedThreadId,
                        Thread.CurrentThread.IsThreadPoolThread);

                    // Dequeue actions as they become available.
                    foreach (var action in blockingCollection.GetConsumingEnumerable())
                    {
                        // Invoke the action synchronously
                        // on the "actionLoop" thread.
                        action();
                    }

                    Console.WriteLine("Action loop terminating.");
                });

                actionLoop.Start();

                // Enqueue some work.
                Console.WriteLine("Enqueueing action 1 from thread {0} (main thread).", Thread.CurrentThread.ManagedThreadId);
                blockingCollection.Add(() => SimulateWork(1));

                Console.WriteLine("Enqueueing action 2 from thread {0} (main thread).", Thread.CurrentThread.ManagedThreadId);
                blockingCollection.Add(() => SimulateWork(2));

                // Let's enqueue it from another thread just for fun.
                var enqueueTask = Task.Factory.StartNew(() =>
                {
                    Console.WriteLine(
                        "Enqueueing action 3 from thread {0} (task executing on a thread pool thread).",
                        Thread.CurrentThread.ManagedThreadId);

                    blockingCollection.Add(() => SimulateWork(3));
                });

                // We have to wait for the task to complete
                // because otherwise we'll end up calling
                // CompleteAdding before our background task
                // has had the chance to enqueue action #3.
                enqueueTask.Wait();

                // Tell our loop (and, consequently, the "actionLoop" thread)
                // to terminate when it's done processing pending actions.
                blockingCollection.CompleteAdding();

                Console.WriteLine("Done enqueueing work. Waiting for the loop to complete.");

                // Block until the "actionLoop" thread terminates.
                actionLoop.Join();

                Console.WriteLine("Done. Press Enter to quit.");
                Console.ReadLine();
            }
        }

        private static void SimulateWork(int actionNo)
        {
            Thread.Sleep(500);
            Console.WriteLine("Finished processing action {0} on thread {1} (dedicated action loop thread).", actionNo, Thread.CurrentThread.ManagedThreadId);
        }
    }
}

And the output is:

0.016s: Main thread id is 10.
0.025s: Enqueueing action 1 from thread 10 (main thread).
0.026s: Enqueueing action 2 from thread 10 (main thread).
0.027s: Starting action loop on thread 11 (dedicated action loop thread).
0.028s: Enqueueing action 3 from thread 6 (task executing on a thread pool thread).
0.028s: Done enqueueing work. Waiting for the loop to complete.
0.527s: Finished processing action 1 on thread 11 (dedicated action loop thread).
1.028s: Finished processing action 2 on thread 11 (dedicated action loop thread).
1.529s: Finished processing action 3 on thread 11 (dedicated action loop thread).
1.530s: Action loop terminating.
1.532s: Done. Press Enter to quit.

Upvotes: 1

usr
usr

Reputation: 171246

Use an ActionBlock<T> from the TPL Dataflow library. Set its MaxDegreeOfParalellism to 1 and you're done.

Note, that ASP.NET worker processes can recycle at any time (e.g. due to scheduled recycle, memory limit, server reboot or deployment), so the queued work might suddenly be lost without notice. I recommend you look into some external queueing solution like MSMQ (or others) for reliable queues.

Upvotes: 0

Related Questions