Johannes
Johannes

Reputation: 3388

ZMQ causes main thread to freeze (or something similar..?)

I have the following piece of c++ code that opens a ZMQ-subscriber-socket and receives messages in an infinite loop.

listener.cc: (Code should work, compile with: g++ -lzmq listener.cc)

#include <iostream>
#include <zmq.hpp>

class Listener {
public:
    Listener() {
        std::cout << "constructor call" << std::endl;

        // Not working:
//        zmq::context_t context(1);
//        sck = new zmq::socket_t(context, ZMQ_SUB);
//        sck->connect("tcp://127.0.0.1:9999");
//        sck->setsockopt( ZMQ_SUBSCRIBE, "", 0);

        std::cout << "constructor end" << std::endl;
    }

    void run() {
        // Ok:
        zmq::context_t context(1);
        sck = new zmq::socket_t(context, ZMQ_SUB);
        sck->connect("tcp://127.0.0.1:9999");
        sck->setsockopt(ZMQ_SUBSCRIBE, "", 0);

        while (1) { // Receive messages, not interesting:
            std::cout << "listening..." << std::endl;
            zmq::message_t message;
            sck->recv(&message);
            std::cout << "received something" << std::endl;
        }
    }
    zmq::socket_t *sck;
};

int main(int argc, char *argv[]) {
    Listener listener;
    std::cout << "foo" << std::endl;
    listener.run();
    return 0;
}

So far, the code works as expected:

$ g++ -lzmq listener.cc
$ ./a.out 
constructor call
constructor end
foo
listening...

However, I want to move the initialization of zmq-context/socket into the constructor of the class (the part that is commented out). But then the code simply won't return from the constructor call, all statements inside the constructor are executed but the second line of the main is not executed and the programme gets stuck. The output is:

$ g++ -lzmq listener.cc
$ ./a.out 
constructor call
constructor end

The only thing that comes to my mind is that the main thread stops execution for some reason. Can anyone explain this and provide a solution?

Cheers

Upvotes: 2

Views: 1815

Answers (2)

user3666197
user3666197

Reputation: 1

Can anyone explain this and provide a solution? Yes to both ...

ZeroMQ uses each Context( nIOthreads = 1 ) instance as a quite powerful engine under the hood and some care has to be taken, so that the resources management procedures do not get one surprised ( as the blocking / freezing is one such case ).

In case there are some actively used socket-instances ( managed under the hood of the Context()-instance ), there may be a situation, that not all transfers have been completed before one gets into a destructor-handling phase or if one makes a similar step manually, in an attempt to try to .close() such socket-instance and/or .term() the Context-instance.

People headbang, knowingly or not, into this quite many times.

ZeroMQ native API documentation is quite clear on this subject and warns about a risk, that not yet completed low-level transactions may let one's code wait infinitely for an external ( remote-agent operated ) event, that will never appear. Such un-aware code will look like a frozen / hanging failure, but was driven into such deterministic case just by one's failure to realise this risk and by not taking due pre-cautions.

While newer API versions have changed some default settings, I advocate all users to explicitly setup the safe configuration, even if newer defaults may avoid such a need of doing this manually. Yet, this practice helps to raise awareness of what sorts of collisions ought be thought of in due design practice.


Solution? Always .setsockopt( ZMQ_LINGER, 0 );

zmq_term() shall block until the following conditions are satisfied:

All sockets open within context have been closed with zmq_close().
For each socket within context, all messages sent by the application with zmq_send() have either been physically transferred to a network peer, or the socket's linger period set with the ZMQ_LINGER socket option has expired.

As noted above, this is a rule of thumb step upon each and every socket-instantiation.

class Listener {
   // zmq::context_t aClassLocalCONTEXT;              // MAY GET SET LOCAL CTX BY VALUE
   // zmq::socket_t  aClassLocalSOCKET;               // MAY GET SET LOCAL SCK BY VALUE EITHER
      zmq::socket_t  *sck;

public:
    Listener() {
        std::cout << "constructor call" << std::endl;

     // zmq::context_t context(1);                    // not a best practice here
     // ---------------------------------------------------
     // sck = new zmq::socket_t( aClassLocalCONTEXT, ZMQ_SUB );
        sck = new zmq::socket_t( context, ZMQ_SUB );
        sck->setsockopt( ZMQ_LINGER, 0 );             // ALWAYS, best before .bind()/.connect()
        sck->connect(   "tcp://127.0.0.1:9999" );
        sck->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
     // ----------------------------------------------// IF SETUP BY AN INSTANTIATION CALL INTERFACE
     // aClassLocalSOCKET->setsockopt( ZMQ_LINGER, 0 );
     // aClassLocalSOCKET->connect(    ... );
     // aClassLocalSOCKET->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
     // ---------------------------------------------------
        std::cout << "constructor end" << std::endl;
    }
   ~Listener() {
        sck->close();                                 // A GOOD PRACTICE
     // ----------------------------------------------// IF SETUP BY AN INSTANTIATION CALL INTERFACE
     // aClassLocalSOCKET->close();
    }
    void run() {            
        while (1) {                                   // recv()-messages, not interesting:
            std::cout << "listening..." << std::endl;
            zmq::message_t message;
            sck->recv(&message);
            std::cout << "received something" << std::endl;
            zmq::zmq_msg_close(&message);             // A GOOD PLACE TO DISCARD A NEVER MORE USED RESOURCE
        }
    }
};


int main(int argc, char *argv[]) {
    zmq::context_t context(1);                        // GLOBAL CTX
    Listener listener;
    std::cout << "foo" << std::endl;
    listener.run();
    return 0;
}

Efficient resources reduces overhead costs

Smart handling of resources is important, as each instantiation and destruction bears costs in both [TIME]-domain and [SPACE]-domain ( memory allocation / de-allocation costs, again in time ) and neither of these is cheap.

Plus one shall follow the ZeroMQ Zen of Zero - do not share anything ( well, sometimes sharing a Context()-instance is a way, but ... if you are serious into design, best read a great Pieter HINTJENS' book "Code Connected: Volume 1", definitely worth one's time and efforts ).

Upvotes: 2

Drew Dormann
Drew Dormann

Reputation: 63745

Take the zmq::context_t context(1); out of the constructor.

That should be initialized globally or someplace similar. Normally, you only need one of those.

You are freezing because you are trying to delete the zmq::context_t that's local to your constructor while sockets still exist that are using it.

Upvotes: 2

Related Questions