SCO
SCO

Reputation: 1932

How to achieve more than 4000 msg/sec with ZeroMQ?

Based on a previous question on SO suggesting I use DEALER/ROUTER model to maximize performance (instead of REQ/REP model), I setup the following client and server code.

The client asynccli.c source fires 8 threads, each sending and receiving on zmq TCP sockets. The server asyncsrv.c fires 4 workers threads and uses a proxy to distribute the incoming requests to the worker threads.

For a test lasting 10 seconds, I experience performances ranging from 40 000 msgs, to 120,000, which is at best 12,000 msgs/sec which is quite low. I'm runnning Ubuntu on an i7 (8HT cores) laptop having 8GB memory. I use czmq library.

I thought I could achieve > 200,000 msgs/s with ZeroMQ. I guess I didn't catch the "async" thing correctly. Any C sample code around ? Basically I don't see how to get the async thing since I'm currently zmq_poll()ing here.

asynccli.c :

// results : 4000/s
#include "czmq.h"
int id = 0;

static void *
client_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *client = zsocket_new (ctx, ZMQ_DEALER);

    char identity [10];

    sprintf (identity, "%d", id);
    zsockopt_set_identity (client, identity);
    zsocket_connect (client, "tcp://localhost:5570");

    zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
    int request_nbr = 0;
    while (true) {
        //  Tick once per second, pulling in arriving messages
        int centitick;
        for (centitick = 0; centitick < 100; centitick++) {
            zmq_poll (items, 1, 1);
            if (items [0].revents & ZMQ_POLLIN) {
                zmsg_t *msg = zmsg_recv (client);
                //zframe_print (zmsg_last (msg), identity);
                zmsg_destroy (&msg);
                break;
            }
        }

        id+=1;
        zstr_send (client, "request #%d", ++request_nbr);
    }
    zctx_destroy (&ctx);
    return NULL;
}

//  The main thread simply starts several clients and a server, and then
//  waits for the server to finish.

int main (void)
{

    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);
    zthread_new (client_task, NULL);

    zclock_sleep (10 * 1000);    //  Run for 10 seconds then quit
    printf ("\\ntotal iterations = %d\n" , id );
    return 0;
}

asyncsrv.c:

#include "czmq.h"

static void server_worker (void *args, zctx_t *ctx, void *pipe);

void *server_task (void *args)
{
    //  Frontend socket talks to clients over TCP
    zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (frontend, "tcp://*:5570");

    //  Backend socket talks to workers over inproc
    void *backend = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_bind (backend, "inproc://backend");

    //  Launch pool of worker threads, precise number is not critical
    int thread_nbr;
    for (thread_nbr = 0; thread_nbr < 3; thread_nbr++)
        zthread_fork (ctx, server_worker, NULL);

    //  Connect backend to frontend via a proxy
    zmq_proxy (frontend, backend, NULL);

    zctx_destroy (&ctx);
    return NULL;
}

static void
server_worker (void *args, zctx_t *ctx, void *pipe)
{
    void *worker = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (worker, "inproc://backend");

    while (true) {
        //  The DEALER socket gives us the reply envelope and message
        zmsg_t *msg = zmsg_recv (worker);
        zframe_t *identity = zmsg_pop (msg);
        zframe_t *content = zmsg_pop (msg);
        assert (content);
        zmsg_destroy (&msg);

        //  Sleep for some fraction of a second
        zframe_send (&identity, worker, ZFRAME_REUSE + ZFRAME_MORE);
        zframe_send (&content, worker, ZFRAME_REUSE);

        zframe_destroy (&identity);
        zframe_destroy (&content);
    }
}

int main (void)
{
    zthread_new (server_task, NULL);
    zclock_sleep (15 * 1000);    //  Run for 15 seconds then quit
    return 0;
}

Upvotes: 1

Views: 1384

Answers (2)

Gopalakrishna Palem
Gopalakrishna Palem

Reputation: 1715

The problem is: client is "limited" in its sending capability by having the 'read' before 'send' in your code.

Right now, the code in the client is:

while(true)
{
  pull_any_income_messages()
  send()
}

This will severely limit the client from sending anything in case of any pending incoming messages. So, this essentially becomes a request-reply pattern.

To scale this, you would have to decouple the 'pull incoming messages' and 'send' parts. One way could be, instead of having universal client thread that handles both sending and receiving, create two separate thread types for client one that exclusively sends and another exclusively reads.

Another approach would be to do 'credit based flow control'. Chapter 7 in the ZMQ Guide has info on it (http://zguide.zeromq.org/page:all#Chapter-Advanced-Architecture-using-MQ).

-GK

http://gk.palem.in/

Upvotes: 3

jump3r
jump3r

Reputation: 161

You have to use the same logic in worker as with dealer:

while(1)

zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 } };

zmq_poll (items, 1, 1);

if (items [0].revents & ZMQ_POLLIN) 

Without if statement, the zmsg_recv() blocks. Also, zmq_pollitem_t should be recreated within while with each iteration. I think there is low level reason for that, take a look at original sockets with the use of FD_SET and select, it may give you a clue... .

Also, this is incorrect as well:

for (centitick = 0; centitick < 100; centitick++) {

Use some counter instead if you want measure 100 iterrations only.

Upvotes: 3

Related Questions