Reputation: 3388
I have the following piece of C++ code that opens a ZMQ subscriber socket and receives messages in an infinite loop.
listener.cc (the code should work; compile with: g++ -lzmq listener.cc):
#include <iostream>
#include <zmq.hpp>
class Listener {
public:
    Listener() {
        std::cout << "constructor call" << std::endl;
        // Not working:
        // zmq::context_t context(1);
        // sck = new zmq::socket_t(context, ZMQ_SUB);
        // sck->connect("tcp://127.0.0.1:9999");
        // sck->setsockopt( ZMQ_SUBSCRIBE, "", 0);
        std::cout << "constructor end" << std::endl;
    }
    void run() {
        // Ok:
        zmq::context_t context(1);
        sck = new zmq::socket_t(context, ZMQ_SUB);
        sck->connect("tcp://127.0.0.1:9999");
        sck->setsockopt(ZMQ_SUBSCRIBE, "", 0);
        while (1) { // Receive messages, not interesting:
            std::cout << "listening..." << std::endl;
            zmq::message_t message;
            sck->recv(&message);
            std::cout << "received something" << std::endl;
        }
    }
    zmq::socket_t *sck;
};
int main(int argc, char *argv[]) {
    Listener listener;
    std::cout << "foo" << std::endl;
    listener.run();
    return 0;
}
So far, the code works as expected:
$ g++ -lzmq listener.cc
$ ./a.out
constructor call
constructor end
foo
listening...
However, I want to move the initialization of the ZMQ context and socket into the constructor of the class (the part that is commented out). When I do that, the program never returns from the constructor call: all statements inside the constructor are executed, but the second line of main is not executed and the program hangs. The output is:
$ g++ -lzmq listener.cc
$ ./a.out
constructor call
constructor end
The only thing that comes to my mind is that the main thread stops execution for some reason. Can anyone explain this and provide a solution?
Cheers
Upvotes: 2
Views: 1815
Reputation: 1
ZeroMQ uses each Context( nIOthreads = 1 ) instance as a quite powerful engine under the hood, and its resource management needs some care so that it does not surprise you ( blocking / freezing is one such case ).
If a Context()-instance still manages actively used socket-instances, some transfers may not have completed by the time a destructor runs, or by the time you manually .close() such a socket-instance and/or .term() the Context-instance.
People run into this, knowingly or not, quite often.
The ZeroMQ native API documentation is quite clear on this subject and warns that low-level transactions that have not yet completed may leave one's code waiting forever for an external ( remote-agent operated ) event that will never arrive. Such code looks frozen or hung, but it was driven into that state deterministically, by not recognising this risk and not taking due precautions.
While newer API versions have changed some default settings, I recommend that all users set up the safe configuration explicitly, even where newer defaults make this unnecessary. The practice also raises awareness of the kinds of collisions that need to be considered in distributed-system design.
.setsockopt( ZMQ_LINGER, 0 );
zmq_term() shall block until the following conditions are satisfied:
All sockets open within context have been closed with zmq_close().
For each socket within context, all messages sent by the application with zmq_send() have either been physically transferred to a network peer, or the socket's linger period set with the ZMQ_LINGER socket option has expired.
As noted above, setting ZMQ_LINGER to 0 is a rule-of-thumb step for each and every socket instantiation.
#include <iostream>
#include <zmq.hpp>
class Listener {
    // zmq::context_t aClassLocalCONTEXT; // MAY GET SET LOCAL CTX BY VALUE ( DECLARE IT BEFORE THE SOCKET )
    // zmq::socket_t aClassLocalSOCKET;   // MAY GET SET LOCAL SCK BY VALUE EITHER
    zmq::socket_t *sck;
public:
    explicit Listener( zmq::context_t &context ) { // CTX IS OWNED BY THE CALLER AND MUST OUTLIVE THE LISTENER
        std::cout << "constructor call" << std::endl;
        // zmq::context_t context(1); // not a best practice here: a local CTX is destroyed when the constructor returns
        // ---------------------------------------------------
        // sck = new zmq::socket_t( aClassLocalCONTEXT, ZMQ_SUB );
        sck = new zmq::socket_t( context, ZMQ_SUB );
        int linger = 0;
        sck->setsockopt( ZMQ_LINGER, &linger, sizeof( linger ) ); // ALWAYS, best before .bind()/.connect()
        sck->connect( "tcp://127.0.0.1:9999" );
        sck->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
        // ----------------------------------------------// IF SETUP BY AN INSTANTIATION CALL INTERFACE
        // aClassLocalSOCKET.setsockopt( ZMQ_LINGER, &linger, sizeof( linger ) );
        // aClassLocalSOCKET.connect( ... );
        // aClassLocalSOCKET.setsockopt( ZMQ_SUBSCRIBE, "", 0 );
        // ---------------------------------------------------
        std::cout << "constructor end" << std::endl;
    }
    ~Listener() {
        sck->close(); // A GOOD PRACTICE
        delete sck;   // release the heap-allocated socket wrapper
        // ----------------------------------------------// IF SETUP BY AN INSTANTIATION CALL INTERFACE
        // aClassLocalSOCKET.close();
    }
    void run() {
        while (1) { // recv()-messages, not interesting:
            std::cout << "listening..." << std::endl;
            zmq::message_t message;
            sck->recv(&message);
            std::cout << "received something" << std::endl;
            // no explicit close needed: the zmq::message_t destructor releases the message at the end of each iteration
        }
    }
};
int main(int argc, char *argv[]) {
    zmq::context_t context(1); // ONE SHARED CTX, CREATED BEFORE ( AND SO DESTROYED AFTER ) THE LISTENER
    Listener listener( context );
    std::cout << "foo" << std::endl;
    listener.run();
    return 0;
}
Smart handling of resources is important, as each instantiation and destruction bears costs in both the [TIME]-domain and the [SPACE]-domain ( memory allocation / de-allocation costs, again in time ) and neither of these is cheap.
Plus, one should follow the ZeroMQ Zen of Zero: do not share anything ( well, sometimes sharing a Context()-instance is a way, but ... if you are serious about distributed-system design, best read Pieter HINTJENS' great book "Code Connected: Volume 1", definitely worth one's time and effort ).
Upvotes: 2
Reputation: 63745
Take the zmq::context_t context(1); out of the constructor.
That should be initialized globally or someplace similar. Normally, you only need one of those.
Your program freezes because the zmq::context_t that's local to your constructor is destroyed when the constructor returns, while a socket that uses it still exists.
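To make the ordering visible without needing ZeroMQ installed, here is a minimal sketch that uses hypothetical stand-in types (FakeContext, FakeSocket, and the two listener structs are made up for illustration and are not part of cppzmq). They only record when they are created and destroyed. In the real library the context destructor would block at the point where the stand-in merely logs "context destroyed" while the socket is still open.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-ins for zmq::context_t and zmq::socket_t.
// They do no networking; they only log construction and destruction.
using Log = std::vector<std::string>;

struct FakeContext {
    Log &log;
    explicit FakeContext(Log &l) : log(l) { log.push_back("context created"); }
    ~FakeContext() { log.push_back("context destroyed"); }
};

struct FakeSocket {
    Log &log;
    FakeSocket(FakeContext &, Log &l) : log(l) { log.push_back("socket created"); }
    ~FakeSocket() { log.push_back("socket destroyed"); }
};

// Mirrors the commented-out constructor in the question: the context is a
// local variable, so it is destroyed as soon as the constructor returns.
struct LocalContextListener {
    FakeSocket *sck;
    explicit LocalContextListener(Log &log) {
        FakeContext context(log);
        sck = new FakeSocket(context, log);
        log.push_back("constructor end");
    }   // <- context destroyed here, while the socket is still alive
    ~LocalContextListener() { delete sck; }
};

// One possible arrangement: the context is a member declared before the
// socket. Members are destroyed in reverse declaration order, so the
// socket is destroyed first and the context last.
struct MemberContextListener {
    FakeContext context;
    FakeSocket sck;
    explicit MemberContextListener(Log &log) : context(log), sck(context, log) {
        log.push_back("constructor end");
    }
};

// Each helper builds a listener, logs "foo" (like main in the question),
// lets the listener go out of scope, and returns the recorded events.
Log run_local() {
    Log log;
    {
        LocalContextListener listener(log);
        log.push_back("foo");
    }
    return log;
}

Log run_member() {
    Log log;
    {
        MemberContextListener listener(log);
        log.push_back("foo");
    }
    return log;
}
```

In the log from run_local(), "context destroyed" appears right after "constructor end" and before "foo", which matches the output in the question stopping after "constructor end". In the log from run_member(), the socket is destroyed before the context, which is the order ZeroMQ needs.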
Upvotes: 2