EliteTUM
EliteTUM

Reputation: 705

Buffer last received ZeroMQ message as class member

I'm trying to write a handler class that subscribes to a message published via zeromq and buffers the last received message.

I tried doing this as follows. The method ReceivedMessage() is to be called by a wrapper application in a cyclic called function. Once it returns true, I tried to access the message using GetReceivedMessageData(). Unfortunately, it seems that the data is not saved properly in the member zmq_receivedMessage_.

I guess this is because of zmq_receivedMessage_ being initialized with fixed size and the call zmq_subscriber_.recv(&zmq_receivedMessage_) does not automatically resize it?

What would be the easiest and most robust way to this? The only way I can think of is using realloc() and memcpy() every time a new message is received. Or is there a simpler way?

#include <cstdint>
#include "zeromq_cpp/zmq.hpp"

class HandlerClass
{
public:
    /// @brief Initializes a AirSimToRos class instance.
    HandlerClass(std::string const& addr);

    // @brief Gets the message data received via ZeroMq as pointer.
    void* GetReceivedMessageData();

    // @brief Gets the message size received via ZeroMq as size_t.
    std::size_t GetReceivedMessageSize();

    // @brief Returns true if a new, full message was received via ZeroMq, false otherwise
    bool ReceivedMessage();

private:    
    /// @brief A ZeroMq context object encapsulating functionality dealing with the initialisation and termination.
    zmq::context_t zmq_context_;

    /// @brief A ZeroMq socket for subscribing to incoming messages.
    zmq::socket_t zmq_subscriber_;

    /// @brief A ZeroMq message that was received last. Might be empty if ReceivedMessage() never was true.
    zmq::message_t zmq_receivedMessage_; 

};

HandlerClass::HandlerClass(std::string const& addr)
    : zmq_context_(1)
    , zmq_subscriber_(zmq_context_, ZMQ_SUB)
{
    zmq_subscriber_.setsockopt(ZMQ_IDENTITY, "HandlerSubscriber", 5);
    zmq_subscriber_.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    zmq_subscriber_.setsockopt(ZMQ_RCVTIMEO, 5000);
    zmq_subscriber_.connect(addr);
}

void* HandlerClass::GetReceivedMessageData()
{
    return zmq_receivedMessage_.data();
}

std::size_t HandlerClass::GetReceivedMessageSize()
{
    return zmq_receivedMessage_.size();
}

bool HandlerClass::ReceivedMessage()
{    
    int received_bytes = zmq_subscriber_.recv(&zmq_receivedMessage_);

    return received_bytes > 0;
}

Upvotes: 1

Views: 345

Answers (1)

user3666197
user3666197

Reputation: 1

One way would be a redesign w/ Poller-instance + ZMQ_CONFLATE

Having zero context of the intended class use-cases, the original design seems to be a rather "mechanical" wrapper of a data-mover, not any MVP-slim-design, that can squeeze maximum of the benefits the ZeroMQ Scalable Formal Communication Archetypes Signalling/Messaging framework has already built-in.

The much smarter ( and also a ZMQ_RCV_HWM-safer ( goes beyond the scope of this topic ) ) would be not to just always mechanically read each message from the ZeroMQ Context-domain of control, unless in a real need to re-transmit such data from the HandlerClass somewhere further down the line.

Add a private instance of the Poller that would allow to redesign data-flow mechanics -- using non-destructive query, using a .poll()-method for testing a new message arrival ( having also a Real-Time / Event-Handling Loop stability control tools, not to wait longer than an ad-hoc set .poll()-method timeout ), while yet able to defer any actual data-move as late as possible, until a data indeed need to flow outside of the HandlerClass-instance, not anywhere earlier.

HandlerClass::HandlerClass(std::string const& addr)
    : zmq_context_(1)
    , zmq_subscriber_(zmq_context_, ZMQ_SUB)
{
    zmq_subscriber_.setsockopt( ZMQ_IDENTITY,   "HandlerSubscriber", 5 );
    zmq_subscriber_.connect(                     addr );
    zmq_subscriber_.setsockopt( ZMQ_SUBSCRIBE,  "", 0 );
    zmq_subscriber_.setsockopt( ZMQ_LINGER,      0 );  // ALWAYS, READY 4 .term()
    zmq_subscriber_.setsockopt( ZMQ_CONFLATE,    1 );  // SMART
    zmq_subscriber_.setsockopt( ZMQ_TOS,         T );  // WORTH DEPLOY & MANAGE
    zmq_subscriber_.setsockopt( ZMQ_RCVTIMEO, 5000 );
 // -------------------------------------------------  // ADD Poller-instance
    ...
 // -------------------------------------------------  // RTO
}

Nota Bene: In case the exgress flow is also made on ZeroMQ infrastructure, there are time-saving API tools for Zero-Copy message re-marshalling into another ZeroMQ socket-transport -- ( almost ) for free -- cool, isn't it?

Upvotes: 1

Related Questions