Reputation: 247
I am trying to write a multithreaded logger, and when I test whether the queue of lines to write out is empty I get a std::system_error with "Invalid argument". This happens when constructing a std::unique_lock. If I pass std::try_to_lock there doesn't seem to be an issue, even though the conditions for undefined behaviour appear to be the same for both constructors.
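For reference, the two constructions I am comparing look roughly like this (m_mtx is the std::mutex member of BunchQueue below):
// throws std::system_error ("Invalid argument") in my test
std::unique_lock<std::mutex> lock(m_mtx);
// no error if I ask for try_to_lock instead
std::unique_lock<std::mutex> lock(m_mtx, std::try_to_lock);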
general.hh - nothing especially interesting here I think
#pragma once
#include <stdint.h>
#define likely(x) __builtin_expect (!!(x), 1)
#define unlikely(x) __builtin_expect (!!(x), 0)
namespace matan {
typedef int8_t s8;
typedef int16_t s16;
typedef int32_t s32;
typedef int64_t s64;
typedef uint8_t u8;
typedef uint16_t u16;
typedef uint32_t u32;
typedef uint64_t u64;
template <typename T, typename... Args>
inline void place(T* loc, Args&&... args) {
::new (loc) T(args...);
}
template <typename T, typename... Args>
inline void replace(T* p, Args&&... args) {
p->~T();
::new (p) T(args...);
}
} // matan
BunchQueue.hh - the basic data structure that seems to be having the issue.
#pragma once
#include <stdlib.h>
#include <string.h>
#include <mutex>
#include <vector>
#include <utility>
#include <type_traits>
#include <condition_variable>
#include <iostream> //PUSH_ASSERT
#include "general.hh"
namespace matan {
template <typename T>
class BaseQueue {
public:
typedef T value_type;
BaseQueue(size_t initCapacity) : m_capacity(initCapacity) {}
void reset() { m_size=0; }
size_t size() const { return m_size; }
T* begin() { return m_vec; }
const T* begin() const { return m_vec; }
T* end() { return m_vec+m_size; }
const T* end() const { return m_vec+m_size; }
protected:
T* m_vec = nullptr;
size_t m_capacity = 0;
size_t m_size = 0;
};
template <typename T>
class TakerQueue : public BaseQueue<T> {
/*
* A vector, but you have to use std::move
*/
public:
TakerQueue(size_t initCapacity = 1) : BaseQueue<T>(initCapacity) {
this->m_vec = (T*) malloc(sizeof(T)*(this->m_capacity));
}
void push_back(T& t) {
if (unlikely(this->m_size >= this->m_capacity)) {
this->m_capacity = this->m_capacity << 1;
T* oldVec = this->m_vec;
this->m_vec = (T*) malloc(sizeof(T)*(this->m_capacity));
for (size_t i = 0; i < this->m_size; i++) {
new (this->m_vec+i) T(std::move(oldVec[i]));
}
delete[] oldVec;
}
new (this->m_vec+this->m_size) T(std::move(t));
++(this->m_size);
}
};
template <typename T>
class ShallowQueue : public BaseQueue<T>{
//TODO: figure out the correct concept to use to guarantee T is trivially movable at compile time
/*
* A queue that instead of freeing and allocating memory constantly
* simply reuses the same memory overwriting the appropriate values.
*
* Its use case is to be filled, then iterated through, and then reset.
*
* Meant for usage with trivial classes, specifically structs as
* messages. The use of memcpy means I am not actually constructing
* an object in place, but just taking a shallow copy,
* and the use of realloc is only valid for a trivially movable
* object.
*
*/
public:
ShallowQueue(size_t initCapacity = 1) : BaseQueue<T>(initCapacity) {
this->m_vec = (T*) malloc(sizeof(T)*(this->m_capacity));
}
void push_back(const T& msg) {
if (unlikely(this->m_size >= this->m_capacity)) {
this->m_capacity = this->m_capacity << 1;
this->m_vec = (T*) realloc(this->m_vec, sizeof(T)*this->m_capacity);
}
memcpy(this->m_vec+this->m_size, &msg, sizeof(T));
++(this->m_size);
}
};
template <typename Queue>
class BunchQueue {
/*
* Multi writer single reader.
*
* Instead of popping off individual messages the reader takes all of them
* at once. This works well if the architecture is focused on bunches.
* It is also good memory-wise, because it means fewer allocations and frees,
* and allows for adjacent memory access when reading through the messages.
* The drawback is that you have a relatively large memory footprint, with one
* vector just sitting around. It works best if you are not RAM bound and can
* expect fairly consistent bunch sizes.
*/
public:
BunchQueue(size_t initCapacity = 1) :
m_queueA(initCapacity), m_queueB(initCapacity) {
}
void push_back(const typename Queue::value_type& msg) {
std::unique_lock<std::mutex> lock(m_mtx);
auto& q = getQueue();
q.push_back(msg);
}
void push_back(typename Queue::value_type& msg) {
std::unique_lock<std::mutex> lock(m_mtx);
auto& q = getQueue();
q.push_back(msg);
}
const Queue& takeQueue() {
std::unique_lock<std::mutex> lock(m_mtx);
auto q = &(getQueue());
m_whichQueue = !m_whichQueue;
getQueue().reset();
return *q;
}
bool empty() {
try {
std::unique_lock<std::mutex> lock(m_mtx);
} catch (const std::system_error& e) {
std::cout << "error locking BunchQueue mutex: " << e.what() << std::endl;
throw e;
}
std::unique_lock<std::mutex> lock(m_mtx);
return m_queueA.size() == 0 && m_queueB.size() == 0;
}
private:
bool m_whichQueue = true;
std::mutex m_mtx;
Queue m_queueA;
Queue m_queueB;
Queue& getQueue() {
//Only for use in takeQueue, haven't considered general use for thread safety
return m_whichQueue ? m_queueA : m_queueB;
}
};
template <typename Msg>
using MessageQueue = BunchQueue<ShallowQueue<Msg>>;
} //namespace matan
AsyncWorker.hh - Abstract class which is responsible for organizing condition variables and locking for worker thread.
#pragma once
#include "BunchQueue.hh"
#include <thread>
#include <atomic>
namespace matan {
class AsyncWorker {
/*
* A class to allow for a process to contain a worker thread that follows
* a simple loop. The expected usage is for there to be some sort of queue
* like data structure that the owners write to, and the worker thread
* will run over each element performing some operation defined by doit.
*
* This class is capable of Multi writer, single reader (the internal thread).
* but the details of implementation will determine the reality of if you
* can take multiple writers.
*/
public:
AsyncWorker() : m_worker(new std::thread([this]() { this->workerLoop();})) {}
virtual ~AsyncWorker() = 0;
AsyncWorker(const AsyncWorker&) = delete;
protected:
void workerLoop() {
while (true) {
waitTillNeeded();
doit();
if (unlikely(m_bDone)) {
break;
}
}
doit();
}
/*
* doit is the function that we actually want the worker thread to perform.
* I assume that each doit call is enough to completely utilize all the
* contents of the worker thread's "queue."
*/
virtual void doit() = 0;
/*
* Checks if there is any work for the worker thread to do, and if not puts
* the thread to sleep.
*/
virtual bool shouldSleep() = 0;
/*
* Locked so that waitTillNeeded can't be caught in an indeterminate state:
* either the flag is set beforehand so we never wait, or it is set after we
* are already waiting, so that the notify that follows isn't wasted in
* between setting the boolean and the actual call to wait.
*/
void done() {
std::unique_lock<std::mutex> lock(m_mtx);
m_bDone = true;
notifyWorker();
lock.unlock();
m_worker->join();
}
void notifyWorker() {
m_bRealWakeup = true;
m_shouldDoit.notify_one();
}
std::atomic_bool m_bDone{false};
std::atomic_bool m_bRealWakeup{false};
std::unique_ptr<std::thread> m_worker;
std::condition_variable m_shouldDoit;
private:
void waitTillNeeded() {
try {
shouldSleep();
} catch (const std::system_error& e) {
std::cout << "error shouldSleep" << std::endl;
}
std::unique_lock<std::mutex> lock(m_mtx);
if (!m_bDone && shouldSleep()) {
m_bRealWakeup = false;
m_shouldDoit.wait(lock, [this] { return this->realWakeup(); });
}
}
//Prevent spurious system wake up calls
bool realWakeup() { return m_bRealWakeup; }
std::mutex m_mtx;
};
inline AsyncWorker::~AsyncWorker() {}
} // matan
Logger.hh
#pragma once
#include "AsyncWorker.hh"
#include <fstream>
#include <string>
/*
* Has LogQueue, file to write to.
*/
namespace matan {
class Logger : public AsyncWorker {
/*
* Logger intended to prevent IO from blocking. Pushes the actual writing
* to disk onto a separate thread.
*
* Single writer. To make the interface similar to std::cout we need to allow
* separate calls to operator<<. For this to be multi writer we would need
* each operator<< call to contain a complete element, as opposed to
* building it within m_buf and only later flushing it. (Please note this
* issue would exist even if we actually flushed on every call to flush()).
*/
public:
Logger(const std::string& ofname);
virtual ~Logger() = default;
Logger(const Logger&) = delete;
Logger& operator<<(const std::string& str) {m_buf += str; return *this;}
Logger& operator<<(const char* c) { m_buf += c; return *this; }
Logger& operator<<(char c) { m_buf += c; return *this; }
Logger& operator<<(Logger& (*pf)(Logger&)) {return pf(*this);}
void flush();
void close();
private:
void doFlush();
virtual void doit();
virtual bool shouldSleep() {
try {
return m_contents.empty();
} catch (const std::system_error& e) {
std::cout << "error checking m_contents.empty()" << std::endl;
throw e;
}
return m_contents.empty();
}
std::string m_buf;
std::ofstream m_ofstream;
BunchQueue<TakerQueue<std::string>> m_contents;
/*
* I'm making a guess here that one page in memory is 4KB and that it will
* be fastest if I can stay on one page (I need to pick a threshold
* somehow) and that most logs will be less than 1024 characters.
*/
static constexpr std::size_t MAX_LEN = 3 * 1024;
};
} // matan
namespace std {
inline matan::Logger& endl(matan::Logger& logger) {
logger << '\n';
logger.flush();
return logger;
}
}
Logger.cc
#include "Logger.hh"
namespace matan {
/********************** BunchLogger *******************************/
Logger::Logger(const std::string& ofname) :
AsyncWorker(),
m_ofstream(ofname, std::ios::out) {}
void Logger::close() {
doFlush();
done();
m_ofstream.close();
}
void Logger::flush() {
if (m_buf.size() > MAX_LEN) {
doFlush();
}
}
void Logger::doFlush() {
m_contents.push_back(m_buf);
notifyWorker();
m_buf.clear();
}
void Logger::doit() {
for (const auto& line : m_contents.takeQueue()) {
m_ofstream << line;
m_ofstream.flush();
}
}
} // matan
logger.cc
#include <string>
#include <sstream>
#include <algorithm>
#include <iterator>
#include <iostream>
#include <chrono>
#include "Logger.hh"
using namespace std::chrono;
int main() {
std::string lyrics = "Row, row, row your boat Gently down the stream, Merrily merrily, merrily, merrily Life is but a dream";
std::istringstream iss(lyrics);
std::vector<std::string> lyric_vec(std::istream_iterator<std::string>{iss},
std::istream_iterator<std::string>{});
std::ofstream mystream("/tmp/logger1.log", std::ios::out);
auto start1 = high_resolution_clock::now();
for (auto& lyric : lyric_vec) {
mystream << lyric << std::endl;
mystream.flush();
}
mystream.close();
std::cout
<< duration_cast<nanoseconds>(high_resolution_clock::now()-start1).count()
<< std::endl;
matan::Logger bunchLogger("/tmp/logger2.log");
auto start2 = high_resolution_clock::now();
for (auto& lyric : lyric_vec) {
bunchLogger << lyric << std::endl;
}
std::cout
<< duration_cast<nanoseconds>(high_resolution_clock::now()-start2).count()
<< std::endl;
bunchLogger.close();
std::cout << "finish logger" << std::endl;
return 0;
}
makefile
# to use Makefile variables later in the Makefile: $(<var>)
#
# -g adds debugging information to the executable file
# -Wall turns on most, but not all, compiler warnings
CC = clang++
CFLAGS = -g -Wall -std=c++1z -pthread
BINDIR = bin
bigmap: timsort.hh BigMap.hh bigmap.cc
$(CC) $(CFLAGS) bigmap.cc -o $(BINDIR)/bigmap
threadpool: ThreadPool.hh threadpool.cc
$(CC) $(CFLAGS) threadpool.cc -o $(BINDIR)/threadpool
msgq: BunchQueue.hh message_queue.cc
$(CC) $(CFLAGS) message_queue.cc -o $(BINDIR)/msgq
logger: Logger.o logger.cc
$(CC) $(CFLAGS) Logger.o logger.cc -o $(BINDIR)/logger
Logger.o: BunchQueue.hh AsyncWorker.hh Logger.hh Logger.cc
$(CC) $(CFLAGS) -c Logger.cc
# .PHONY is so that make doesn't confuse clean with a file
.PHONY: clean
clean:
rm -r $(BINDIR) && mkdir $(BINDIR)
rm -f *.o *~
Upvotes: 3
Views: 6781
Reputation: 6471
std::mutex is not 'reentrant'. The exception is thrown because you are trying to lock AsyncWorker::m_mtx while the mutex is already owned by the calling thread.
The path where the double lock occurs is:
AsyncWorker::waitTillNeeded()->Logger::shouldSleep()
Changing the type of AsyncWorker::m_mtx to std::recursive_mutex should fix this problem. I could not find a recursive locking path on BunchQueue::m_mtx, but it would not hurt to use a recursive mutex there as well.
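As a minimal standalone sketch of the difference (hypothetical names, not taken from your code):
#include <mutex>

std::mutex m;
std::recursive_mutex rm;

void lock_twice_plain() {
  std::unique_lock<std::mutex> outer(m);
  std::unique_lock<std::mutex> inner(m);  // undefined behaviour: the same thread locks a
                                          // non-recursive mutex twice (may throw or deadlock)
}

void lock_twice_recursive() {
  std::unique_lock<std::recursive_mutex> outer(rm);
  std::unique_lock<std::recursive_mutex> inner(rm); // fine: the ownership count simply goes to 2
}
One caveat: std::condition_variable::wait() only accepts std::unique_lock<std::mutex>, so if AsyncWorker::m_mtx becomes a std::recursive_mutex, m_shouldDoit would also have to become a std::condition_variable_any.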
Other issues:
You have a memory leak on BaseQueue::m_vec: free() is never called on that pointer. I understand you are trying to avoid copying by using realloc (another issue there, see below), but be advised that realloc will copy the array contents anyway if the current block cannot be resized in place. I suggest you seriously consider using plain old std::vector, and std::vector::reserve() to pre-allocate space for the queue as needed.
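For example, here is a rough sketch of what TakerQueue could look like on top of std::vector (just an outline of the idea, not a drop-in replacement for your class hierarchy):
#include <cstddef>
#include <utility>
#include <vector>

template <typename T>
class TakerQueue {
public:
  explicit TakerQueue(std::size_t initCapacity = 1) { m_vec.reserve(initCapacity); }
  void push_back(T& t) { m_vec.push_back(std::move(t)); } // vector handles growth and moves elements itself
  void reset() { m_vec.clear(); }   // clear() leaves capacity() unchanged, so the buffer is reused
  std::size_t size() const { return m_vec.size(); }
  T* begin() { return m_vec.data(); }
  const T* begin() const { return m_vec.data(); }
  T* end() { return m_vec.data() + m_vec.size(); }
  const T* end() const { return m_vec.data() + m_vec.size(); }
private:
  std::vector<T> m_vec;
};
reserve() plus clear() already gives you the "fill, iterate, reset, reuse the same memory" behaviour you are after, without any manual malloc/realloc bookkeeping.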
Your call to realloc in ShallowQueue::push_back() is problematic:
this->m_vec = (T*) realloc(this->m_vec, sizeof(T)*this->m_capacity);
What happens if realloc() returns NULL? You have lost the previous value of m_vec and cannot access the data that was previously stored. You also lose the capacity to free memory in an orderly fashion.
The correct way to call realloc would be:
T* p = (T*) realloc(this->m_vec, sizeof(T)*this->m_capacity);
if (!p)
{
// do some error handling..
// return
}
this->m_vec = p; // p is now known to be non-null, it is safe to assign its value to m_vec
Upvotes: 1