Dark Sorrow
Dark Sorrow

Reputation: 1947

Random boost::interprocess_exception::library_error on boost::interprocess::message_queue

I've created a template class for IPC using Message Queue.
I'm running my program in infinite while loop (called main loop).
I collect data from various sub-systems (sensors) over Ethernet and pass the received data to the appropriate processes using Message Queue (there are multiple different processes that can act as data sinks, each with its own message queue).
I've just run the program and I'm not performing any activity. This is the only program running and I reboot the OS before every run.
The program is just running in while loop where all flags are set to false; hence the program is just running a blank (empty) loop.
Randomly I'm getting boost::interprocess_exception::library_error. As there is no activity, I expected that there should be no error.

I commented out the Ethernet related code but still I'm getting the same error.

I'm getting error in statement:

if (primaryNode == true)
{
    this->mSecondaryToPrimaryMessageQueue->receive(
        &receiveData,
        sizeof(receiveData),
        receiveLength,
        priority
    );
}
else
{
    this->mPrimaryToSecondaryMessageQueue->receive(
        &receiveData,
        sizeof(receiveData),
        receiveLength,
        priority
    );
}

I tried with primaryNode set to true or false. I get the same error.

Code :

ipc.hpp

#pragma once

#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <exception>
#include <functional>
#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <type_traits>
#include <variant>

#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/lockfree/spsc_queue.hpp>

/// @brief Bidirectional IPC endpoint built on two boost::interprocess message queues.
///
/// One queue carries messages from the primary node to the secondary node, the
/// other carries the reverse direction. After open() succeeds, a background
/// thread running listen() copies every incoming message into a lock-free
/// single-producer/single-consumer buffer and then invokes the registered callback.
///
/// @tparam T1 Specifies the data-type that has to be sent
/// @tparam T2 Specifies the data-type that will be received
/// @tparam primaryNode Denotes if this endpoint is the primary node (owner/creator) of the message queues
template<typename T1, typename T2, bool primaryNode>
class Ipc
{
    // Messages are handed to the queues as a raw pointer plus sizeof(), and the
    // receive buffer is cleared with memset, so both payload types must be
    // trivially copyable.
    static_assert(std::is_trivially_copyable<T1>::value, "Ipc: T1 must be trivially copyable");
    static_assert(std::is_trivially_copyable<T2>::value, "Ipc: T2 must be trivially copyable");

private:
    /// Number of messages each message queue (and the local receive buffer) can hold.
    /// constexpr (implicitly inline in C++17) so that binding it to a reference,
    /// e.g. when forwarding it through std::make_unique, does not require a
    /// separate out-of-class definition.
    static constexpr std::uint8_t MAX_MESSAGE_DEPTH = 5;

    using callback_t = std::function<void(void)>;

    /// Invoked from the receive thread after a message has been buffered.
    callback_t mCallback;

    /// Queue carrying messages from the primary node to the secondary node.
    std::unique_ptr<boost::interprocess::message_queue> mPrimaryToSecondaryMessageQueue;
    /// Queue carrying messages from the secondary node to the primary node.
    std::unique_ptr<boost::interprocess::message_queue> mSecondaryToPrimaryMessageQueue;

    std::string         mPrimaryToSecondaryMessageQueueName;
    std::string         mSecondaryToPrimaryMessageQueueName;

    /// Thread running listen(); started by open().
    std::thread         mReceiveThread;
    /// Set by close() to ask the receive thread to leave its loop.
    std::atomic_bool    mExitReceiveThread{ false };
    /// Messages received by listen() and not yet consumed through receive().
    boost::lockfree::spsc_queue<T2, boost::lockfree::capacity<MAX_MESSAGE_DEPTH>> mReceiveDataQueue;

    /// Receive-thread body.
    void listen(void);
public:
    Ipc() = default;

    /// Opens (or creates) both message queues derived from @p queueName and starts the receive thread.
    /// @return true on success, false if an exception was reported.
    bool open(const std::string& queueName);

    /// Requests the receive thread to stop and removes both message queues.
    bool close(void);

    /// Sends @p data on this node's outbound queue with the given @p priority.
    /// @return true on success, false if an exception was reported.
    bool send(const T1& data, std::uint32_t priority = 10);

    /// Pops the oldest buffered message, or std::nullopt when nothing is buffered.
    std::optional<T2> receive(void);

    /// Stores the callback that the receive thread invokes after buffering a message.
    bool register_callback(callback_t callback_implementation);

    /// @return true when at least one received message is waiting in the local buffer.
    bool isDataAvailableInReceiveDataQueue(void) const;
};


/// @brief Receive-thread body: moves messages from the inbound message queue
///        into mReceiveDataQueue and notifies the registered callback.
///
/// Runs until mExitReceiveThread becomes true. The flag is only re-checked
/// between messages because message_queue::receive() blocks until a message is
/// available. Exceptions are reported on std::cout and the loop continues after
/// a short pause so that a persistent failure does not spin the CPU.
template<typename T1, typename T2, bool primaryNode>
inline void Ipc<T1, T2, primaryNode>::listen(void)
{
    using message_queue = boost::interprocess::message_queue;

    // The inbound direction is fixed by the node role: the primary node reads
    // what the secondary node sends, and vice versa.
    message_queue& inboundQueue = primaryNode
        ? *this->mSecondaryToPrimaryMessageQueue
        : *this->mPrimaryToSecondaryMessageQueue;

    while (this->mExitReceiveThread.load() == false)
    {
        try
        {
            T2 receiveData; //Buffer to store received data
            std::memset(&receiveData, 0, sizeof(receiveData)); //Initialize buffer to 0
            message_queue::size_type receiveLength = 0; //Number of bytes actually received
            unsigned int priority = 0; //Priority reported by the queue

            inboundQueue.receive(&receiveData, sizeof(receiveData), receiveLength, priority);

            // A message of any other size cannot be a complete T2; drop it
            // instead of handing a partially filled object to the consumer.
            if (receiveLength != sizeof(receiveData))
            {
                std::cout << "Inside Listen: unexpected message size " << receiveLength
                          << " (expected " << sizeof(receiveData) << ")" << std::endl;
                continue;
            }

            // push() returns false when the local buffer is full; report the loss
            // but still notify the consumer so that it gets a chance to drain.
            if (this->mReceiveDataQueue.push(receiveData) == false)
            {
                std::cout << "Inside Listen: receive buffer full, message dropped" << std::endl;
            }

            // Calling an empty std::function throws std::bad_function_call.
            if (this->mCallback)
            {
                this->mCallback();
            }
        }
        catch (const std::exception& ex)
        {
            std::cout << "Inside Listen Exception\n";
            std::cout << ex.what() << std::endl;
            // Back off briefly: if receive() keeps failing, avoid a hot loop.
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }
}

/// @brief Opens (or creates) the two message queues "<queueName>_out" and
///        "<queueName>_in" and starts the receive thread.
///
/// "_out" carries primary->secondary traffic and "_in" carries
/// secondary->primary traffic. Each node sends T1 and receives T2, so the
/// message size of a queue depends on which node this endpoint is: the queue
/// this node sends on holds T1, the queue it receives from holds T2.
///
/// @param queueName Base name shared by both nodes.
/// @return true when both queues are usable and the receive thread was started;
///         false if the endpoint is already open, an existing queue has a
///         different message size, or an exception was reported.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::open(const std::string& queueName)
{
    // Assigning to a joinable std::thread calls std::terminate, so refuse a second open().
    if (this->mReceiveThread.joinable())
    {
        std::cout << "Ipc::open: already open" << std::endl;
        return false;
    }

    try
    {
        namespace bip = boost::interprocess;

        // Local copies so that std::make_unique binds its forwarding references
        // to these locals rather than to the static class member.
        constexpr std::size_t depth = MAX_MESSAGE_DEPTH;
        constexpr std::size_t primaryToSecondarySize = primaryNode ? sizeof(T1) : sizeof(T2);
        constexpr std::size_t secondaryToPrimarySize = primaryNode ? sizeof(T2) : sizeof(T1);

        this->mPrimaryToSecondaryMessageQueueName = queueName + "_out";
        this->mSecondaryToPrimaryMessageQueueName = queueName + "_in";

        //Open-Create message queue to send data from primaryNode node to secondary node
        this->mPrimaryToSecondaryMessageQueue = std::make_unique<bip::message_queue>(
            bip::open_or_create,
            this->mPrimaryToSecondaryMessageQueueName.c_str(),
            depth,
            primaryToSecondarySize
        );

        //Open-Create message queue to send data from secondary node to primaryNode node
        this->mSecondaryToPrimaryMessageQueue = std::make_unique<bip::message_queue>(
            bip::open_or_create,
            this->mSecondaryToPrimaryMessageQueueName.c_str(),
            depth,
            secondaryToPrimarySize
        );

        // open_or_create may attach to a queue that already exists (created by
        // the peer or left over from an earlier run); make sure its message
        // size matches what this endpoint will send and receive.
        if (this->mPrimaryToSecondaryMessageQueue->get_max_msg_size() != primaryToSecondarySize ||
            this->mSecondaryToPrimaryMessageQueue->get_max_msg_size() != secondaryToPrimarySize)
        {
            std::cout << "Ipc::open: existing queue has an unexpected message size" << std::endl;
            this->mPrimaryToSecondaryMessageQueue.reset();
            this->mSecondaryToPrimaryMessageQueue.reset();
            return false;
        }

        //Start Listner Thread
        this->mExitReceiveThread.store(false);
        this->mReceiveThread = std::thread(&Ipc::listen, this);
        return true;
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
        // Do not leave a half-opened endpoint behind.
        this->mPrimaryToSecondaryMessageQueue.reset();
        this->mSecondaryToPrimaryMessageQueue.reset();
        return false;
    }
}

/// @brief Asks the receive thread to stop and removes both message queues by name.
///
/// The original success path reached the end of this bool function without a
/// return statement; it now returns true explicitly.
///
/// @return true once the stop flag is set and removal of both queues has been
///         attempted; false if an exception was reported. A queue that could
///         not be removed is reported on std::cout but does not change the
///         return value.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::close(void)
{
    try
    {
        this->mExitReceiveThread.store(true); //Marked to close thread

        //Delete Primary to Secondary Message Queue
        if (boost::interprocess::message_queue::remove(this->mPrimaryToSecondaryMessageQueueName.c_str()) == false)
        {
            std::cout << "Ipc::close: could not remove " << this->mPrimaryToSecondaryMessageQueueName << std::endl;
        }

        //Delete Secondary to Primary Message Queue
        if (boost::interprocess::message_queue::remove(this->mSecondaryToPrimaryMessageQueueName.c_str()) == false)
        {
            std::cout << "Ipc::close: could not remove " << this->mSecondaryToPrimaryMessageQueueName << std::endl;
        }

        return true;
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
        return false;
    }
}

/// @brief Sends one message on this node's outbound queue.
///
/// The primary node sends on the primary->secondary queue, the secondary node
/// on the secondary->primary queue.
///
/// @param data     Message to send; sizeof(data) bytes are handed to the queue.
/// @param priority Message priority passed through to the queue.
/// @return true if the message was queued; false if the endpoint has not been
///         opened or an exception was reported.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::send(const T1& data, std::uint32_t priority)
{
    boost::interprocess::message_queue* const outboundQueue = primaryNode
        ? this->mPrimaryToSecondaryMessageQueue.get()   //Send message on Primary to Secondary Queue
        : this->mSecondaryToPrimaryMessageQueue.get();  //Send message on Secondary to Primary Queue

    // The queue pointers stay null until open() has created them.
    if (outboundQueue == nullptr)
    {
        std::cout << "Ipc::send: queue is not open" << std::endl;
        return false;
    }

    try
    {
        outboundQueue->send(&data, sizeof(data), priority);
        return true;
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
        return false;
    }
}

/// @brief Takes the oldest message out of the local receive buffer.
/// @return The message, or std::nullopt when the buffer is empty.
template<typename T1, typename T2, bool primaryNode>
inline std::optional<T2> Ipc<T1, T2, primaryNode>::receive(void)
{
    // Nothing buffered: report "no value" straight away.
    if (this->mReceiveDataQueue.read_available() == 0)
    {
        return std::nullopt;
    }

    // Copy the first element, then drop it from the buffer.
    std::optional<T2> oldest{ this->mReceiveDataQueue.front() };
    this->mReceiveDataQueue.pop();
    return oldest;
}

/// @brief Stores the callback that is invoked after a message has been buffered.
/// @param callbackImplementation Callable taking no arguments and returning nothing.
/// @return true if the callback was stored, false if storing it threw.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::register_callback(callback_t callbackImplementation)
{
    bool stored = false;
    try
    {
        this->mCallback = callbackImplementation;
        stored = true;
    }
    catch (const std::exception& ex)
    {
        std::cerr << ex.what() << '\n';
    }
    return stored;
}

/// @brief Tells whether receive() would currently return a message.
/// @return true when the local receive buffer holds at least one message.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::isDataAvailableInReceiveDataQueue(void) const
{
    return this->mReceiveDataQueue.read_available() > 0;
}

main.cpp

#include <ipc.hpp>
#include <iostream>

//P1 stands for Process 1
//P2 stands for Process 2
// Message payload sent from process 1 to process 2.
// Used below as the "send" type (T1) of the global ipc1 endpoint.
struct P1ToP2
{
    float a;
    int b;
};

// Message payload sent from process 2 to process 1.
// Used below as the "receive" type (T2) of the global ipc1 endpoint.
struct P2ToP1
{
    int a;
    int b;
};

// Endpoint that sends P1ToP2 and receives P2ToP1; primaryNode is false, so this
// process acts as the secondary node.
Ipc<P1ToP2, P2ToP1, false> ipc1; //Global IPC object

// Callback registered with ipc1: prints the fields of the next buffered
// message, if there is one.
void message_queue_data_received(void)
{
    if (!ipc1.isDataAvailableInReceiveDataQueue())
    {
        return;
    }

    const auto received = ipc1.receive();
    if (!received.has_value())
    {
        return;
    }

    std::cout << "a : " << received->a << "\tb : " << received->b << std::endl;
}

// Program entry point: registers the receive callback, opens the "ipc1" queues
// and then runs the main loop, forwarding data over IPC whenever the
// dataReceivedOnEthernet flag is set.
int main(int argc, char *argv[])
{
    bool dataReceivedOnEthernet = false;

    // ipc1 is a global object, not a member: there is no `this` in a free function.
    if (ipc1.register_callback(message_queue_data_received) == false)
    {
        std::cerr << "Failed to register the IPC receive callback" << std::endl;
        return 1;
    }
    if (ipc1.open("ipc1") == false)
    {
        // Without usable queues every later send() would fail as well.
        std::cerr << "Failed to open the IPC queues for \"ipc1\"" << std::endl;
        return 1;
    }

    while(true)
    {
        if(dataReceivedOnEthernet == true) //Flag set by other thread
        {
            P1ToP2 p;
            p.a = 10.23f; //Some Data received over ethernet (float literal: no double->float conversion)
            p.b = 10; //Some Data received over ethernet
            ipc1.send(p); //Send data over IPC
        }
        //Other Code
    }
}

Error

boost::interprocess_exception::library_error

Upvotes: 1

Views: 78

Answers (1)

sehe
sehe

Reputation: 393694

Why do the processes use a different type for the message, while silently assuming they're the same size (and both trivial and standard layout etc...). Are you mixing up the various types and queues? It looks like it.

It helps if you name things well. Also, remove duplication.

I'd separate the queues by message type. Name them by the roles:

// Requests are sent by client to server.
// Responses are sent by server to client.
struct Request { int a, b; };
struct Response { double a; int b; };
// Both endpoints share the same message types; only the boolean role flag
// (third template argument) differs: true for the client, false for the server.
using ClientIpc = Ipc<Request, Response, true>;
using ServerIpc = Ipc<Request, Response, false>;

Then, by defining a Channel type:

// Message priority type used by Channel::send / Channel::receive.
using Prio = uint32_t;

// One named message queue carrying messages of a single type T.
// NOTE: this excerpt uses IsClient and MAX_DEPTH, which are not declared here;
// they come from the enclosing scope.
template <typename T> struct Channel {
    Channel(std::string name);
    ~Channel();

    // Returns the received message together with its priority.
    std::tuple<T, Prio> receive(std::stop_token token);
    // Sends msg with the given priority; the bool result reports success.
    bool send(T const& msg, Prio prio, std::stop_token token);

  private:
    using Queue = boost::interprocess::message_queue;
    std::string name_;
    // Initialized from open(), which reads name_; name_ is declared first so it
    // is already initialized at that point.
    Queue       queue_ = open();

    // Clients only open an existing queue; otherwise the queue is opened or
    // created with room for MAX_DEPTH messages of sizeof(T) bytes each.
    Queue open() {
        if constexpr (IsClient) {
            return {boost::interprocess::open_only, name_.c_str()};
        } else {
            return {boost::interprocess::open_or_create, name_.c_str(), MAX_DEPTH, sizeof(T)};
        }
    }
};

Now we can simply state:

Channel<Request>  reqQ;
Channel<Response> resQ;

With construction like

 // Constructor written as a function-try-block: opens the request queue
 // "<queueName>_req" and the response queue "<queueName>_res", then starts the
 // thread running listen(). Any std::exception thrown while initializing the
 // members is logged and rethrown.
 Ipc(std::string const& queueName) try
    : reqQ(queueName + "_req")
    , resQ(queueName + "_res")
    , mThread(bind(&Ipc::listen, this, std::placeholders::_1)) {
} catch (std::exception const& ex) {
    std::cerr << "Ipc: " << ex.what() << std::endl;
    throw;
}

The listener queues received messages. The type depends on client/server mode:

using Incoming = std::conditional_t<IsClient, Response, Request>;
boost::lockfree::spsc_queue<Incoming, boost::lockfree::capacity<MAX_DEPTH>> mInbox;

Note how I opted for the much safer jthread with stop tokens for coordinating thread exit:

std::jthread      mThread;
std::stop_token   mToken = mThread.get_stop_token();

// Listener loop: until a stop is requested, receive the next incoming message
// (responses on a client, requests on a server), queue it in mInbox and invoke
// the callback if one is registered. Exceptions are logged and the loop goes on.
void listen(std::stop_token token) {
    for (;;) {
        if (token.stop_requested())
            return;

        try {
            if constexpr (IsClient) {
                auto received = resQ.receive(token);
                mInbox.push(std::get<0>(received));
            } else {
                auto received = reqQ.receive(token);
                mInbox.push(std::get<0>(received));
            }

            if (mCallback) {
                mCallback();
            }
        } catch (std::exception const& ex) {
            std::cerr << "Listen: " << ex.what() << std::endl;
        }
    }
}

The external operations look much simpler, like:

// Blocks until the listener thread has finished.
void wait() { mThread.join(); }

// Requests the listener thread to stop. When called from a different thread it
// also waits for the listener to finish; when called from the listener thread
// itself (e.g. from inside the callback) it must not join itself.
void close() {
    mThread.request_stop();
    if (std::this_thread::get_id() != mThread.get_id())
        wait();
}

// Overloads select the queue by message type: requests go to reqQ, responses to resQ.
bool send(Request  const& msg, Prio prio = 10) { return reqQ.send(msg, prio, mToken); }
bool send(Response const& msg, Prio prio = 10) { return resQ.send(msg, prio, mToken); }

// Takes one message out of the inbox; returns an empty optional when the inbox is empty.
std::optional<Incoming> consume() {
    if (Incoming val; mInbox.consume_one([&val](Incoming res) { val = std::move(res); }))
        return val;
    return {};
}

// Stores the callback that listen() invokes after queuing a message.
void register_callback(callback_t cb) { mCallback = cb; }
// True when at least one message is waiting in the inbox.
bool haveMessage() const { return mInbox.read_available(); }

Sample Client/Server

Let's define the above server to respond to Requests by sending back the square-root of a and b/2:

// Server role: answers every request with {sqrt(a), b / 2} and shuts down when
// it receives the sentinel request {-42, -42}.
void server() {
    ServerIpc ipc(IPC_NAME);

    ipc.register_callback([&ipc] {
        assert(ipc.haveMessage());

        std::optional<Request> req = ipc.consume();
        if (!req)
            return;

        auto [a, b] = *req;
        std::cout << "server received request a:" << a << " b:" << b << std::endl;

        const bool closeRequested = (a == -42) && (b == -42);
        if (closeRequested) {
            std::cout << " -> server handling close request" << std::endl;
            ipc.close();
            return;
        }

        // send response
        ipc.send(Response{sqrt(a), b / 2});
    });

    // Keep the server alive until the listener thread has stopped.
    ipc.wait();
}

That's all. Note how we've added a mechanism to tell the server the client wants it to exit (because the server owns the resources). The client may look something like this:

// Client role: logs every response, occasionally sends a request during 100
// iterations of 10ms, then asks the server to close and closes itself.
void client() {
    ClientIpc ipc(IPC_NAME);

    ipc.register_callback([&ipc] {
        assert(ipc.haveMessage());

        std::optional<Response> res = ipc.consume();
        if (!res)
            return;

        auto [a, b] = *res;
        std::cout << "client received response a:" << a << " b:" << b << std::endl;
    });

    int i = 0;
    while (i < 100) {
        const bool shouldSend = (rand() % 30 == 0); // Flag set by other thread
        if (shouldSend) {
            ipc.send(Request{i, 2 * i}); // Send request
        }

        std::this_thread::sleep_for(10ms);
        ++i;
    }

    std::cout << "Client sending close command" << std::endl;
    ipc.send(Request{-42, -42});

    std::cout << "Closing" << std::endl;
    ipc.close();
}

All it does is send some requests for ~1s (100 iterations of 10ms each) and log the responses. Then it tells the server to quit, and closes. Only the server will remove the queues.

A simple main to switch client/server:

int main(int argc, char** argv) {
    if (std::set<std::string_view>(argv + 1, argv + argc).contains("server"))
        server();
    else
        client();

    std::cout << "Bye" << std::endl;
}

Live Demo

Live¹ On Coliru

  • File test.h

     #pragma once
    
     #include <boost/interprocess/ipc/message_queue.hpp>
     #include <boost/lockfree/spsc_queue.hpp>
     #include <iostream>
     #include <optional>
     #include <thread>
     using namespace std::chrono_literals;
    
     /// Two-way IPC endpoint: requests travel on "<name>_req", responses on
     /// "<name>_res". A listener thread receives the incoming message type
     /// (responses on a client, requests on a server), stores it in an inbox and
     /// invokes the registered callback.
     ///
     /// @tparam Request  Message type sent by the client to the server.
     /// @tparam Response Message type sent by the server to the client.
     /// @tparam IsClient true for the client role, false for the server role.
     template <typename Request, typename Response, bool IsClient> class Ipc {
       private:
         static constexpr uint8_t MAX_DEPTH = 5;

         using callback_t = std::function<void()>;
         // NOTE(review): written by register_callback() while the listener thread
         // (started in the constructor) may already be reading it; there is no
         // synchronization around this member.
         callback_t mCallback;

         using Prio = uint32_t;

         /// One named message queue carrying messages of type T.
         template <typename T> struct Channel {
             Channel(std::string name) : name_(std::move(name)) { //
                 assert(queue_.get_max_msg_size() == sizeof(T));
             }

             ~Channel() {
                 // Only the server removes the queue.
                 if constexpr (!IsClient) {
                     std::cerr << "Server cleaning up " << name_ << std::endl;
                     Queue::remove(name_.c_str());
                 }
             }

             /// Waits for the next message, re-checking the stop token every 50ms.
             /// @throws std::runtime_error when a stop is requested before a message arrives.
             std::tuple<T, Prio> receive(std::stop_token token) {
                 size_t len  = 0;
                 Prio   prio = 0;
                 T      msg{};

                 while (!token.stop_requested()) {
                     auto deadline = std::chrono::steady_clock::now() + 50ms;
                     if (queue_.timed_receive(&msg, sizeof(msg), len, prio, deadline)) {
                         assert(len == sizeof(T));
                         return {std::move(msg), prio};
                     }
                 }

                 throw std::runtime_error("stop requested");
             }

             /// Tries to send msg, re-checking the stop token every 50ms.
             /// @return true once the message was queued, false if a stop was requested first.
             bool send(T const& msg, Prio prio, std::stop_token token) {
                 while (!token.stop_requested()) {
                     auto deadline = std::chrono::steady_clock::now() + 50ms;
                     if (queue_.timed_send(&msg, sizeof(msg), prio, deadline))
                         return true;
                 }
                 return false;
             }

           private:
             using Queue = boost::interprocess::message_queue;
             std::string name_;
             // Initialized from open(), which reads name_ (declared first).
             Queue       queue_ = open();

             Queue open() {
                 if constexpr (IsClient) {
                     return {boost::interprocess::open_only, name_.c_str()};
                 } else {
                     return {boost::interprocess::open_or_create, name_.c_str(), MAX_DEPTH, sizeof(T)};
                 }
             }
         };

         Channel<Request>  reqQ;
         Channel<Response> resQ;

         using Incoming = std::conditional_t<IsClient, Response, Request>;
         // Declared BEFORE mThread on purpose: members are constructed in
         // declaration order and destroyed in reverse, so the inbox exists before
         // the listener thread starts and outlives the join performed by
         // ~jthread. The thread pushes into it.
         boost::lockfree::spsc_queue<Incoming, boost::lockfree::capacity<MAX_DEPTH>> mInbox;

         std::jthread      mThread;
         std::stop_token   mToken = mThread.get_stop_token();

         /// Listener loop: receive, store in the inbox, notify the callback.
         void listen(std::stop_token token) {
             while (!token.stop_requested()) {
                 try {
                     if constexpr (IsClient)
                         mInbox.push(get<0>(resQ.receive(token)));
                     else
                         mInbox.push(get<0>(reqQ.receive(token)));

                     if (mCallback)
                         mCallback();
                 } catch (std::exception const& ex) {
                     // Channel::receive() reports cancellation by throwing; that is
                     // the normal shutdown path, so only log other failures.
                     if (!token.stop_requested())
                         std::cerr << "Listen: " << ex.what() << std::endl;
                 }
             }
         }

       public:
         /// Opens both queues and starts the listener thread.
         /// Logs and rethrows any std::exception raised while doing so.
         Ipc(std::string const& queueName) try
             : reqQ(queueName + "_req")
             , resQ(queueName + "_res")
             , mThread([this](std::stop_token token) { listen(token); }) {
         } catch (std::exception const& ex) {
             std::cerr << "Ipc: " << ex.what() << std::endl;
             throw;
         }

         /// Blocks until the listener thread has finished.
         void wait() { mThread.join(); }

         /// Requests the listener to stop; joins it unless called from the listener itself.
         void close() {
             mThread.request_stop();
             if (std::this_thread::get_id() != mThread.get_id())
                 wait();
         }

         bool send(Request  const& msg, Prio prio = 10) { return reqQ.send(msg, prio, mToken); }
         bool send(Response const& msg, Prio prio = 10) { return resQ.send(msg, prio, mToken); }

         /// Takes one message out of the inbox, or returns an empty optional.
         std::optional<Incoming> consume() {
             if (Incoming val; mInbox.consume_one([&val](Incoming res) { val = std::move(res); }))
                 return val;
             return {};
         }

         void register_callback(callback_t cb) { mCallback = cb; }
         bool haveMessage() const { return mInbox.read_available(); }
     };
    
  • File test.cpp

     #include "test.h"
     #include <cmath>
     #include <set>
    
     // Requests are sent by client to server.
     // Responses are sent by server to client.
     struct Request { int a, b; };
     struct Response { double a; int b; };
     // Same message types for both roles; the third template argument is the
     // IsClient flag (true = client, false = server).
     using ClientIpc = Ipc<Request, Response, true>;
     using ServerIpc = Ipc<Request, Response, false>;
    
     static std::string IPC_NAME = "so_demo_ipc";
     // Server role: answers every request with {sqrt(a), b / 2} and shuts down
     // when it receives the sentinel request {-42, -42}.
     void server() {
         ServerIpc ipc(IPC_NAME);

         ipc.register_callback([&ipc] {
             assert(ipc.haveMessage());

             std::optional<Request> req = ipc.consume();
             if (!req)
                 return;

             auto [a, b] = *req;
             std::cout << "server received request a:" << a << " b:" << b << std::endl;

             const bool closeRequested = (a == -42) && (b == -42);
             if (closeRequested) {
                 std::cout << " -> server handling close request" << std::endl;
                 ipc.close();
                 return;
             }

             // send response
             ipc.send(Response{sqrt(a), b / 2});
         });

         // Keep the server alive until the listener thread has stopped.
         ipc.wait();
     }
    
     // Client role: logs every response, occasionally sends a request during
     // 100 iterations of 10ms, then asks the server to close and closes itself.
     void client() {
         ClientIpc ipc(IPC_NAME);

         ipc.register_callback([&ipc] {
             assert(ipc.haveMessage());

             std::optional<Response> res = ipc.consume();
             if (!res)
                 return;

             auto [a, b] = *res;
             std::cout << "client received response a:" << a << " b:" << b << std::endl;
         });

         int i = 0;
         while (i < 100) {
             const bool shouldSend = (rand() % 30 == 0); // Flag set by other thread
             if (shouldSend) {
                 ipc.send(Request{i, 2 * i}); // Send request
             }

             std::this_thread::sleep_for(10ms);
             ++i;
         }

         std::cout << "Client sending close command" << std::endl;
         ipc.send(Request{-42, -42});

         std::cout << "Closing" << std::endl;
         ipc.close();
     }
    
     int main(int argc, char** argv) {
         if (std::set<std::string_view>(argv + 1, argv + argc).contains("server"))
             server();
         else
             client();
    
         std::cout << "Bye" << std::endl;
     }
    

With a live demo locally:

enter image description here

¹ sadly online compilers don't allow shared memory access

Upvotes: 1

Related Questions