Reputation: 1947
I've created a template class for IPC using Message Queue.
I'm running my program in infinite while loop (called main loop).
I collect data from various sub-systems (sensors) over Ethernet and pass the received data to appropriate processes using Message Queue (their are multiple different processes that can act as data sink each with their own message queue).
I've just run the program and I'm not performing any activity. This is the only program running and I reboot the OS before every run.
The program is just running in while loop where all flags are set to false; hence the program is just running a blank (empty) loop.
Randomly I'm getting boost::interprocess_exception::library_error
. As their is no activity I expected that their should be no error.
I commented out the Ethernet related code but still I'm getting the same error.
I'm getting error in statement:
if (primaryNode == true)
{
this->mSecondaryToPrimaryMessageQueue->receive(
&receiveData,
sizeof(receiveData),
receiveLength,
priority
);
}
else
{
this->mPrimaryToSecondaryMessageQueue->receive(
&receiveData,
sizeof(receiveData),
receiveLength,
priority
);
}
I tried with primaryNode set to true or false. I get the same error.
Code :
ipc.hpp
#pragma once
#include <thread>
#include <string>
#include <atomic>
#include <memory>
#include <variant>
#include <optional>
#include <iostream>
#include <functional>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
/// @brief
/// @tparam T1 Specifies the data-type that has to be sent
/// @tparam T2 Specifies the data-type that has will be received
/// @tparam primaryNode Denotes if the RTP is the primaryNode owner/creater of the message queue
template<typename T1, typename T2, bool primaryNode>
class Ipc
{
private:
static const std::uint8_t MAX_MESSAGE_DEPTH = 5; //Specifies the number of messages will the message queue hold
using callback_t = std::function<void(void)>;
callback_t mCallback;
std::unique_ptr<boost::interprocess::message_queue> mPrimaryToSecondaryMessageQueue;
std::unique_ptr<boost::interprocess::message_queue> mSecondaryToPrimaryMessageQueue;
std::string mPrimaryToSecondaryMessageQueueName;
std::string mSecondaryToPrimaryMessageQueueName;
std::thread mReceiveThread;
std::atomic_bool mExitReceiveThread{ false };
boost::lockfree::spsc_queue<T2, boost::lockfree::capacity<MAX_MESSAGE_DEPTH>> mReceiveDataQueue;
void listen(void);
public:
Ipc() {}
bool open(const std::string& queueName);
bool close(void);
bool send(const T1& data, std::uint32_t priority = 10);
std::optional<T2> receive(void);
bool register_callback(callback_t callback_implementation);
bool isDataAvailableInReceiveDataQueue(void) const;
};
template<typename T1, typename T2, bool primaryNode>
inline void Ipc<T1, T2, primaryNode>::listen(void)
{
T2 receiveData;//Buffer to store received data
std::uint64_t receiveLength;
std::uint32_t priority;
while(this->mExitReceiveThread.load() == false)
{
try
{
std::memset(&receiveData, 0, sizeof(receiveData)); //Initialize buffer to 0
receiveLength = 0; //Initialize read length to 0
priority = 0; //Initialize priority to 0
if (primaryNode == true)
{
this->mSecondaryToPrimaryMessageQueue->receive(
&receiveData,
sizeof(receiveData),
receiveLength,
priority
);
}
else
{
this->mPrimaryToSecondaryMessageQueue->receive(
&receiveData,
sizeof(receiveData),
receiveLength,
priority
);
}
this->mReceiveDataQueue.push(receiveData);
this->mCallback();
}
catch (const std::exception& ex)
{
std::cout << "Inside Listen Exception\n";
std::cout << ex.what() << std::endl;
}
}
}
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::open(const std::string& queueName)
{
try
{
if(primaryNode == true)
{
this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out");
this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in");
}
else
{
this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out");
this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in");
}
//Open-Create message queue to send data from primaryNode node to secondary node
this->mPrimaryToSecondaryMessageQueue = std::make_unique<boost::interprocess::message_queue>(
boost::interprocess::open_or_create,
this->mPrimaryToSecondaryMessageQueueName.c_str(),
MAX_MESSAGE_DEPTH,
sizeof(T1)
);
//Open-Create message queue to send data from secondary node to primaryNode node
this->mSecondaryToPrimaryMessageQueue = std::make_unique<boost::interprocess::message_queue>(
boost::interprocess::open_or_create,
this->mSecondaryToPrimaryMessageQueueName.c_str(),
MAX_MESSAGE_DEPTH,
sizeof(T2)
);
//Start Listner Thread
this->mReceiveThread = std::thread(&Ipc::listen, this);
return true;
}
catch (const std::exception& ex)
{
std::cout << ex.what() << std::endl;
return false;
}
}
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::close(void)
{
try
{
this->mExitReceiveThread.store(true); //Marked to close thread
boost::interprocess::message_queue::remove(this->mPrimaryToSecondaryMessageQueueName.c_str());//Delete Primary to Secondary Message Queue
boost::interprocess::message_queue::remove(this->mSecondaryToPrimaryMessageQueueName.c_str());//Delete Secondary to Primary Message Queue
}
catch (const std::exception& ex)
{
std::cout << ex.what() << std::endl;
return false;
}
}
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::send(const T1& data, std::uint32_t priority)
{
try
{
if (primaryNode == true) //Send message on Primary to Secondary Queue
{
this->mPrimaryToSecondaryMessageQueue->send(&data, sizeof(data), priority);
}
else //Send message on Secondary to Primary Queue
{
this->mSecondaryToPrimaryMessageQueue->send(&data, sizeof(data), priority);
}
return true;
}
catch (const std::exception& ex)
{
std::cout << ex.what() << std::endl;
return false;
}
}
template<typename T1, typename T2, bool primaryNode>
inline std::optional<T2> Ipc<T1, T2, primaryNode>::receive(void)
{
std::optional<T2> data{ std::nullopt };
if (this->mReceiveDataQueue.read_available() > 0) //If data is avaiable, pop first element
{
data = this->mReceiveDataQueue.front();
this->mReceiveDataQueue.pop();
}
else
{
//data = std::nullopt; //Not needed
}
return data;
}
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::register_callback(callback_t callbackImplementation)
{
try
{
this->mCallback = callbackImplementation;
return true;
}
catch (const std::exception& ex)
{
std::cerr << ex.what() << '\n';
}
return false;
}
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::isDataAvailableInReceiveDataQueue(void) const
{
if (this->mReceiveDataQueue.read_available() > 0) //If data is avaiable
{
return true;
}
else
{
return false;
}
}
main.cpp
#include <ipc.hpp>
#include <iostream>
//P1 stands for Process 1
//P2 stands for Process 2
struct P1ToP2
{
float a;
int b;
};
struct P2ToP1
{
int a;
int b;
};
Ipc<P1ToP2, P2ToP1, false> ipc1; //Global IPC object
void message_queue_data_received(void)
{
if (ipc1.isDataAvailableInReceiveDataQueue() == true)
{
auto s = ipc1.receive();
if (s.has_value() == true)
{
std::cout << "a : " << s->a << "\tb : " << s->b << std::endl;
}
}
}
int main(int argc, char *argv[])
{
bool dataReceivedOnEthernet = false;
ipc1.register_callback(message_queue_data_received);
this->ipc1.open("ipc1");
while(true)
{
if(dataReceivedOnEthernet == true) //Flag set by other thread
{
P1ToP2 p;
p.a = 10.23; //Some Data received over ethernet
p.b = 10; //Some Data received over ethernet
ipc1.send(p); //Send data over IPC
}
//Other Code
}
}
Error
boost::interprocess_exception::library_error
Upvotes: 1
Views: 78
Reputation: 393694
Why do the processes use a different type for the message, while silently assuming they're the same size (and both trivial and standard layout etc...). Are you mixing up the various types and queues? It looks like it.
It helps if you name things well. Also, remove duplication.
I'd separate the queues by message type. Name them by the roles:
// Requests are sent by client to server.
// Responses are sent by server to client.
struct Request { int a, b; };
struct Response { double a; int b; };
using ClientIpc = Ipc<Request, Response, true>;
using ServerIpc = Ipc<Request, Response, false>;
Then, by defining a Channel type:
using Prio = uint32_t;
template <typename T> struct Channel {
Channel(std::string name);
~Channel();
std::tuple<T, Prio> receive(std::stop_token token);
bool send(T const& msg, Prio prio, std::stop_token token);
private:
using Queue = boost::interprocess::message_queue;
std::string name_;
Queue queue_ = open();
Queue open() {
if constexpr (IsClient) {
return {boost::interprocess::open_only, name_.c_str()};
} else {
return {boost::interprocess::open_or_create, name_.c_str(), MAX_DEPTH, sizeof(T)};
}
}
};
Now we can simply state:
Channel<Request> reqQ;
Channel<Response> resQ;
With construction like
Ipc(std::string const& queueName) try
: reqQ(queueName + "_req")
, resQ(queueName + "_res")
, mThread(bind(&Ipc::listen, this, std::placeholders::_1)) {
} catch (std::exception const& ex) {
std::cerr << "Ipc: " << ex.what() << std::endl;
throw;
}
The listener queues received messages. The type depends on client/server mode:
using Incoming = std::conditional_t<IsClient, Response, Request>;
boost::lockfree::spsc_queue<Incoming, boost::lockfree::capacity<MAX_DEPTH>> mInbox;
Note how I opted for the much safe jthread
with stop tokens for coordinating thread exit:
std::jthread mThread;
std::stop_token mToken = mThread.get_stop_token();
void listen(std::stop_token token) {
while (!token.stop_requested()) {
try {
if constexpr (IsClient)
mInbox.push(get<0>(resQ.receive(token)));
else
mInbox.push(get<0>(reqQ.receive(token)));
if (mCallback)
mCallback();
} catch (std::exception const& ex) {
std::cerr << "Listen: " << ex.what() << std::endl;
}
}
}
The external operations look much simpler, like:
void wait() { mThread.join(); }
void close() {
mThread.request_stop();
if (std::this_thread::get_id() != mThread.get_id())
wait();
}
bool send(Request const& msg, Prio prio = 10) { return reqQ.send(msg, prio, mToken); }
bool send(Response const& msg, Prio prio = 10) { return resQ.send(msg, prio, mToken); }
std::optional<Incoming> consume() {
if (Incoming val; mInbox.consume_one([&val](Incoming res) { val = std::move(res); }))
return val;
return {};
}
void register_callback(callback_t cb) { mCallback = cb; }
bool haveMessage() const { return mInbox.read_available(); }
Let's define the above server to respond to Requests
by sending back the square-root of a and b/2:
void server() {
ServerIpc ipc(IPC_NAME);
auto handler = [&ipc] {
assert(ipc.haveMessage());
if (std::optional<Request> req = ipc.consume()) {
auto [a, b] = *req;
std::cout << "server received request a:" << a << " b:" << b << std::endl;
if (a == -42 && b == -42) {
std::cout << " -> server handling close request" << std::endl;
ipc.close();
} else {
// send response
ipc.send(Response{sqrt(a), b / 2});
}
}
};
ipc.register_callback(handler);
ipc.wait();
}
That's all. Note how we've added a mechanism to tell the server the client wants it to exit (because the server owns the resources). The client may look something like this:
void client() {
ClientIpc ipc(IPC_NAME);
auto handler = [&ipc] {
assert(ipc.haveMessage());
if (std::optional<Response> res = ipc.consume()) {
auto [a, b] = *res;
std::cout << "client received response a:" << a << " b:" << b << std::endl;
}
};
ipc.register_callback(handler);
for (int i = 0; i < 100; ++i) {
if (rand() % 30 == 0) // Flag set by other thread
ipc.send(Request{i, 2 * i}); // Send request
std::this_thread::sleep_for(10ms);
}
std::cout << "Client sending close command" << std::endl;
ipc.send(Request{-42, -42});
std::cout << "Closing" << std::endl;
ipc.close();
}
All it does is send some requests for ~10s and log the responses. Then it tells the server to quit, and closes. Only the server will remove the queues.
A simple main to switch client/server:
int main(int argc, char** argv) {
if (std::set<std::string_view>(argv + 1, argv + argc).contains("server"))
server();
else
client();
std::cout << "Bye" << std::endl;
}
File test.h
#pragma once
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <iostream>
#include <optional>
#include <thread>
using namespace std::chrono_literals;
template <typename Request, typename Response, bool IsClient> class Ipc {
private:
static constexpr uint8_t MAX_DEPTH = 5;
using callback_t = std::function<void()>;
callback_t mCallback;
using Prio = uint32_t;
template <typename T> struct Channel {
Channel(std::string name) : name_(std::move(name)) { //
assert(queue_.get_max_msg_size() == sizeof(T));
}
~Channel() {
if (!IsClient) {
std::cerr << "Server cleaning up " << name_ << std::endl;
Queue::remove(name_.c_str());
}
}
std::tuple<T, Prio> receive(std::stop_token token) {
size_t len = 0;
Prio prio = 0;
T msg{};
while (!token.stop_requested()) {
auto deadline = std::chrono::steady_clock::now() + 50ms;
if (queue_.timed_receive(&msg, sizeof(msg), len, prio, deadline)) {
assert(len == sizeof(T));
return {std::move(msg), prio};
}
}
throw std::runtime_error("stop requested");
}
bool send(T const& msg, Prio prio, std::stop_token token) {
while (!token.stop_requested()) {
auto deadline = std::chrono::steady_clock::now() + 50ms;
if (queue_.timed_send(&msg, sizeof(msg), prio, deadline))
return true;
}
return false;
}
private:
using Queue = boost::interprocess::message_queue;
std::string name_;
Queue queue_ = open();
Queue open() {
if constexpr (IsClient) {
return {boost::interprocess::open_only, name_.c_str()};
} else {
return {boost::interprocess::open_or_create, name_.c_str(), MAX_DEPTH, sizeof(T)};
}
}
};
Channel<Request> reqQ;
Channel<Response> resQ;
std::jthread mThread;
std::stop_token mToken = mThread.get_stop_token();
using Incoming = std::conditional_t<IsClient, Response, Request>;
boost::lockfree::spsc_queue<Incoming, boost::lockfree::capacity<MAX_DEPTH>> mInbox;
void listen(std::stop_token token) {
while (!token.stop_requested()) {
try {
if constexpr (IsClient)
mInbox.push(get<0>(resQ.receive(token)));
else
mInbox.push(get<0>(reqQ.receive(token)));
if (mCallback)
mCallback();
} catch (std::exception const& ex) {
std::cerr << "Listen: " << ex.what() << std::endl;
}
}
}
public:
Ipc(std::string const& queueName) try
: reqQ(queueName + "_req")
, resQ(queueName + "_res")
, mThread(bind(&Ipc::listen, this, std::placeholders::_1)) {
} catch (std::exception const& ex) {
std::cerr << "Ipc: " << ex.what() << std::endl;
throw;
}
void wait() { mThread.join(); }
void close() {
mThread.request_stop();
if (std::this_thread::get_id() != mThread.get_id())
wait();
}
bool send(Request const& msg, Prio prio = 10) { return reqQ.send(msg, prio, mToken); }
bool send(Response const& msg, Prio prio = 10) { return resQ.send(msg, prio, mToken); }
std::optional<Incoming> consume() {
if (Incoming val; mInbox.consume_one([&val](Incoming res) { val = std::move(res); }))
return val;
return {};
}
void register_callback(callback_t cb) { mCallback = cb; }
bool haveMessage() const { return mInbox.read_available(); }
};
File test.cpp
#include "test.h"
#include <cmath>
#include <set>
// Requests are sent by client to server.
// Responses are sent by server to client.
struct Request { int a, b; };
struct Response { double a; int b; };
using ClientIpc = Ipc<Request, Response, true>;
using ServerIpc = Ipc<Request, Response, false>;
static std::string IPC_NAME = "so_demo_ipc";
void server() {
ServerIpc ipc(IPC_NAME);
auto handler = [&ipc] {
assert(ipc.haveMessage());
if (std::optional<Request> req = ipc.consume()) {
auto [a, b] = *req;
std::cout << "server received request a:" << a << " b:" << b << std::endl;
if (a == -42 && b == -42) {
std::cout << " -> server handling close request" << std::endl;
ipc.close();
} else {
// send response
ipc.send(Response{sqrt(a), b / 2});
}
}
};
ipc.register_callback(handler);
ipc.wait();
}
void client() {
ClientIpc ipc(IPC_NAME);
auto handler = [&ipc] {
assert(ipc.haveMessage());
if (std::optional<Response> res = ipc.consume()) {
auto [a, b] = *res;
std::cout << "client received response a:" << a << " b:" << b << std::endl;
}
};
ipc.register_callback(handler);
for (int i = 0; i < 100; ++i) {
if (rand() % 30 == 0) // Flag set by other thread
ipc.send(Request{i, 2 * i}); // Send request
std::this_thread::sleep_for(10ms);
}
std::cout << "Client sending close command" << std::endl;
ipc.send(Request{-42, -42});
std::cout << "Closing" << std::endl;
ipc.close();
}
int main(int argc, char** argv) {
if (std::set<std::string_view>(argv + 1, argv + argc).contains("server"))
server();
else
client();
std::cout << "Bye" << std::endl;
}
With a live demo locally:
¹ sadly online compilers don't allow shared memory access
Upvotes: 1