Sam Kellett
Sam Kellett

Reputation: 1347

Assertion when trying to a use a Boost.Interprocess message queue across a bunch of processes

I have the following reduced program that spins up a bunch of child processes and then uses a boost::interprocess::message_queue to send a message to each one. This works when number of processes is small (about 4 on my machine) but as that number rises I get the following message:

head (81473): "./a.out"
Assertion failed: (res == 0), function do_wait, file /usr/local/include/boost/interprocess/sync/posix/condition.hpp, line 175.

I'm guessing it's a problem with my synchronisation.. have I done something wrong or is the boost::interprocess::scoped_lock not enough?

My program is here:

#include <boost/interprocess/ipc/message_queue.hpp>

#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>

#include <boost/process.hpp>

#include <iostream>

auto main(int argc, char **argv) -> int
{
    namespace ip = boost::interprocess;

    boost::filesystem::path self{argv[0]};

    if (argc == 1) {
        std::cout << "head (" << ::getpid() << "): " << self << std::endl;

        // create a message queue.
        ip::message_queue::remove("work_queue");
        ip::message_queue tasks{ip::create_only, "work_queue", 100, sizeof(int)};

        // mutex for writing to the queue.
        ip::interprocess_mutex mutex{};

        // spawn off a bunch of processes.
        const auto cores{5 * std::thread::hardware_concurrency()};
        std::vector<boost::process::child> workers{};
        for (auto i = 0; i < cores; ++i) {
            workers.emplace_back(self, "child");
        }

        // send message to each core.
        for (auto i = 0; i < cores; ++i) {
            ip::scoped_lock<decltype(mutex)> lock{mutex};
            tasks.send(&i, sizeof(i), 0);
        }

        // wait for each process to finish.
        for (auto &worker : workers) {
            worker.wait();
        }
    } else if (argc == 2 && std::strcmp(argv[1], "child") == 0) {
        // connect to message queue.
        ip::message_queue tasks{ip::open_only, "work_queue"};

        // mutex for reading from the queue.
        ip::interprocess_mutex mutex{};

        unsigned int priority;
        ip::message_queue::size_type recvd_size;

        {
            ip::scoped_lock<decltype(mutex)> lock{mutex};

            int number;
            tasks.receive(&number, sizeof(number), recvd_size, priority);
            std::cout << "child (" << ::getpid() << "): " << self << ", received: " << number << std::endl;
        }
    }

    return 0;
}

Upvotes: 0

Views: 405

Answers (1)

user7860670
user7860670

Reputation: 37587

You create an interprocess_mutex instance on the stack. So each process has it's own mutex and locking it does not synchronize anything. You need to create a shared memory region, place mutex there and then open the same shared memory region in child process to access the mutex created by parent process.

Upvotes: 1

Related Questions