shabby
shabby

Reputation: 3222

ZeroMq PUB/SUB pattern not working properly

My Requirements:

  1. High throughput, atleast 5000 messages per second
  2. Order of delivery not important
  3. Publisher, as obvious, should not wait for a response and should not care if a Subscriber is listening or not

Background:

I am creating a new thread for every message because if I dont, the messages generation part will out-speed the sending thread and messages get lost, so a thread for each message seems to be the right approach

Problem:

The problem is that somehow the threads that are started to send out the zMQ message are not being terminated (not exiting/finishing). There seems to be a problem in the following line:

s_send(*client, request.str());

because if I remove it then the threads terminate fine, so probably its this line which is causing problems, my first guess was that the thread is waiting for a response, but does a zmq_PUB wait for a response?

Here is my code:

void *SendHello(void *threadid) {
    long tid;
    tid = (long) threadid;
    //cout << "Hello World! Thread ID, " << tid << endl;
    std::stringstream request;
    //writing the hex as request to be sent to the server
    request << tid;
    s_send(*client, request.str());
    pthread_exit(NULL);
}

int main() {
    int sequence = 0;

    int NUM_THREADS = 1000;
    while (1) {
        pthread_t threads[NUM_THREADS];
        int rc;
        int i;
        for (i = 0; i < NUM_THREADS; i++) {
            cout << "main() : creating thread, " << i << endl;
            rc = pthread_create(&threads[i], NULL, SendHello, (void *) i);
        pthread_detach(threads[i]);
        sched_yield();

            if (rc) {
                cout << "Error:unable to create thread," << rc << endl;
                exit(-1);
            }
        }
        //usleep(1000);
        sleep(1);
    }
    pthread_exit(NULL);
    //delete client;
    return 0;
}

My Question:

Do I need to tweak zMQ sockets so that the PUB doesnt wait for a reply what am I doing wrong?

Edit:

Adding client definition:

static zmq::socket_t * s_client_socket(zmq::context_t & context) {
    std::cout << "I: connecting to server." << std::endl;
    zmq::socket_t * client = new zmq::socket_t(context, ZMQ_SUB);
    client->connect("tcp://localhost:5555");

    // Configure socket to not wait at close time
    int linger = 0;
    client->setsockopt(ZMQ_LINGER, &linger, sizeof (linger));
    return client;
}

zmq::context_t context(1);
zmq::socket_t * client = s_client_socket(context);

Upvotes: 0

Views: 600

Answers (1)

rj76
rj76

Reputation: 61

but does a zmq_PUB wait for a response?

No, this could be the case if your socket wasn't a PUB socket and you hit the high-water mark, but this isn't the case. Do the messages get sent?

Upvotes: 0

Related Questions