codereviewanskquestions

Reputation: 13998

pthread pool, C++

I am working on a networking program in C++ and would like to implement a thread pool with pthreads. Whenever I receive an event on the receiving socket, I will put the data into the thread pool's queue. I am thinking of creating 5 separate threads that continuously check the queue for incoming data to process.

This is a fairly straightforward topic, but I am not an expert, so I would like to hear anything that might help me implement it.

Please point me to any tutorials or references, and mention any problems I should be aware of.

Upvotes: 2

Views: 2345

Answers (5)

Sam Miller

Reputation: 24174

Use Boost.Asio and have each thread in the pool invoke io_service::run().

Multiple threads may call io_service::run() to set up a pool of threads from which completion handlers may be invoked. This approach may also be used with io_service::post() to use a means to perform any computational tasks across a thread pool.

Note that all threads that have joined an io_service's pool are considered equivalent, and the io_service may distribute work across them in an arbitrary fashion.

Upvotes: 4

Loki Astari

Reputation: 264401

Before I start: use boost::thread.

If you want to know how to do it with pthreads, then you need to use pthread condition variables. These allow you to suspend threads that are waiting for work without consuming CPU.

When an item of work is added to the queue you signal the condition variable and one pthread will be released from the condition variable thus allowing it to take an item from the queue. When the thread finishes processing the work item it returns back to the condition variable to await the next piece of work.

The main loop for the threads in the pool should look like this:

ThreadWorkLoop()                        // The function that all the pool threads run.
{
    while(poolRunning)
    {
        WorkItem = getWorkItem();       // Get an item from the queue. This suspends until
        WorkItem->run();                // an item is available; then you can run it.
    }
}
getWorkItem()
{
    Locker  lock(mutex);                // RAII: Lock/unlock mutex
    while(workQueue.size() == 0)        // Loop: re-check the queue after every wake-up.
    {
        conditionVariable.wait(mutex);  // Waiting on a condition variable suspends a thread
    }                                   // until the condition variable is signalled.
                                        // Note: the mutex is unlocked while the thread is suspended
    return workQueue.popItem();
}
AddItemToQueue(item)
{
    Locker lock(mutex);
    workQueue.pushItem(item);
    conditionVariable.signal();         // Release a thread from the condition variable.
}
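For reference, here is one way the pseudocode above might look with the raw pthread API. This is only a sketch under some assumptions: the names `WorkItem`, `ThreadPool`, `addItem` and `runDemo` are made up for illustration, a work item is just a function pointer plus an argument, and the destructor chooses to drain the queue before stopping the workers (other shutdown policies are equally valid).

```cpp
#include <pthread.h>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <queue>
#include <vector>

// Hypothetical work item: a function pointer plus its argument.
struct WorkItem {
    void (*run)(void*);
    void* arg;
};

class ThreadPool {
public:
    explicit ThreadPool(std::size_t threadCount) : running_(true) {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);
        threads_.resize(threadCount);
        for (std::size_t i = 0; i < threadCount; ++i) {
            int rc = pthread_create(&threads_[i], nullptr, &ThreadPool::workLoop, this);
            assert(rc == 0);
            (void)rc;
        }
    }
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    // Lets the workers drain the queue, then joins them.
    ~ThreadPool() {
        pthread_mutex_lock(&mutex_);
        running_ = false;
        pthread_cond_broadcast(&cond_);   // wake every sleeping worker so it can exit
        pthread_mutex_unlock(&mutex_);
        for (pthread_t& t : threads_) pthread_join(t, nullptr);
        pthread_cond_destroy(&cond_);
        pthread_mutex_destroy(&mutex_);
    }

    void addItem(WorkItem item) {
        pthread_mutex_lock(&mutex_);
        queue_.push(item);
        pthread_cond_signal(&cond_);      // release one waiting worker
        pthread_mutex_unlock(&mutex_);
    }

private:
    static void* workLoop(void* self) {
        ThreadPool* pool = static_cast<ThreadPool*>(self);
        for (;;) {
            pthread_mutex_lock(&pool->mutex_);
            // The loop guards against spurious wake-ups; the mutex is
            // released while the thread sleeps inside pthread_cond_wait.
            while (pool->queue_.empty() && pool->running_)
                pthread_cond_wait(&pool->cond_, &pool->mutex_);
            if (pool->queue_.empty()) {   // only reachable once running_ is false
                pthread_mutex_unlock(&pool->mutex_);
                return nullptr;
            }
            WorkItem item = pool->queue_.front();
            pool->queue_.pop();
            pthread_mutex_unlock(&pool->mutex_);
            item.run(item.arg);           // run the work outside the lock
        }
    }

    pthread_mutex_t mutex_;
    pthread_cond_t cond_;
    std::queue<WorkItem> queue_;
    std::vector<pthread_t> threads_;
    bool running_;                        // protected by mutex_
};

// Demo helpers (hypothetical): every work item bumps a shared counter.
static void increment(void* arg) {
    static_cast<std::atomic<int>*>(arg)->fetch_add(1);
}

int runDemo(int items) {
    std::atomic<int> counter(0);
    {
        ThreadPool pool(5);               // 5 workers, as in the question
        for (int i = 0; i < items; ++i)
            pool.addItem(WorkItem{&increment, &counter});
    }                                     // destructor waits for all items to finish
    return counter.load();
}
```

In the networking case from the question, the receive thread would call `addItem` for each incoming event, with `arg` pointing at the received data. Build with `-pthread`.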

Upvotes: 1

Michael Anderson

Reputation: 73480

Boost asio is a good solution.

But if you don't want to use it (or can't use it for whatever reason), then you'll probably want to use a semaphore-based implementation.

You can find a multithreaded queue implementation based on semaphores that I use here:

https://gist.github.com/482342

The reason for using semaphores is that you can avoid having the worker threads continually polling, and instead have them woken up by the OS when there is work to be done.
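To give a rough idea of the semaphore approach without following the link, here is a minimal sketch. It is not the code from the gist; `SemQueue`, `DemoState`, `semWorker` and `runSemDemo` are hypothetical names, the payload is a plain `int`, and it assumes a platform with unnamed POSIX semaphores (`sem_init` works on Linux, but not on macOS). The semaphore counts the items in the queue, so `sem_wait` blocks a worker in the kernel until there is something to pop.

```cpp
#include <pthread.h>
#include <semaphore.h>
#include <atomic>
#include <cassert>
#include <cerrno>
#include <queue>
#include <vector>

// Hypothetical blocking queue: the semaphore counts the queued items.
class SemQueue {
public:
    SemQueue() {
        pthread_mutex_init(&mutex_, nullptr);
        sem_init(&items_, 0, 0);          // process-private, initial count 0
    }
    ~SemQueue() {
        sem_destroy(&items_);
        pthread_mutex_destroy(&mutex_);
    }
    SemQueue(const SemQueue&) = delete;
    SemQueue& operator=(const SemQueue&) = delete;

    void push(int value) {
        pthread_mutex_lock(&mutex_);
        data_.push(value);
        pthread_mutex_unlock(&mutex_);
        sem_post(&items_);                // one more item available; wakes one waiter
    }

    int pop() {
        // Blocks until the count is positive; retry if a signal interrupts the wait.
        while (sem_wait(&items_) == -1 && errno == EINTR) {}
        pthread_mutex_lock(&mutex_);
        int value = data_.front();
        data_.pop();
        pthread_mutex_unlock(&mutex_);
        return value;
    }

private:
    pthread_mutex_t mutex_;               // protects data_
    sem_t items_;                         // number of items in data_
    std::queue<int> data_;
};

// Demo (hypothetical): workers sum the values they pop.
struct DemoState {
    SemQueue queue;
    std::atomic<long> sum{0};
};

static void* semWorker(void* arg) {
    DemoState* s = static_cast<DemoState*>(arg);
    for (;;) {
        int v = s->queue.pop();
        if (v < 0) return nullptr;        // negative value = stop sentinel (demo convention)
        s->sum.fetch_add(v);
    }
}

long runSemDemo(int workers, int items) {
    DemoState state;
    std::vector<pthread_t> threads(workers);
    for (pthread_t& t : threads) {
        int rc = pthread_create(&t, nullptr, semWorker, &state);
        assert(rc == 0);
        (void)rc;
    }
    for (int i = 1; i <= items; ++i) state.queue.push(i);
    for (int i = 0; i < workers; ++i) state.queue.push(-1);  // one sentinel per worker
    for (pthread_t& t : threads) pthread_join(t, nullptr);
    return state.sum.load();
}
```

Because the queue is FIFO and the sentinels are pushed last, every real item is popped before any worker sees a sentinel. Build with `-pthread`.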

Upvotes: 0

MK.

Reputation: 34527

You will need a mutex and a condition variable. The mutex protects your job queue; when the receiving thread adds a job to the queue, it signals the condition variable. The worker threads wait on the condition variable and wake up when it is signaled.

Upvotes: 0

Vinicius Kamakura

Reputation: 7778

Have the receive thread push the data onto the queue and the 5 worker threads pop it. Protect the queue with a mutex and let them "fight" for the data.

You also want a usleep() or pthread_yield() in each worker thread's main loop, so that idle workers do not spin at full CPU while the queue is empty.
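A minimal sketch of this polling variant, under some assumptions: `PollState`, `pollWorker` and `runPollDemo` are hypothetical names, the payload is a plain `int`, and the 1 ms sleep is an arbitrary choice. Unlike a blocking wait, polling trades some latency (up to one sleep interval per item) for simplicity.

```cpp
#include <pthread.h>
#include <unistd.h>
#include <atomic>
#include <cassert>
#include <queue>
#include <vector>

// Hypothetical shared state: a mutex-protected queue plus two flags/counters.
struct PollState {
    PollState() { pthread_mutex_init(&mutex, nullptr); }
    ~PollState() { pthread_mutex_destroy(&mutex); }

    pthread_mutex_t mutex;                // protects queue
    std::queue<int> queue;
    std::atomic<bool> done{false};        // set once the producer has pushed everything
    std::atomic<int> processed{0};        // number of items handled by the workers
};

static void* pollWorker(void* arg) {
    PollState* s = static_cast<PollState*>(arg);
    for (;;) {
        // Read the flag BEFORE looking at the queue: if it was already set,
        // an empty queue really means there is nothing left to do.
        bool finished = s->done.load();

        bool got = false;
        pthread_mutex_lock(&s->mutex);
        if (!s->queue.empty()) {
            s->queue.pop();               // the workers "fight" for items here
            got = true;
        }
        pthread_mutex_unlock(&s->mutex);

        if (got) {
            s->processed.fetch_add(1);    // stand-in for real processing
            continue;
        }
        if (finished) return nullptr;
        usleep(1000);                     // queue empty: back off for 1 ms instead of spinning
    }
}

int runPollDemo(int workers, int items) {
    PollState state;
    std::vector<pthread_t> threads(workers);
    for (pthread_t& t : threads) {
        int rc = pthread_create(&t, nullptr, pollWorker, &state);
        assert(rc == 0);
        (void)rc;
    }
    for (int i = 0; i < items; ++i) {     // plays the role of the receive thread
        pthread_mutex_lock(&state.mutex);
        state.queue.push(i);
        pthread_mutex_unlock(&state.mutex);
    }
    state.done.store(true);
    for (pthread_t& t : threads) pthread_join(t, nullptr);
    return state.processed.load();
}
```

Build with `-pthread`. The sleep interval bounds how quickly an idle worker notices new data, so a shorter interval lowers latency at the cost of more wake-ups.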

Upvotes: 0
