Reputation: 1947
When I'm trying to check if new message is available in message queue, I'm getting core dump error.
I'm calling isDataAvailableInReceiveDataQueue
method to check if data is available in message queue.
I'm using mq_getattr
function to read number available of messages queue and checking the mq_curmsgs
variable in struct mq_attr
structure.
If attr.mq_curmsgs>0
; I treat new data is available.
I'm getting core dump error on return true.
OS: vxWorks 7
C++17
Code :
ipc.hpp
#pragma once
#include <queue>
#include <mutex>
#include <string>
#include <optional>
#include <iostream>
#include <functional> //For callback
//#include "common.hpp"
#include <signal.h> //Definition of SIGEV_* constants
#include <fcntl.h> //For O_RDWR, O_RDONLY, O_WRONLY constants
#include <mqueue.h> //For mode constants
#include <sys/stat.h> //For POSIX Message Queue
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
class Ipc
{
private:
using callback_t = std::function<void(void)>;
callback_t mCallback;
//Message Queue file descriptor
mqd_t mPrimaryToSecondaryMessageQueue {-1};
mqd_t mSecondaryToPrimaryMessageQueue {-1};
std::string mPrimaryToSecondaryMessageQueueName;
std::string mSecondaryToPrimaryMessageQueueName;
bool listen(void);
static void handler(union sigval sv);
bool open(bool noWaitDelay);
public:
Ipc() {}
bool open(const std::string& queueName, bool noWaitDelay = true);
bool close(void);
bool send(const T1& data, std::uint32_t priority = 10);
std::optional<T2> receive(void);
bool register_callback(callback_t callback_implementation);
bool isDataAvailableInReceiveDataQueue(void) const;
std::optional<std::uint32_t> getMessagesCountCurrentlyInQueue(void) const;
std::optional<std::uint32_t> getMessagesCountCurrentlyInLocalBuffer(void) const;
};
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::listen(void)
{
int returnStatus;
struct sigevent sev;
sev.sigev_notify = SIGEV_THREAD;
sev.sigev_notify_function = Ipc::handler;
sev.sigev_notify_attributes = nullptr;
sev.sigev_value.sival_ptr = this; // Arg. to thread function
if(primaryNode == true)
{
returnStatus = mq_notify(this->mSecondaryToPrimaryMessageQueue, &sev); //Listen on secondary to primary queue
}
else
{
returnStatus = mq_notify(this->mPrimaryToSecondaryMessageQueue, &sev); //Listen on primary to secondary queue
}
if(returnStatus == -1)
{
std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
return false;
}
return true;
}
template <typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline void Ipc<T1, T2, primaryNode, max_message_depth>::handler(sigval si)
{
try
{
mqd_t mqFd;
struct mq_attr attr;
Ipc *ipc = reinterpret_cast<Ipc *>(si.sival_ptr);
ipc->listen();
ipc->mCallback();
}
catch(const std::exception& e)
{
//Reinitialize Module
std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << e.what() << std::endl;
// return;
}
}
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::open(const std::string& queueName, bool noWaitDelay)
{
try
{
if(primaryNode == true)
{
this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out");
this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in");
}
else
{
this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out");
this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in");
}
return this->open(noWaitDelay);
}
catch(const std::exception& e)
{
//Reinitialize module
std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error Number : " << errno << " Error : " << errorToString(errno) << std::endl;
return false;
}
}
template <typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::open(bool noWaitDelay)
{
try
{
//int returnStatus = -1;
int primaryToSecondaryFlags = 0;
int secondaryToPrimaryFlags = 0;
struct mq_attr primaryToSecondaryAttr = {0};
struct mq_attr secondaryToPrimaryAttr = {0};
if(primaryNode == true) //Data sent from primary to secondary
{
primaryToSecondaryFlags = O_CREAT | O_WRONLY/* | O_NONBLOCK*/; //Primary node will only send data to secondary node - Non-Blocking
secondaryToPrimaryFlags = O_CREAT | O_RDONLY/* | O_NONBLOCK*/; //Secondary node will only receive data from primary node - Non-Blocking
if(noWaitDelay == true)
{
primaryToSecondaryFlags |= O_NONBLOCK;
secondaryToPrimaryFlags |= O_NONBLOCK;
}
primaryToSecondaryAttr.mq_flags = 0; //ignored for mq_open()
primaryToSecondaryAttr.mq_maxmsg = max_message_depth; //maximum number of messages possible in queue
primaryToSecondaryAttr.mq_msgsize = sizeof(T1); //maximum message size in bytes to be sent
primaryToSecondaryAttr.mq_curmsgs = 0; //ignored for mq_open()
secondaryToPrimaryAttr.mq_flags = 0; //ignored for mq_open()
secondaryToPrimaryAttr.mq_maxmsg = max_message_depth; //maximum number of messages possible in queue
secondaryToPrimaryAttr.mq_msgsize = sizeof(T2); //maximum message size in bytes to be received
secondaryToPrimaryAttr.mq_curmsgs = 0; //ignored for mq_open()
}
else //Data sent from Secondary to Primary node
{
primaryToSecondaryFlags = O_CREAT | O_RDONLY/* | O_NONBLOCK*/; //Primary node will only receive data from secondary node
secondaryToPrimaryFlags = O_CREAT | O_WRONLY/* | O_NONBLOCK*/; //Secondary node will only send data to primary node
if(noWaitDelay == true)
{
primaryToSecondaryFlags |= O_NONBLOCK;
secondaryToPrimaryFlags |= O_NONBLOCK;
}
primaryToSecondaryAttr.mq_flags = 0; //ignored for mq_open()
primaryToSecondaryAttr.mq_maxmsg = max_message_depth; //maximum number of messages possible in queue
primaryToSecondaryAttr.mq_msgsize = sizeof(T2); //maximum message size in bytes to be received
primaryToSecondaryAttr.mq_curmsgs = 0; //ignored for mq_open()
secondaryToPrimaryAttr.mq_flags = 0; //ignored for mq_open()
secondaryToPrimaryAttr.mq_maxmsg = max_message_depth; //maximum number of messages possible in queue
secondaryToPrimaryAttr.mq_msgsize = sizeof(T1); //maximum message size in bytes to be sent
secondaryToPrimaryAttr.mq_curmsgs = 0; //ignored for mq_open()
}
//Initialize Primary to Secondary Node
this->mPrimaryToSecondaryMessageQueue = mq_open(
this->mPrimaryToSecondaryMessageQueueName.c_str(),
primaryToSecondaryFlags,
S_IRWXU | S_IRWXG | S_IRWXO, // user (file owner), group and others have read, write, and execute permission
&primaryToSecondaryAttr
);
if(this->mPrimaryToSecondaryMessageQueue == static_cast<mqd_t>(-1))
{
std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error Number : " << errno << " Error : " << errorToString(errno) << std::endl;
return false;
}
//Initialize Secondary to Primary Node
this->mSecondaryToPrimaryMessageQueue = mq_open(
this->mSecondaryToPrimaryMessageQueueName.c_str(),
secondaryToPrimaryFlags,
S_IRWXU | S_IRWXG | S_IRWXO,// user (file owner), group and others have read, write, and execute permission
&secondaryToPrimaryAttr
);
if(this->mSecondaryToPrimaryMessageQueue == static_cast<mqd_t>(-1))
{
std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
return false;
}
return this->listen();
}
catch(const std::exception& e)
{
std::cerr << e.what() << '\n';
return false;
}
}
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::close(void)
{
try
{
int returnStatus = mq_close(this->mPrimaryToSecondaryMessageQueue);
if(returnStatus == -1)
{
std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
return false;
}
returnStatus = mq_close(this->mSecondaryToPrimaryMessageQueue);
if(returnStatus == -1)
{
std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
return false;
}
return true;
}
catch (const std::exception& e)
{
std::cout << e.what() << std::endl;
return false;
}
}
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::send(const T1& data, std::uint32_t priority)
{
mqd_t mqFd;
if (primaryNode == true) //Send message on Primary to Secondary Queue
{
mqFd = this->mPrimaryToSecondaryMessageQueue;
}
else //Send message on Secondary to Primary Queue
{
mqFd = this->mSecondaryToPrimaryMessageQueue;
}
int returnStatus = mq_send(
mqFd,
reinterpret_cast<const char *>(&data),
sizeof(data),
priority
);
if (returnStatus == -1)
{
std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
return false;
}
//std::cout<<"\nSent Message without error\n";
return true;
}
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline std::optional<T2> Ipc<T1, T2, primaryNode, max_message_depth>::receive(void)
{
mqd_t mqFd;
T2 ipc_receive;
if(primaryNode == true) //Select secondaryToPrimaryQueue queue to receieve data in primary mode
{
mqFd = this->mSecondaryToPrimaryMessageQueue;
}
else //Select primaryToSecondary queue to receive data in non-primary mode (secondary mode)
{
mqFd = this->mPrimaryToSecondaryMessageQueue;
}
ssize_t receiveLength = mq_receive(
mqFd,
reinterpret_cast<char *>(&ipc_receive),
sizeof(ipc_receive),
nullptr
);
if(receiveLength == -1)
{
std::cerr << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
return std::nullopt;
}
return ipc_receive;
}
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::register_callback(callback_t callbackImplementation)
{
try
{
this->mCallback = callbackImplementation;
return true;
}
catch (const std::exception& e)
{
std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
}
return false;
}
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::isDataAvailableInReceiveDataQueue(void) const
{
mqd_t mqFd;
struct mq_attr attr;
if(primaryNode == true) //Select secondaryToPrimaryQueue queue to receieve data in primary mode
{
mqFd = this->mSecondaryToPrimaryMessageQueue;
}
else //Select primaryToSecondary queue to receive data in non-primary mode (secondary mode)
{
mqFd = this->mPrimaryToSecondaryMessageQueue;
}
if (mq_getattr(mqFd, &attr) == -1)
{
std::cerr << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
return false;
}
if (attr.mq_curmsgs > 0) //If 1 or more message are available
{
return true; //Core dump after executing this statement
}
else
{
return false;
}
}
template <typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline std::optional<std::uint32_t> Ipc<T1, T2, primaryNode, max_message_depth>::getMessagesCountCurrentlyInQueue(void) const
{
struct mq_attr attribute;
mqd_t mqFd;
if(primaryNode == true) //Select secondaryToPrimaryQueue queue to receieve data in primary mode
{
mqFd = this->mSecondaryToPrimaryMessageQueue;
}
else //Select primaryToSecondary queue to receive data in non-primary mode (secondary mode)
{
mqFd = this->mPrimaryToSecondaryMessageQueue;
}
if (mq_getattr(mqFd, &attribute) == -1)
{
perror("mq_getattr");
//Log Error
return std::nullopt;
}
return attribute.mq_curmsgs;
}
main.cpp
#include <thread>
#include <iostream>
#include "ipc.hpp"
typedef struct ipc_data
{
int a;
}ipc_data_t;
Ipc<ipc_data_t,ipc_data_t,true,100>mq;
void thread_runner(void)
{
while (true)
{
if (mq.isDataAvailableInReceiveDataQueue() == true) //Core Dump on executing this step
{
std::optional<ipc_data_t> ipc_optional_data = mq.receive();
//Process further data
}
}
}
void callback(void)
{
//Singal Semaphore or Event
}
int main(int argc, char const *argv[])
{
mq.register_callback(callback);
bool openMessageQueueStatus = mq.open("mq1");
if (openMessageQueueStatus == true)
{
returnStatusInitialize = false;
}
else
{
//Log Error
returnStatusInitialize = false;
}
std::thread t {thread_runner};
while (true)
{
//Application Workflow
}
return 0;
}
Upvotes: 0
Views: 75