Reputation: 705
I'm trying to write a handler class that subscribes to a message published via zeromq and buffers the last received message.
I tried doing this as follows. The method ReceivedMessage()
is to be called by a wrapper application in a cyclic called function. Once it returns true, I tried to access the message using GetReceivedMessageData()
. Unfortunately, it seems that the data is not saved properly in the member zmq_receivedMessage_
.
I guess this is because of zmq_receivedMessage_
being initialized with fixed size and the call zmq_subscriber_.recv(&zmq_receivedMessage_)
does not automatically resize it?
What would be the easiest and most robust way to this? The only way I can think of is using realloc()
and memcpy()
every time a new message is received. Or is there a simpler way?
#include <cstdint>
#include "zeromq_cpp/zmq.hpp"
class HandlerClass
{
public:
/// @brief Initializes a AirSimToRos class instance.
HandlerClass(std::string const& addr);
// @brief Gets the message data received via ZeroMq as pointer.
void* GetReceivedMessageData();
// @brief Gets the message size received via ZeroMq as size_t.
std::size_t GetReceivedMessageSize();
// @brief Returns true if a new, full message was received via ZeroMq, false otherwise
bool ReceivedMessage();
private:
/// @brief A ZeroMq context object encapsulating functionality dealing with the initialisation and termination.
zmq::context_t zmq_context_;
/// @brief A ZeroMq socket for subscribing to incoming messages.
zmq::socket_t zmq_subscriber_;
/// @brief A ZeroMq message that was received last. Might be empty if ReceivedMessage() never was true.
zmq::message_t zmq_receivedMessage_;
};
HandlerClass::HandlerClass(std::string const& addr)
: zmq_context_(1)
, zmq_subscriber_(zmq_context_, ZMQ_SUB)
{
zmq_subscriber_.setsockopt(ZMQ_IDENTITY, "HandlerSubscriber", 5);
zmq_subscriber_.setsockopt(ZMQ_SUBSCRIBE, "", 0);
zmq_subscriber_.setsockopt(ZMQ_RCVTIMEO, 5000);
zmq_subscriber_.connect(addr);
}
void* HandlerClass::GetReceivedMessageData()
{
return zmq_receivedMessage_.data();
}
std::size_t HandlerClass::GetReceivedMessageSize()
{
return zmq_receivedMessage_.size();
}
bool HandlerClass::ReceivedMessage()
{
int received_bytes = zmq_subscriber_.recv(&zmq_receivedMessage_);
return received_bytes > 0;
}
Upvotes: 1
Views: 345
Reputation: 1
Poller
-instance + ZMQ_CONFLATE
Having zero context of the intended class use-cases, the original design seems to be a rather "mechanical" wrapper of a data-mover, not any MVP-slim-design, that can squeeze maximum of the benefits the ZeroMQ Scalable Formal Communication Archetypes Signalling/Messaging framework has already built-in.
The much smarter ( and also a ZMQ_RCV_HWM
-safer ( goes beyond the scope of this topic ) ) would be not to just always mechanically read each message from the ZeroMQ Context
-domain of control, unless in a real need to re-transmit such data from the HandlerClass
somewhere further down the line.
Add a private
instance of the Poller
that would allow to redesign data-flow mechanics -- using non-destructive query, using a .poll()
-method for testing a new message arrival ( having also a Real-Time / Event-Handling Loop stability control tools, not to wait longer than an ad-hoc set .poll()
-method timeout ), while yet able to defer any actual data-move as late as possible, until a data indeed need to flow outside of the HandlerClass
-instance, not anywhere earlier.
HandlerClass::HandlerClass(std::string const& addr)
: zmq_context_(1)
, zmq_subscriber_(zmq_context_, ZMQ_SUB)
{
zmq_subscriber_.setsockopt( ZMQ_IDENTITY, "HandlerSubscriber", 5 );
zmq_subscriber_.connect( addr );
zmq_subscriber_.setsockopt( ZMQ_SUBSCRIBE, "", 0 );
zmq_subscriber_.setsockopt( ZMQ_LINGER, 0 ); // ALWAYS, READY 4 .term()
zmq_subscriber_.setsockopt( ZMQ_CONFLATE, 1 ); // SMART
zmq_subscriber_.setsockopt( ZMQ_TOS, T ); // WORTH DEPLOY & MANAGE
zmq_subscriber_.setsockopt( ZMQ_RCVTIMEO, 5000 );
// ------------------------------------------------- // ADD Poller-instance
...
// ------------------------------------------------- // RTO
}
Nota Bene: In case the exgress flow is also made on ZeroMQ infrastructure, there are time-saving API tools for Zero-Copy message re-marshalling into another ZeroMQ socket-transport -- ( almost ) for free -- cool, isn't it?
Upvotes: 1