user505160
user505160

Reputation: 1216

Qt with ZeroMQ publish subscribe pattern

I would like to use ZeroMQ(4.1.2) with Qt (5.2.1). Idea is to have zmq pub/sub (where server is outside) and sub is qt app. Currently receive in Qt app runs once, could someone drop hint? Should ZeroMQ receiver be implemented in some other way?

Currently my code looks like:

mainwindow.h

namespace Ui {
class MainWindow;
}

class MainWindow : public QMainWindow
{
Q_OBJECT

public:
    explicit MainWindow(QWidget *parent = 0);
    ~MainWindow();

private slots:
    void on_pushButton_clicked();

    void readZMQData();

private:
    Ui::MainWindow *ui;
    QSocketNotifier *qsn;
    void *context;
    void *subscriber;

};

mainwindow.cpp

MainWindow::MainWindow(QWidget *parent) :
    QMainWindow(parent),
    ui(new Ui::MainWindow)
{
    ui->setupUi(this);

    /***** ZMQ *****/

    context = zmq_ctx_new ();
    subscriber = zmq_socket (context, ZMQ_SUB);
    int rc = zmq_connect (subscriber, "tcp://localhost:5556");

    char *filter = "";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter, strlen (filter));
    unsigned int fd=0;
    size_t fd_size = sizeof(fd);
    rc = zmq_getsockopt(subscriber,ZMQ_FD,&fd,&fd_size);

    qsn = new QSocketNotifier(fd, QSocketNotifier::Read, this);
    connect(qsn, SIGNAL(activated(int)), this, SLOT(readZMQData()), Qt::DirectConnection);

}

MainWindow::~MainWindow()
{
    zmq_close (this->subscriber);
    zmq_ctx_destroy (this->context);
    delete ui;
}


void MainWindow::readZMQData()
{
    qsn->setEnabled(false);
    qDebug() << "Got data!";

    int events = 0;
    std::size_t eventsSize = sizeof(events);
    zmq_getsockopt(subscriber,ZMQ_EVENTS, &events, &eventsSize);
    if(events & ZMQ_POLLIN){
        qDebug() << " ======  Data to read ======";

        char *string = s_recv(subscriber);
        qDebug() << "DATA: " << string;
        free(string);
    }

    qsn->setEnabled(true);
}

And server app is (from ZeroMQ examples):

#include "zhelpers.h"

int main (void)
{
    //  Prepare our context and publisher
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket (context, ZMQ_PUB);
    int rc = zmq_bind (publisher, "tcp://*:5556");
    assert (rc == 0);

    //  Initialize random number generator
    srandom ((unsigned) time (NULL));
    while (1) {
        //  Get values that will fool the boss
        int zipcode, temperature, relhumidity;
        zipcode     = randof (100000);
        temperature = randof (215) - 80;
        relhumidity = randof (50) + 10;

        //  Send message to all subscribers
        char update [20];
        sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
        s_send (publisher, update);
    }
    zmq_close (publisher);
    zmq_ctx_destroy (context);
    return 0;
}

Upvotes: 4

Views: 5694

Answers (3)

sirop
sirop

Reputation: 180

As it is not clear what s_recv_nb(zmq::socket_t & socket) is, I'll provide my - slightly more detailed - implemetation of:

      void MainWindow::readZMQData()
      {
       qsn->setEnabled(false);

       int events = 0;
       std::size_t eventsSize = sizeof(events);
       zmq_getsockopt(subscriber,ZMQ_EVENTS, &events, &eventsSize);
       if(events & ZMQ_POLLIN){
       qDebug() << " ======  Data to read ======";

       char *string;
       int64_t more;
       size_t more_size = sizeof (more);

       do {
           /* Create an empty ØMQ message to hold the message part */
           zmq_msg_t part;
           int rc = zmq_msg_init (&part);
           assert (rc == 0);
           rc = zmq_msg_recv (&part, subscriber, 0);
           assert (rc != -1);
           /* Determine if more message parts are to follow */
           rc = zmq_getsockopt (subscriber, ZMQ_RCVMORE, &more, &more_size);
           assert (rc == 0);
           string = (char*) zmq_msg_data(&part);
           qDebug() << QString(string) ; // << "more" << more;

           zmq_msg_close (&part); 
          } while (more);
       }

       qsn->setEnabled(true);
     }

And one more remark: in case of Windows 64 the filedescriptor fdshould be uint64_t as in zmq_getsockopt returns EINVAL on windows x64 when local address of ZMQ_FD option_val passed

Upvotes: 0

user505160
user505160

Reputation: 1216

First tnx for helping out,

I've found the issue, when ZeroMQ notifies that there is message to read you need to read them all, not just first one.

    void MainWindow::readZMQData(int fd)
{
    qsn->setEnabled(false);

    int events = 0;
    std::size_t eventsSize = sizeof(events);
    zmq_getsockopt(subscriber,ZMQ_EVENTS, &events, &eventsSize);
    if(events & ZMQ_POLLIN){
        qDebug() << " ======  Data to read ======";

        char *string;
        // THIS IS THE TRICK! READ UNTIL THERE IS MSG
        while((string = s_recv_nb(subscriber)) != NULL){
            qDebug() << "DATA: " << string;
            free(string);
        }
    }

    qsn->setEnabled(true);
}

Upvotes: 4

phyatt
phyatt

Reputation: 19152

The socket notifier looks like it should work. Have you read the docs on handling it properly? Especially if you are on Windows, it looks like there are special ways to handle it when doing the read... disabling, reading, etc.

http://doc.qt.io/qt-5/qsocketnotifier.html#details

Hope that helps.

Upvotes: 1

Related Questions