Antonio Santoro
Antonio Santoro

Reputation: 917

Producer-consumer scenario causing std::system_error on unique_lock

The following program creates a JobScheduler that moves Jobs from a waiting_queue into a running_queue. While the running_queue is not empty, two or more threads each take a Job and "execute" it by sleeping for the time specified in it (at most one quantum per turn). If the Job is then completed (execution_time has reached duration) it is put into the completed_queue; otherwise it is put back into the running_queue. This continues until both the running_queue and the waiting_queue are empty.

At some point the runningLock.unlock() call throws and the program aborts with:

terminate called after throwing an instance of 'std::system_error'

Isn't runningLock held by only one thread at a time? It looks as if .unlock() is being called by a thread that does not currently hold the lock.

main.cpp

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
// #include "JobScheduler.h"

/**
 * A unit of work tracked by the scheduler.
 *
 * Every field is a plain integer; the scheduler in this file interprets all
 * of the time fields as milliseconds. Each field is zero-initialised in the
 * class body so that a default-constructed Job is fully defined (reading or
 * copying an uninitialised int would be undefined behaviour).
 */
class Job {

private:
    int id = 0;
    int duration = 0;        // total run time the job needs
    int execution_time = 0;  // run time accumulated so far
    int start_time = 0;      // earliest time at which the job may start
    int wait_time = 0;       // accumulated waiting time
    // Kept as int to match the int getter/setter; a float here silently
    // rounds values above 2^24.
    int completion_time = 0;

public:
    /// Creates a job with every field set to zero.
    Job();

    /// Creates a job with the given id, start time and duration; the
    /// remaining counters start at zero.
    Job(int id, int startTime, int duration);

    Job(const Job &);

    Job& operator=(const Job &);

    ~Job();

    int getId() const;

    void setId(int id);

    int getDuration() const;

    void setDuration(int duration);

    int getExecutionTime() const;

    void setExecutionTime(int executionTime);

    int getStartTime() const;

    void setStartTime(int startTime);

    int getWaitTime() const;

    void setWaitTime(int waitTime);

    int getCompletionTime() const;

    void setCompletionTime(int completionTime);

    /// Orders jobs by *descending* start time (see the definition).
    bool operator< (const Job&) const;
};

int Job::getId() const {
    return id;
}

void Job::setId(int id) {
    this->id = id;
}

int Job::getDuration() const {
    return duration;
}

void Job::setDuration(int duration) {
    this->duration = duration;
}

int Job::getExecutionTime() const {
    return execution_time;
}

void Job::setExecutionTime(int executionTime) {
    execution_time = executionTime;
}

int Job::getStartTime() const {
    return start_time;
}

void Job::setStartTime(int startTime) {
    start_time = startTime;
}

int Job::getWaitTime() const {
    return wait_time;
}

void Job::setWaitTime(int waitTime) {
    wait_time = waitTime;
}

int Job::getCompletionTime() const {
    return completion_time;
}

void Job::setCompletionTime(int completionTime) {
    completion_time = completionTime;
}

// The in-class initialisers already zero every field.
Job::Job() = default;

// Initialisers are listed in declaration order, which is the order in which
// they actually run.
Job::Job(int id, int startTime, int duration)
    : id(id),
      duration(duration),
      execution_time(0),
      start_time(startTime),
      wait_time(0),
      completion_time(0) {}

// Member-wise copy is exactly what is needed, so let the compiler write it.
Job::Job(const Job &) = default;

Job &Job::operator=(const Job &) = default;

// The comparison is deliberately inverted: a Job compares "less" when it
// starts *later*, so std::priority_queue<Job> (a max-heap) yields the job
// with the earliest start time first.
bool Job::operator<(const Job &job) const {
    return start_time > job.start_time;
}

Job::~Job() = default;

// Scheduler that moves submitted jobs from a start-time-ordered waiting
// queue into a FIFO running queue, from which a pool of worker threads
// executes them one quantum at a time.
class JobScheduler {
public:
    JobScheduler();
    ~JobScheduler();

    // Queues a job for a later start().
    void submit(Job j);

    // Spawns the worker threads and releases the queued jobs to them.
    void start();

private:
    // Longest slice a worker spends on a job before re-queueing it.
    const int quantum; // ms

    // Worker threads created by start() and joined by the destructor.
    std::vector<std::thread> threadPool;

    // Submitted jobs that have not been released yet, ordered by Job::operator<.
    std::priority_queue<Job> waitingJobsQueue;

    // Jobs ready to run; start() accesses it under r_mutex.
    std::queue<Job> runningJobsQueue;
    std::mutex r_mutex;

    // Finished jobs; the workers append to it under c_mutex.
    std::vector<Job> completedJobsQueue;
    std::mutex c_mutex;

    // Notified when the running queue changes or the scheduler is done.
    std::condition_variable is_empty;
    // Not referenced by any of the member functions visible in this file.
    std::condition_variable is_completed;

    // Set once the workers should stop.
    bool done;
};

JobScheduler::JobScheduler() : quantum(3000), waitingJobsQueue(), runningJobsQueue(), completedJobsQueue(), done(false) {}

// Adds a copy of `j` to the waiting queue. No lock is taken here.
void JobScheduler::submit(Job j) {
    waitingJobsQueue.emplace(j);
}

void JobScheduler::start() {

    auto startTime = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now());

    for (int i = 0; i < std::thread::hardware_concurrency(); i++) {
        threadPool.emplace_back([this, startTime](){
            std::stringstream msg;
            msg << "thread " << std::this_thread::get_id() << " created" << std::endl;
            std::cout << msg.str();

            std::unique_lock runningLock(r_mutex);
            while (!done) {
                auto woke_up_time = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now());
                is_empty.wait(runningLock, [this]{return (!runningJobsQueue.empty() || waitingJobsQueue.empty());});
                auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - woke_up_time);
                int waiting_time = elapsed_time.count();

                if (!runningJobsQueue.empty()) {
                    Job job = runningJobsQueue.front();
                    runningJobsQueue.pop();
                    runningLock.unlock();

                    job.setWaitTime(job.getWaitTime() + waiting_time);

                    float sleep_time = std::min(job.getDuration() - job.getExecutionTime(), quantum);
                    job.setExecutionTime(job.getExecutionTime() + sleep_time);
                    std::stringstream msg1;
                    msg1 << "[" << std::this_thread::get_id() << "] running job " << job.getId() << " at t = " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - startTime).count() << std::endl;
                    std::cout << msg1.str();
                    std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<long>(sleep_time)));

                    if (job.getExecutionTime() >= job.getDuration()) {
                        auto completion_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - startTime);
                        job.setCompletionTime(completion_time.count());
                        std::lock_guard completedLock(c_mutex);
                        completedJobsQueue.push_back(job);
                        std::stringstream msg2;
                        msg2 << "[" << std::this_thread::get_id() << "] completed " << job.getId() << " at t = " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - startTime).count() << std::endl;
                        std::cout << msg2.str();
                    } else {
                        runningLock.lock();
                        runningJobsQueue.push(job);
                        std::stringstream msg2;
                        msg2 << "[" << std::this_thread::get_id() << "] pushed " << job.getId() << " at t = " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - startTime).count() << std::endl;
                        std::cout << msg2.str();
                        is_empty.notify_one();
                    }

                } else {
                    runningLock.lock();
                    done = true;
                    is_empty.notify_all();
                }


            }

            std::stringstream msg3;
            msg3 << "[" << std::this_thread::get_id() << "] finished at t = " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - startTime).count() << std::endl;
            std::cout << msg3.str();


        });
    }

    while (!waitingJobsQueue.empty()) {

        Job job = waitingJobsQueue.top();
        float start_time = std::chrono::milliseconds(job.getStartTime()).count();
        std::stringstream msg;
        msg << "[JobScheduler] sleeping " << start_time << " ms to start job " << job.getId() << std::endl;
        std::cout << msg.str();


        auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - startTime);


        if (elapsed_time.count() < job.getStartTime()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(job.getStartTime()-elapsed_time.count()));
        }


        {
            std::lock_guard lockGuard(r_mutex);
            waitingJobsQueue.pop();
            runningJobsQueue.push(job);
            std::stringstream msg;
            msg << "[JobScheduler] job " << job.getId() << " pushed into the running queue" << std::endl;
            std::cout << msg.str();
            is_empty.notify_all();
        }

    }
}

/**
 * Waits for every worker thread to finish, then prints summary statistics
 * (in seconds, 3 significant digits) for the completed jobs.
 *
 * When no job completed, the averages are reported as 0 rather than NaN.
 */
JobScheduler::~JobScheduler() {
    // Join whatever threads actually exist. Indexing up to
    // hardware_concurrency() would run past the end of the vector if start()
    // was never called.
    for (auto &worker : threadPool) {
        if (worker.joinable()) {
            worker.join();
        }
    }

    // All workers have been joined, so completedJobsQueue can be read
    // without taking c_mutex.
    const float completed_queue_size = static_cast<float>(completedJobsQueue.size());
    float max_compl_time = 0;
    float tot_exec_time = 0;
    float tot_turnaround_time = 0;
    float tot_waiting_time = 0;
    // Read-only traversal: popping from the vector while range-iterating it
    // invalidates the iteration and tallies the wrong jobs.
    for (const auto &j : completedJobsQueue) {
        max_compl_time = std::max(static_cast<float>(j.getCompletionTime()), max_compl_time);
        tot_exec_time += j.getExecutionTime();
        tot_waiting_time += j.getWaitTime();
        tot_turnaround_time += j.getCompletionTime() - j.getStartTime();
    }

    const bool has_jobs = !completedJobsQueue.empty();
    const float avg_turnaround_time = has_jobs ? (tot_turnaround_time / completed_queue_size) / 1000 : 0.0f;
    const float avg_waiting_time = has_jobs ? (tot_waiting_time / completed_queue_size) / 1000 : 0.0f;
    const float exec_time = has_jobs ? tot_exec_time / (1000 * completed_queue_size) : 0.0f;
    const float compl_time = max_compl_time / 1000;
    const float cpu_usage = compl_time > 0 ? (exec_time / compl_time) * 100 : 0.0f;

    std::cout.precision(3);
    std::cout << "avg turnaround time " << avg_turnaround_time << "s" << std::endl;
    std::cout << "avg waiting time " << avg_waiting_time << "s" << std::endl;
    std::cout << "exec time " << exec_time << "s" << std::endl;
    std::cout << "compl time " << compl_time << "s" << std::endl;
    std::cout << "cpu usage " << cpu_usage << "%" << std::endl;
}

int main() {
    JobScheduler p{};
    p.submit(Job(1, 0, 15000));
    p.submit(Job(2, 0, 6000));
    p.submit(Job(3, 1000, 9000));
    p.submit(Job(4, 2000, 12000));
    p.submit(Job(5, 3000, 16000));
    p.submit(Job(6, 3000, 5000));
    p.submit(Job(7, 4000, 7000));
    p.submit(Job(8, 4000, 6000));
    p.submit(Job(9, 5000, 9000));

    p.start();
}

UPDATE: I can't reduce the code above any further. Here is my full output:

thread 4 created
thread 3 created
thread 2 created
[JobScheduler] sleeping 0 ms to start job 1
[JobScheduler] job 1 pushed into the running queue
thread 5 created
[JobScheduler] sleeping 0 ms to start job 2
[JobScheduler] job 2 pushed into the running queue
[2] running job 1 at t = 1
[JobScheduler] sleeping 1000 ms to start job 3
[5] running job 2 at t = 2
[JobScheduler] job 3 pushed into the running queue
[JobScheduler] sleeping 2000 ms to start job 4
[4] running job 3 at t = 1000
[JobScheduler] job 4 pushed into the running queue
[3] running job 4 at t = 2001
[JobScheduler] sleeping 3000 ms to start job 6
[JobScheduler] job 6 pushed into the running queue
[JobScheduler] sleeping 3000 ms to start job 5
[JobScheduler] job 5 pushed into the running queue
[JobScheduler] sleeping 4000 ms to start job 8
[2] pushed 1 at t = 3003
[2] running job 6 at t = 3003
[5] pushed 2 at t = 3004
[5] running job 5 at t = 3004
[JobScheduler] job 8 pushed into the running queue
[JobScheduler] sleeping 4000 ms to start job 7
[JobScheduler] job 7 pushed into the running queue
[JobScheduler] sleeping 5000 ms to start job 9
[4] pushed 3 at t = 4001
[4] running job 1 at t = 4001
[JobScheduler] job 9 pushed into the running queue
[3] pushed 4 at t = 5002
[3] running job 2 at t = 5002
[2] pushed 6 at t = 6004
[2] running job 8 at t = 6004
[5] pushed 5 at t = 6005
[5] running job 7 at t = 6005
[4] pushed 1 at t = 7002
[4] running job 3 at t = 7002
[3] completed 2 at t = 8002
terminate called after throwing an instance of 'std::system_error'
  what():  Operation not permitted
[5] pushed 7 at t = 9366
[5] running job 4 at t = 9366
[2] pushed 8 at t = 9366
[2] running job 6 at t = 9366

Process finished with exit code 3

Upvotes: 0

Views: 169

Answers (1)

Murphy
Murphy

Reputation: 3999

From looking at the code, not testing it, I can point to the following bug in the logic:

You call runningLock.unlock() inside the while (!done) loop, in the if (!runningJobsQueue.empty()) branch, but done is only set to true in the else branch of that condition, so the loop keeps iterating. When a job completes, that iteration never locks runningLock again, so on the next pass you try to unlock a lock that is already unlocked, which causes the error:

If there is no associated mutex or the mutex is not locked, std::system_error with an error code of std::errc::operation_not_permitted

https://en.cppreference.com/w/cpp/thread/unique_lock/unlock

Upvotes: 3

Related Questions