Jaysmito Mukherjee
Jaysmito Mukherjee

Reputation: 1526

What could be a better for condition_variables

I am trying to make a multi threaded function it looks like:

namespace {  // Anonymous namespace instead of static functions.

std::mutex log_mutex;

void Background() {
    while(IsAlive){
        std::queue<std::string> log_records;
        {
            // Exchange data for minimizing lock time.
            std::unique_lock lock(log_mutex);
            logs.swap(log_records);
        }
        if (log_records.empty()) {
            Sleep(200);
            continue;
        }
        while(!log_records.empty()){
            ShowLog(log_records.front()); 
            log_records.pop();
        }
    }
}

void Log(std::string log){
    std::unique_lock lock(log_mutex);
    logs.push(std::move(log));
}

}

I use Sleep to prevent high CPU usages due to continuously looping even if logs are empty. But this has a very visible draw back that it will print the logs in batches. I tried to get over this problem by using conditional variables but in there the problem is if there are too many logs in a short time then the cv is stopped and waked up many times leading to even more CPU usage. Now what can i do to solve this issue? You can assume there may be many calls to log per second.

Upvotes: 0

Views: 67

Answers (1)

rturrado
rturrado

Reputation: 8074

I would probably think of using a counting semaphore for this:

  • The semaphore would keep a count of the number of messages in the logs (initially zero).
  • Log clients would write a message and increment by one the number of messages by releasing the semaphore.
  • A log server would do an acquire on the semaphore, blocking until there was any message in the logs, and then decrementing by one the number of messages.

Notice:

  • Log clients get the logs queue lock, push a message, and only then do the release on the semaphore.
  • The log server can do the acquire before getting the logs queue lock; this would be possible even if there were more readers. For instance: 1 message in the log queue, server 1 does an acquire, server 2 does an acquire and blocks because semaphore count is 0, server 1 goes on and gets the logs queue lock...
#include <algorithm>  // for_each
#include <chrono>  // chrono_literasl
#include <future>  // async, future
#include <iostream>  // cout
#include <mutex>  // mutex, unique_lock
#include <queue>
#include <semaphore>  // counting_semaphore
#include <string>
#include <thread>  // sleep_for
#include <vector>

std::mutex mtx{};
std::queue<std::string> logs{};
std::counting_semaphore c_semaphore{ 0 };

int main()
{
    auto log = [](std::string message) {
        std::unique_lock lock{ mtx };
        logs.push(std::move(message));
        c_semaphore.release();
    };
    auto log_client = [&log]() {
        using namespace std::chrono_literals;
        static size_t s_id{ 1 };
        size_t id{ s_id++ };
        for (;;)
        {
            log(std::to_string(id));
            std::this_thread::sleep_for(id * 100ms);
        }
    };
    auto log_server = []() {
        for (;;)
        {
            c_semaphore.acquire();
            std::unique_lock lock{ mtx };
            std::cout << logs.front() << " ";
            logs.pop();
        }
    };

    std::vector<std::future<void>> log_clients(10);
    std::for_each(std::begin(log_clients), std::end(log_clients),
        [&log_client](auto& lc_fut) {
            lc_fut = std::async(std::launch::async, log_client);
        });
    auto ls_fut{ std::async(std::launch::async, log_server) };

    std::for_each(std::begin(log_clients), std::end(log_clients),
        [](auto& lc_fut) { lc_fut.wait(); });
    ls_fut.wait();
}

Upvotes: 1

Related Questions