GoekhanDev
GoekhanDev

Reputation: 518

Threading queue in c++

Currently working on a project, im struggeling with threading and queue at the moment, the issue is that all threads take the same item in the queue.

Reproduceable example:

#include <iostream>
#include <queue>
#include <thread>

using namespace std;

void Test(queue<string> queue){
    
    while (!queue.empty()) {
    
        string proxy = queue.front();
        cout << proxy << "\n";
        
        queue.pop();
    
    }
    
}

int main()
{
    
    queue<string> queue;
    
    queue.push("101.132.186.39:9090");
    queue.push("95.85.24.83:8118");
    queue.push("185.211.193.162:8080");
    queue.push("87.106.37.89:8888");
    queue.push("159.203.61.169:8080");
    
    std::vector<std::thread> ThreadVector;
    
    
    for (int i = 0; i <= 10; i++){
        ThreadVector.emplace_back([&]() {Test(queue); });
    }
    
    for (auto& t : ThreadVector){
        t.join();
    }

    ThreadVector.clear();

    return 0;
}

Upvotes: 0

Views: 2580

Answers (2)

Ruks
Ruks

Reputation: 3956

Look at this snippet:

void Test(std::queue<std::string> queue) { /* ... */ }

Here you pass a copy of the queue object to the thread.

This copy is local to each thread, so it gets destroyed after every thread exits so in the end your program does not have any effect on the actual queue object that resides in the main() function.

To fix this, you need to either make the parameter take a reference or a pointer:

void Test(std::queue<std::string>& queue) { /* ... */ }

This makes the parameter directly refer to the queue object present inside main() instead of creating a copy.

Now, the above code is still not correct since queue is prone to data-race and neither std::queue nor std::cout is thread-safe and can get interrupted by another thread while currently being accessed by one. To prevent this, use a std::mutex:

// ...
#include <mutex>

// ...

// The mutex protects the 'queue' object from being subjected to data-race amongst different threads
// Additionally 'io_mut' is used to protect the streaming operations done with 'std::cout'
std::mutex mut, io_mut;

void Test(std::queue<std::string>& queue) {
    std::queue<std::string> tmp;
    {
        // Swap the actual object with a local temporary object while being protected by the mutex
        std::lock_guard<std::mutex> lock(mut);
        std::swap(tmp, queue);
    }
    while (!tmp.empty()) {
        std::string proxy = tmp.front();
        {
            // Call to 'std::cout' needs to be synchronized
            std::lock_guard<std::mutex> lock(io_mut);
            std::cout << proxy << "\n";
        }
        tmp.pop();
    }
}

This synchronizes each thread call and prevents access from any other threads while queue is still being accessed by a thread.

Edit:

Alternatively, it'd be much faster in my opinion to make each thread wait until one of them receives a notification of your push to std::queue. You can do this through the use of std::condition_variable:

// ...

#include <mutex>
#include <condition_variable>

// ...

std::mutex mut1, mut2;
std::condition_variable cond;

void Test(std::queue<std::string>& queue, std::chrono::milliseconds timeout = std::chrono::milliseconds{10}) {
    std::unique_lock<std::mutex> lock(mut1);
    // Wait until 'queue' is not empty...
    cond.wait(lock, [queue] { return queue.empty(); });
    while (!queue.empty()) {
        std::string proxy = std::move(queue.front());
        std::cout << proxy << "\n";
        queue.pop();
    }
}

// ...

int main() {
    std::queue<string> queue;
    
    std::vector<std::thread> ThreadVector;
    
    for (int i = 0; i <= 10; i++)
        ThreadVector.emplace_back([&]() { Test(queue); });
    
    // Notify the vectors of each 'push()' call to 'queue'
    {
        std::unique_lock<std::mutex> lock(mut2);
        queue.push("101.132.186.39:9090");
        cond.notify_one();
    }
    
    {
        std::unique_lock<std::mutex> lock(mut2);
        queue.push("95.85.24.83:8118");
        cond.notify_one();
    }
    
    {
        std::unique_lock<std::mutex> lock(mut2);
        queue.push("185.211.193.162:8080");
        cond.notify_one();
    }
    
    {
        std::unique_lock<std::mutex> lock(mut2);
        queue.push("87.106.37.89:8888");
        cond.notify_one();
    }
    
    {
        std::unique_lock<std::mutex> lock(mut2);
        queue.push("159.203.61.169:8080");
        cond.notify_one();
    }

    for (auto& t : ThreadVector)
        t.join();

    ThreadVector.clear();
}

Upvotes: 2

pattakosn
pattakosn

Reputation: 395

You are giving each thread its own copy of the queue. I imagine that what you want is all the threads to work on the same queue and for that you will need to use some synchronization mechanism when multiple threads work on the shared queue as std queue is not thread safe.

edit: minor note: in your code you are spawning 11 threads not 10.

edit 2: OK, try this one to begin with:

std::mutex lock_work;
std::mutex lock_io;

void Test(queue<string>& queue){

while (!queue.empty()) {
    string proxy;
    {
        std::lock_guard<std::mutex> lock(lock_work);
        proxy = queue.front();
        queue.pop();
    }
    {
        std::lock_guard<std::mutex> lock(lock_io);
        cout << proxy << "\n";
    }
}   

}

Upvotes: 2

Related Questions