Kyro

Reputation: 768

De-queue Items with worker threads

I have been trying to figure out how to solve a requirement I have, but for the life of me I just can't come up with a solution.

I have a database of items which stores them in a kind of queue. (The database has already been implemented, and other processes will be adding items to this queue.)

The items require a lot of work/time to "process", so I need to be able to constantly de-queue items from the database and, for each item, run a new thread to process it and then return true/false indicating whether it was successfully processed. (This will be used to decide whether or not to re-add it to the database queue.)

But I only want to do this while the current number of active threads (one per item being processed) is less than a maximum-number-of-threads parameter.

Once the maximum number of threads has been reached, I need to stop de-queuing items from the database until the current number of threads drops below the maximum, at which point de-queuing should continue.

It feels like this should be something I can come up with but it is just not coming to me.

To clarify: I only need to implement the threading. The database has already been implemented.

Upvotes: 0

Views: 5459

Answers (3)

Jim Mischel

Reputation: 133975

One really easy way to do this is with a Semaphore. You have one thread that dequeues items and creates threads to process them. For example:

using System.Threading;  // Semaphore, ThreadPool

const int MaxThreads = 4;
Semaphore sem = new Semaphore(MaxThreads, MaxThreads);
while (Queue.HasItems())
{
    sem.WaitOne();  // blocks while MaxThreads items are being processed
    var item = Queue.Dequeue();
    ThreadPool.QueueUserWorkItem(ProcessItem, item); // see below
}
// When the queue is empty, you have to wait for all processing
// threads to complete.
// If you can acquire the semaphore MaxThreads times, all workers are done
int count = 0;
while (count < MaxThreads)
{
    sem.WaitOne();
    ++count;
}

// the code to process an item
void ProcessItem(object item)
{
    // cast the item to whatever type you need,
    // and process it.
    // when done processing, release the semaphore
    sem.Release();
}

The above technique works quite well. It's simple to code, easy to understand, and very effective.

One change is that you might want to use the Task API rather than ThreadPool.QueueUserWorkItem. Task gives you more control over the asynchronous processing, including cancellation. I used QueueUserWorkItem in my example because I'm more familiar with it, but I would use Task in a production program.
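For instance, here's a minimal sketch of the same throttled loop using tasks, keeping the hypothetical Queue.HasItems()/Queue.Dequeue() calls from above (SemaphoreSlim is the lightweight in-process counterpart of Semaphore):

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

SemaphoreSlim sem = new SemaphoreSlim(MaxThreads, MaxThreads);
var inFlight = new List<Task>();

while (Queue.HasItems())
{
    sem.Wait();                      // blocks while MaxThreads items are running
    var item = Queue.Dequeue();
    inFlight.Add(Task.Run(() =>
    {
        try { ProcessItem(item); }
        finally { sem.Release(); }   // free the slot even if processing throws
    }));
}
Task.WaitAll(inFlight.ToArray());    // replaces the "acquire MaxThreads times" loop

The try/finally guarantees the semaphore slot is returned even when a worker throws, which the bare ProcessItem above doesn't do.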

Although this does use N+1 threads (where N is the number of items you want processed concurrently), that extra thread usually isn't doing anything. The only time it's running is when it's assigning work to worker threads; otherwise, it's doing a non-busy wait on the semaphore.

Upvotes: 6

bazza

Reputation: 8404

Ok, so the architecture of the solution is going to depend on one thing: does the processing time per queue item vary according to the item's data?

If not then you can have something that merely round-robins between the processing threads. This will be fairly simple to implement.

If the processing time does vary then you're going to need something with more of a 'next available' feel to it, so that whichever of your threads happens to be free first gets the job of processing the data item.

Having worked that out, you're then going to have the usual run-around with how to synchronise between a queue reader and the processing threads. The difference between 'next-available' and 'round-robin' is how you do that synchronisation.

I'm not overly familiar with C#, but I've heard tell of a beast called a background worker. That is likely to be an acceptable means of bringing this about.

For round-robin, just start up a background worker per queue item, storing the workers' references in an array. Limit yourself to, say, 16 in-progress background workers. The idea is that having started 16 you would then wait for the first to complete before starting the 17th, and so on. I believe that background workers actually run as jobs on the thread pool, so that will automatically limit the number of threads actually running at any one time to something appropriate for the underlying hardware. Having waited for a background worker to complete, you'd then handle its result and start another one up (a sketch follows below).
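Here's a rough sketch of the round-robin idea. It uses the Task API rather than BackgroundWorker (which has no direct wait call; you'd normally handle its RunWorkerCompleted event instead), and WorkQueue/ProcessItem are hypothetical stand-ins for the asker's database queue and processing routine:

using System.Collections.Generic;
using System.Threading.Tasks;

const int MaxWorkers = 16;
var inFlight = new Queue<Task<bool>>();      // oldest worker at the front

while (WorkQueue.HasItems())                 // hypothetical queue API
{
    if (inFlight.Count == MaxWorkers)
    {
        // Round-robin: wait for the oldest worker before starting another.
        bool ok = inFlight.Dequeue().Result; // blocks until it finishes
        // handle ok here, e.g. re-add the item to the queue on failure
    }
    var item = WorkQueue.Dequeue();
    inFlight.Enqueue(Task.Run(() => ProcessItem(item)));  // ProcessItem returns bool
}
Task.WaitAll(inFlight.ToArray());            // drain the stragglers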

For the next-available approach it's not so different. Instead of waiting for the first to complete, you would use WaitAny() to wait for any of the workers to complete. You handle the return from whichever one completed, then start another one up and go back to WaitAny().
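A corresponding sketch of next-available, with the same hypothetical stand-ins; the only change is that Task.WaitAny() picks whichever worker finishes first:

// same usings as the previous sketch
var inFlight = new List<Task<bool>>();

while (WorkQueue.HasItems())
{
    if (inFlight.Count == MaxWorkers)
    {
        int done = Task.WaitAny(inFlight.ToArray());  // next available worker
        bool ok = inFlight[done].Result;              // already complete, doesn't block
        inFlight.RemoveAt(done);
        // handle ok as before
    }
    var item = WorkQueue.Dequeue();
    inFlight.Add(Task.Run(() => ProcessItem(item)));
}
Task.WaitAll(inFlight.ToArray());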

The general philosophy of both approaches is to keep a number of threads on the boil all the time. A feature of the next-available approach is that the order in which you emit the results is not necessarily the same as the order of the input items. If that matters, then the round-robin approach with more background workers than CPU cores will be reasonably efficient (the thread pool will just queue up workers that have been started but aren't yet running). However, the latency will vary with the processing time.

BTW, 16 is an arbitrary number, chosen on the basis of how many cores you think will be in the PC running the software: more cores, bigger number.

Of course, in the seemingly restless and ever changing world of .NET there may now be a better way of doing this.

Good luck!

Upvotes: 0

joelt

Reputation: 2680

Do you just not know where to start?

Consider a thread pool with a max number of threads. http://msdn.microsoft.com/en-us/library/y5htx827.aspx
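For example, a minimal sketch of capping the pool (the figure 4 is arbitrary, and note the pool won't accept a worker maximum below the processor count):

using System.Threading;

int worker, io;
ThreadPool.GetMaxThreads(out worker, out io);
bool accepted = ThreadPool.SetMaxThreads(4, io);  // returns false if the request is rejected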

Consider spinning up your max number of threads immediately and monitoring the DB. http://msdn.microsoft.com/en-us/library/system.threading.threadpool.queueuserworkitem.aspx is convenient.
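A sketch of that shape, assuming hypothetical Database.TryDequeue/Database.Requeue calls and a ProcessItem that returns true/false. Each worker polls the database itself, so no separate dispatcher thread is needed:

using System.Threading;

const int WorkerCount = 4;                      // your max-threads parameter
for (int i = 0; i < WorkerCount; i++)
{
    ThreadPool.QueueUserWorkItem(_ =>
    {
        while (true)
        {
            var item = Database.TryDequeue();   // hypothetical atomic dequeue
            if (item == null)
            {
                Thread.Sleep(1000);             // queue empty; poll again shortly
                continue;
            }
            if (!ProcessItem(item))
                Database.Requeue(item);         // hypothetical re-add on failure
        }
    });
}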

Remember that you can't guarantee your process will end cleanly... crashes happen. Consider logging processing state.

Remember that your select and remove-from-queue operations should be atomic.
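For example, on SQL Server one way to make the claim atomic is a single DELETE ... OUTPUT statement. This is only a sketch: the QueueItem type and the table/column names are hypothetical, and READPAST lets concurrent workers skip rows another transaction has already locked.

using System.Data.SqlClient;

static QueueItem TryDequeue(SqlConnection conn)
{
    const string sql =
        @"DELETE TOP (1) FROM dbo.QueueItems WITH (ROWLOCK, READPAST)
          OUTPUT deleted.Id, deleted.Payload;";
    using (var cmd = new SqlCommand(sql, conn))
    using (var reader = cmd.ExecuteReader())
    {
        if (!reader.Read())
            return null;                        // queue is empty
        return new QueueItem(reader.GetInt32(0), reader.GetString(1));
    }
}

Note that DELETE TOP (1) doesn't guarantee which row you get; if strict FIFO order matters, select the row via a CTE with an ORDER BY first.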

Upvotes: 0
