Dark Sorrow
Dark Sorrow

Reputation: 1947

Message queue causes a core dump when checking whether a message is available

When I try to check whether a new message is available in the message queue, I get a core dump.
I call the isDataAvailableInReceiveDataQueue method to check whether data is available in the message queue.
I use the mq_getattr function to read the number of messages currently in the queue, and I check the mq_curmsgs member of the struct mq_attr structure.
If attr.mq_curmsgs > 0, I treat that as new data being available.
The core dump occurs on the `return true` statement.

OS: vxWorks 7
C++17

Code :

ipc.hpp

#pragma once

#include <queue>
#include <mutex>
#include <string>
#include <optional>
#include <iostream>
#include <functional> //For callback
//#include "common.hpp"
#include <signal.h> //Definition of SIGEV_* constants
#include <fcntl.h> //For O_RDWR, O_RDONLY, O_WRONLY constants
#include <mqueue.h> //For mode constants
#include <sys/stat.h> //For POSIX Message Queue

/**
 * @brief Two-way IPC endpoint built on a pair of POSIX message queues.
 *
 * One queue carries primary-to-secondary traffic, the other carries
 * secondary-to-primary traffic. Which queue this endpoint writes to and which
 * one it reads from is selected at compile time by @p primaryNode.
 *
 * @tparam T1                Type of the messages this endpoint sends (see send()).
 * @tparam T2                Type of the messages this endpoint receives (see receive()).
 * @tparam primaryNode       true for the primary endpoint, false for the secondary one.
 * @tparam max_message_depth Value used for mq_maxmsg when the queues are opened.
 *
 * @note The object hands its own address to mq_notify() in listen(), and the
 *       implicitly generated copy operations duplicate the raw descriptors.
 *       NOTE(review): copying or moving an opened instance looks unsafe —
 *       confirm no caller does so.
 */
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
class Ipc
{
private:
    /// Signature of the user callback invoked from handler().
    using callback_t = std::function<void(void)>;
    callback_t mCallback;

    // Message queue descriptors; both start out as -1 until open() succeeds.
    mqd_t                               mPrimaryToSecondaryMessageQueue {-1};
    mqd_t                               mSecondaryToPrimaryMessageQueue {-1};
    // Queue names, derived from the base name passed to the public open().
    std::string                         mPrimaryToSecondaryMessageQueueName;
    std::string                         mSecondaryToPrimaryMessageQueueName;

    /// Registers handler() via mq_notify() on the queue this endpoint receives from.
    bool                                listen(void);
    /// SIGEV_THREAD notification entry point; sv.sival_ptr carries the Ipc instance.
    static void                         handler(union sigval sv);
    /// Opens both queues using the names already stored in the members above.
    bool                                open(bool noWaitDelay);
public:
    Ipc() = default;

    /// Derives both queue names from @p queueName and opens the queues.
    bool                                open(const std::string& queueName, bool noWaitDelay = true);

    /**
     * @brief Overload for C strings and string literals.
     *
     * Without it, a call such as open("name") picks the private open(bool):
     * the pointer-to-bool conversion is a standard conversion and therefore
     * outranks the user-defined conversion to std::string, and the call then
     * fails the access check. This overload is an exact match for such calls.
     *
     * @return false when @p queueName is null, otherwise the result of the
     *         std::string overload.
     */
    bool                                open(const char* queueName, bool noWaitDelay = true)
    {
        if(queueName == nullptr)
        {
            return false;
        }
        return this->open(std::string(queueName), noWaitDelay);
    }

    /// Closes both message queue descriptors.
    bool                                close(void);
    /// Sends one T1 object on the outgoing queue with the given priority.
    bool                                send(const T1& data, std::uint32_t priority = 10);
    /// Receives one T2 object from the incoming queue; std::nullopt on failure.
    std::optional<T2>                   receive(void);
    /// Stores the callback that handler() invokes.
    bool                                register_callback(callback_t callback_implementation);
    /// Reports whether the incoming queue currently holds at least one message.
    bool                                isDataAvailableInReceiveDataQueue(void) const;
    /// Number of messages currently in the incoming queue; std::nullopt on failure.
    std::optional<std::uint32_t>        getMessagesCountCurrentlyInQueue(void) const;
    // NOTE(review): no definition of the following member is visible in this file.
    std::optional<std::uint32_t>        getMessagesCountCurrentlyInLocalBuffer(void) const;
};


/**
 * @brief Asks the OS to run handler() on a new thread when a message arrives
 *        on the queue this endpoint receives from.
 *
 * The primary endpoint listens on the secondary-to-primary queue; the
 * secondary endpoint listens on the primary-to-secondary queue.
 *
 * @return true when mq_notify() succeeded, false otherwise (the error is logged).
 */
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::listen(void)
{
    // Value-initialise so that no field reaches mq_notify() with an
    // indeterminate value; only the fields below are set explicitly.
    struct sigevent sev{};
    sev.sigev_notify = SIGEV_THREAD;
    sev.sigev_notify_function = Ipc::handler;
    sev.sigev_notify_attributes = nullptr;
    sev.sigev_value.sival_ptr = this;   // Arg. to thread function

    // Pick the queue this node reads from.
    const mqd_t receiveQueue = primaryNode
        ? this->mSecondaryToPrimaryMessageQueue   //Listen on secondary to primary queue
        : this->mPrimaryToSecondaryMessageQueue;  //Listen on primary to secondary queue

    if(mq_notify(receiveQueue, &sev) == -1)
    {
        std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
        return false;
    }
    return true;
}

/**
 * @brief Notification entry point registered through listen().
 *
 * Re-registers the notification and then invokes the user callback. Any
 * exception raised while doing so (including std::bad_function_call when no
 * callback has been registered) is logged and swallowed so that it cannot
 * escape the notification thread.
 *
 * @param si Notification payload; si.sival_ptr is the Ipc instance that was
 *           stored by listen().
 */
template <typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline void Ipc<T1, T2, primaryNode, max_message_depth>::handler(sigval si)
{
    try
    {
        // sival_ptr is a void*, so static_cast is sufficient.
        Ipc *ipc = static_cast<Ipc *>(si.sival_ptr);
        if(ipc == nullptr)
        {
            return;
        }

        // Register again before running the callback so that later arrivals
        // trigger a new notification.
        ipc->listen();

        ipc->mCallback();
    }
    catch(const std::exception& e)
    {
        //Reinitialize Module
        std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << e.what() << std::endl;
    }
    catch(...)
    {
        // An exception leaving a thread entry point terminates the program.
        std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : unknown exception" << std::endl;
    }
}

/**
 * @brief Derives the two queue names from @p queueName and opens both queues.
 *
 * The primary-to-secondary queue is named "<queueName>_out" and the
 * secondary-to-primary queue "<queueName>_in". Both node roles use the same
 * mapping, so the names do not depend on primaryNode.
 *
 * @param queueName   Base name shared by both endpoints.
 * @param noWaitDelay Forwarded to open(bool); true adds O_NONBLOCK there.
 * @return Result of open(bool), or false if an exception was raised while
 *         building the names.
 */
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::open(const std::string& queueName, bool noWaitDelay)
{
    try
    {
        this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out");
        this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in");
        return this->open(noWaitDelay);
    }
    catch(const std::exception& e)
    {
        //Reinitialize module
        // Report the exception itself; errno is unrelated to a C++ exception.
        std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << e.what() << std::endl;
        return false;
    }
}

/**
 * @brief Opens (creating if necessary) both message queues and registers for
 *        arrival notifications.
 *
 * The primary endpoint opens the primary-to-secondary queue write-only and
 * the secondary-to-primary queue read-only; the secondary endpoint does the
 * opposite. The message size of each queue is the size of the type that
 * travels on it from this endpoint's point of view: sizeof(T1) for the queue
 * it writes, sizeof(T2) for the queue it reads.
 *
 * @param noWaitDelay When true, both queues are opened with O_NONBLOCK.
 * @return true when both queues were opened and listen() succeeded. When the
 *         second queue cannot be opened, the first one is closed again before
 *         returning false. When only listen() fails, both descriptors remain
 *         open and the caller is expected to call close().
 */
template <typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::open(bool noWaitDelay)
{
    try
    {
        // Access mode of each queue as seen from this node.
        int primaryToSecondaryFlags = O_CREAT | (primaryNode ? O_WRONLY : O_RDONLY);
        int secondaryToPrimaryFlags = O_CREAT | (primaryNode ? O_RDONLY : O_WRONLY);
        if(noWaitDelay == true)
        {
            primaryToSecondaryFlags |= O_NONBLOCK;
            secondaryToPrimaryFlags |= O_NONBLOCK;
        }

        // mq_flags and mq_curmsgs are ignored by mq_open() and stay zero.
        struct mq_attr primaryToSecondaryAttr = {};
        primaryToSecondaryAttr.mq_maxmsg = max_message_depth; //maximum number of messages possible in queue
        primaryToSecondaryAttr.mq_msgsize = static_cast<long>(primaryNode ? sizeof(T1) : sizeof(T2)); //maximum message size in bytes

        struct mq_attr secondaryToPrimaryAttr = {};
        secondaryToPrimaryAttr.mq_maxmsg = max_message_depth; //maximum number of messages possible in queue
        secondaryToPrimaryAttr.mq_msgsize = static_cast<long>(primaryNode ? sizeof(T2) : sizeof(T1)); //maximum message size in bytes

        //Initialize Primary to Secondary Node
        this->mPrimaryToSecondaryMessageQueue = mq_open(
            this->mPrimaryToSecondaryMessageQueueName.c_str(),
            primaryToSecondaryFlags,
            S_IRWXU | S_IRWXG | S_IRWXO, // user (file owner), group and others have read, write, and execute permission
            &primaryToSecondaryAttr
        );
        if(this->mPrimaryToSecondaryMessageQueue == static_cast<mqd_t>(-1))
        {
            std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error Number : "  << errno << " Error : " << errorToString(errno) << std::endl;
            return false;
        }

        //Initialize Secondary to Primary Node
        this->mSecondaryToPrimaryMessageQueue = mq_open(
            this->mSecondaryToPrimaryMessageQueueName.c_str(),
            secondaryToPrimaryFlags,
            S_IRWXU | S_IRWXG | S_IRWXO,// user (file owner), group and others have read, write, and execute permission
            &secondaryToPrimaryAttr
        );
        if(this->mSecondaryToPrimaryMessageQueue == static_cast<mqd_t>(-1))
        {
            // Log first: mq_close() below may overwrite errno.
            std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
            // Do not leak the descriptor that was opened successfully above.
            mq_close(this->mPrimaryToSecondaryMessageQueue);
            this->mPrimaryToSecondaryMessageQueue = static_cast<mqd_t>(-1);
            return false;
        }

        return this->listen();
    }
    catch(const std::exception& e)
    {
        std::cerr << e.what() << '\n';
        return false;
    }
}

/**
 * @brief Closes both message queue descriptors.
 *
 * Both descriptors are always passed to mq_close(), even when closing the
 * first one fails, and both members are reset to -1 afterwards so that a
 * stale descriptor value is never used again.
 *
 * @return true only when both mq_close() calls succeeded; each failure is logged.
 */
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::close(void)
{
    try
    {
        bool bothClosed = true;

        if(mq_close(this->mPrimaryToSecondaryMessageQueue) == -1)
        {
            std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
            bothClosed = false;
        }
        this->mPrimaryToSecondaryMessageQueue = static_cast<mqd_t>(-1);

        if(mq_close(this->mSecondaryToPrimaryMessageQueue) == -1)
        {
            std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
            bothClosed = false;
        }
        this->mSecondaryToPrimaryMessageQueue = static_cast<mqd_t>(-1);

        return bothClosed;
    }
    catch (const std::exception& e)
    {
        std::cout << e.what() << std::endl;
        return false;
    }
}

/**
 * @brief Sends the raw bytes of @p data on this endpoint's outgoing queue.
 *
 * The primary endpoint writes to the primary-to-secondary queue, the
 * secondary endpoint to the secondary-to-primary queue.
 *
 * @param data     Object whose sizeof(data) bytes are passed to mq_send().
 * @param priority Message priority passed to mq_send().
 * @return true when mq_send() succeeded, false otherwise (the error is logged).
 */
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::send(const T1& data, std::uint32_t priority)
{
    // Outgoing queue for this node role.
    const mqd_t outgoingQueue = primaryNode
        ? this->mPrimaryToSecondaryMessageQueue
        : this->mSecondaryToPrimaryMessageQueue;

    const char *payload = reinterpret_cast<const char *>(&data);
    if (mq_send(outgoingQueue, payload, sizeof(data), priority) == -1)
    {
        std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
        return false;
    }
    return true;
}

/**
 * @brief Receives one message from this endpoint's incoming queue.
 *
 * The primary endpoint reads from the secondary-to-primary queue, the
 * secondary endpoint from the primary-to-secondary queue. The message bytes
 * are written directly into a T2 object.
 *
 * @return The received object, or std::nullopt when mq_receive() fails or
 *         delivers a message whose length is not exactly sizeof(T2). Both
 *         cases are logged.
 */
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline std::optional<T2> Ipc<T1, T2, primaryNode, max_message_depth>::receive(void)
{
    // Incoming queue for this node role.
    const mqd_t mqFd = primaryNode
        ? this->mSecondaryToPrimaryMessageQueue
        : this->mPrimaryToSecondaryMessageQueue;

    // Value-initialise so that no indeterminate bytes can ever be returned.
    T2 ipc_receive{};
    const ssize_t receiveLength = mq_receive(
        mqFd,
        reinterpret_cast<char *>(&ipc_receive),
        sizeof(ipc_receive),
        nullptr
    );
    if(receiveLength == -1)
    {
        std::cerr << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
        return std::nullopt;
    }
    // A shorter message would leave part of the object unfilled; reject it
    // rather than hand back a partially populated value.
    if(receiveLength != static_cast<ssize_t>(sizeof(ipc_receive)))
    {
        std::cerr << __LINE__ << " Primary Node : " << primaryNode << " Error : unexpected message length " << receiveLength << std::endl;
        return std::nullopt;
    }
    return ipc_receive;
}

/**
 * @brief Stores the callback that handler() invokes on message arrival.
 *
 * The parameter is already a private copy, so it is swapped into the member
 * instead of being copied a second time. std::function::swap is noexcept,
 * so this cannot fail.
 *
 * @param callbackImplementation Callable to store; an empty std::function is
 *        accepted and stored as-is.
 * @return Always true.
 */
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::register_callback(callback_t callbackImplementation)
{
    // The previously stored callback ends up in the parameter and is
    // destroyed when this function returns.
    this->mCallback.swap(callbackImplementation);
    return true;
}

/**
 * @brief Reports whether the incoming queue currently holds a message.
 *
 * Queries the queue attributes with mq_getattr() and inspects mq_curmsgs.
 *
 * @return true when at least one message is queued; false when the queue is
 *         empty or mq_getattr() fails (the failure is logged).
 *
 * @note The original author reported a core dump right after this function
 *       returns true; nothing in this function itself explains it.
 */
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::isDataAvailableInReceiveDataQueue(void) const
{
    // Incoming queue: secondary-to-primary for the primary node,
    // primary-to-secondary for the secondary node.
    const mqd_t incomingQueue = primaryNode
        ? this->mSecondaryToPrimaryMessageQueue
        : this->mPrimaryToSecondaryMessageQueue;

    struct mq_attr queueState;
    if (mq_getattr(incomingQueue, &queueState) == -1)
    {
        std::cerr << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
        return false;
    }

    // One or more queued messages means data is available.
    return queueState.mq_curmsgs > 0;
}

/**
 * @brief Returns how many messages are currently waiting in the incoming queue.
 *
 * @return The mq_curmsgs value reported by mq_getattr(), or std::nullopt when
 *         mq_getattr() fails (the failure is reported through perror()).
 */
template <typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline std::optional<std::uint32_t> Ipc<T1, T2, primaryNode, max_message_depth>::getMessagesCountCurrentlyInQueue(void) const
{
    // Incoming queue for this node role.
    const mqd_t incomingQueue = primaryNode
        ? this->mSecondaryToPrimaryMessageQueue
        : this->mPrimaryToSecondaryMessageQueue;

    struct mq_attr queueState;
    const int status = mq_getattr(incomingQueue, &queueState);
    if (status == -1)
    {
        perror("mq_getattr");
        //Log Error
        return std::nullopt;
    }
    return static_cast<std::uint32_t>(queueState.mq_curmsgs);
}

main.cpp

#include <thread>
#include <iostream>
#include "ipc.hpp"

// Payload exchanged over the message queue: a single int.
struct ipc_data
{
    int a;
};
using ipc_data_t = ipc_data;

// Process-wide primary endpoint: sends and receives ipc_data_t, queue depth 100.
Ipc<ipc_data_t, ipc_data_t, true, 100> mq;

// Worker loop: polls the global queue forever and receives a message whenever
// one is reported as available. Never returns.
void thread_runner(void)
{
    while (true)
    {
        // The original author reported the core dump at this check.
        if (mq.isDataAvailableInReceiveDataQueue() == true)
        {
            std::optional<ipc_data_t> ipc_optional_data = mq.receive();
            //Process further data
            (void)ipc_optional_data;
        }
        else
        {
            // Nothing queued: give other threads a chance to run instead of
            // spinning at full speed.
            std::this_thread::yield();
        }
    }
}

// Message-arrival callback handed to mq.register_callback(); currently a no-op.
void callback()
{
    // Signal semaphore or event
}

// Program entry point: registers the callback, opens the queue pair "mq1",
// and runs thread_runner on a worker thread.
// Returns 1 when the queues cannot be opened. On success it blocks in join(),
// because thread_runner loops forever.
int main(int argc, char const *argv[])
{
    (void)argc;
    (void)argv;

    mq.register_callback(callback);

    // Pass a std::string explicitly: a bare string literal would prefer the
    // private open(bool) overload (pointer-to-bool conversion).
    const bool returnStatusInitialize = mq.open(std::string{"mq1"});
    if (returnStatusInitialize == false)
    {
        //Log Error
        std::cerr << "Failed to open message queue mq1" << std::endl;
        return 1;
    }

    // Start the receiver only once the queues are known to be open.
    std::thread t {thread_runner};

    //Application Workflow

    // Wait for the worker instead of spinning in an empty loop; this also
    // keeps the std::thread from being destroyed while still joinable.
    t.join();
    return 0;
}

Upvotes: 0

Views: 75

Answers (0)

Related Questions