isaach1000

Reputation: 1839

Thread-safe reference-counted queue C++

I'm struggling to implement a thread-safe, reference-counted queue. The idea is that I have a number of tasks, each of which holds a shared_ptr to a task manager that owns the queue. Here is a minimal implementation that reproduces the issue:

#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>

namespace {

class TaskManager;

struct Task {
    std::function<void()> f;
    std::shared_ptr<TaskManager> manager;
};

class Queue {
  public:
    Queue()
        : _queue()
        , _mutex()
        , _cv()
        , _running(true)
        , _thread([this]() { sweepQueue(); })
    {
    }

    ~Queue() { close(); }

    void close() noexcept
    {
        try {
            {
                std::lock_guard<std::mutex> lock(_mutex);
                if (!_running) {
                    return;
                }
                _running = false;
            }
            _cv.notify_one();
            _thread.join();
        } catch (...) {
            std::cerr << "An error occurred while closing the queue\n";
        }
    }

    void push(Task&& task)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _queue.emplace_back(std::move(task));
        lock.unlock();
        _cv.notify_one();
    }

  private:
    void sweepQueue() noexcept
    {
        while (true) {
            try {
                std::unique_lock<std::mutex> lock(_mutex);
                _cv.wait(lock, [this] { return !_running || !_queue.empty(); });

                if (!_running && _queue.empty()) {
                    return;
                }

                if (!_queue.empty()) {
                    const auto task = _queue.front();
                    _queue.pop_front();
                    task.f();
                }
            } catch (...) {
                std::cerr << "An error occurred while sweeping the queue\n";
            }
        }
    }

    std::deque<Task> _queue;
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _running;
    std::thread _thread;
};

class TaskManager : public std::enable_shared_from_this<TaskManager> {
  public:
    void addTask(std::function<void()> f)
    {
        _queue.push({ f, shared_from_this() });
    }

  private:
    Queue _queue;
};

}  // anonymous namespace

int main(void)
{
    const auto manager = std::make_shared<TaskManager>();
    manager->addTask([]() { std::cout << "Hello world\n"; });
}

The problem is that, on rare occasions, the queue's own destructor is invoked from within the sweepQueue method. On closer inspection, the reference count on the TaskManager seems to hit zero once the last task is dequeued and destroyed. How can I safely maintain the reference count without triggering the destructor from within sweepQueue?

Update: The example does not make clear why Task needs a std::shared_ptr<TaskManager>. Here is an example use case that should illustrate the need for this seemingly unnecessary ownership cycle:

std::unique_ptr<Task> task;
{
    const auto manager = std::make_shared<TaskManager>();
    task = std::make_unique<Task>(Task{someFunc, manager});
}
// Guarantees manager is not destroyed while task is still in scope.

Upvotes: 0

Views: 268

Answers (2)

Yakk - Adam Nevraumont

Reputation: 275220

I'd separate the queue from the thread first.

But to fix your problem:

#include <memory>
#include <utility>

struct am_I_alive {
  explicit am_I_alive(std::weak_ptr<void> p) : m_ptr(std::move(p)) {}
  // true while the tracked object still exists
  explicit operator bool() const { return !m_ptr.expired(); }
private:
  std::weak_ptr<void> m_ptr;
};
struct lifetime_tracker {
  am_I_alive track_lifetime() {
    if (!m_ptr) m_ptr = std::make_shared<bool>(true);
    return am_I_alive{m_ptr};
  }
  lifetime_tracker() = default;
  lifetime_tracker(lifetime_tracker const&) {} // do nothing, don't copy
  lifetime_tracker& operator=(lifetime_tracker const&){ return *this; }
private:
  std::shared_ptr<void> m_ptr; // released when the owning object is destroyed
};

This is a small utility for detecting whether we have been deleted. It is useful in any code that calls an arbitrary callback whose side effects could include `delete this`.

Have your Queue inherit privately from it.

Then separate popping a task from running it.

std::optional<Task> get_task() {
  std::unique_lock<std::mutex> lock(_mutex);
  _cv.wait(lock, [this] { return !_running || !_queue.empty(); });
  if (!_running && _queue.empty()) {
    return {}; // end
  }
  auto task = std::move(_queue.front());
  _queue.pop_front();
  return task;
} // the lock is released here, before the task runs
void sweepQueue() noexcept
{
  while (true) {
    try {
      auto task = get_task();
      if (!task) return;
      // we are alive here
      auto alive = track_lifetime();
      try {
        task->f();
      } catch(...) {
        std::cerr << "An error occurred while running a task\n";
      }
      task.reset(); // may drop the last reference to the TaskManager
      // we could be deleted here
      if (!alive)
        return; // this was deleted, get out of here
    } catch (...) {
      std::cerr << "An error occurred while sweeping the queue\n";
    }
  }
}

Now sweepQueue never touches the Queue's members after the Queue has been deleted.

After that you need to deal with the thread problem.

The thread problem is that your code needs to destroy the thread from within the very thread it is running on, and a thread cannot join itself. At the same time, you also need to guarantee that the thread has terminated before main ends.

These are not compatible.

To fix that, create a thread-owning pool that doesn't have your "keep-alive" semantics, and get your thread from it.

These threads don't delete themselves; instead, they return themselves to that pool for reuse by another client.

At shutdown, the pool joins those threads, which ensures that no code is still running elsewhere once main ends.

To write such a pool without your inverted-dependency mess, split the queue part of your code off into a queue that owns no thread.

template<class T>
struct threadsafe_queue {
  void push(T);
  std::optional<T> pop(); // returns empty once the queue is aborted
  void abort();
  ~threadsafe_queue();
private:
  std::mutex m;
  std::condition_variable v;
  std::deque<T> data;
  bool aborted = false;
};

Then a simple thread pool:

struct thread_pool {
  template<class F>
  std::future<std::invoke_result_t<F&>> enqueue( F&& f );
  template<class F>
  std::future<std::invoke_result_t<F&>> thread_off_now( F&& f ); // starts a thread if there aren't any free
  void abort();
  void start_thread( std::size_t n = 1 );
  std::size_t count_threads() const;
  ~thread_pool();
private:
  threadsafe_queue< std::function<void()> > tasks;
  std::vector< std::thread > threads;
  static void thread_loop( thread_pool* pool );
};

Make the thread pool a singleton. Get the thread for your queue from its thread_off_now method; that guarantees you a thread that can be recycled when you are done with it, and whose lifetime is managed by someone else.


But really, you should be thinking in terms of ownership. The idea that tasks and task queues mutually own each other is a mess.

If someone disposes of a task queue, it is probably better to abandon its pending tasks than to keep the queue alive magically and silently.

That is what my simple thread pool does.

Upvotes: 1

Maxim Egorushkin

Reputation: 136208

The ownership hierarchy here is: TaskManager owns Queue, and Queue owns Tasks. Tasks that hold a shared pointer to TaskManager create an ownership cycle, which does not seem to serve a useful purpose here.

This ownership cycle is the root of the problem. Since Queue is owned by TaskManager, Queue can hold a plain pointer to TaskManager and pass that pointer to each Task in sweepQueue. You do not need a std::shared_ptr<TaskManager> in Task at all.

Upvotes: 3
