grzegorz
grzegorz

Reputation: 340

What causes cases with high ZeroMQ latency and how to avoid them?

I try to use ZeroMQ for fast message passing. Messages need to be delivered in less than 1 [ms]. I did some testing (inproc, single process on Linux, no TCP) and see that usually there is no problem with that. The latency is about 10 - 100 [us], depending on how often the messages are sent (why?). Sometimes however messages are received after 6 [ms] which is unacceptable.

What can be the cause that some messages are delayed?

Maybe the process is preempted?

Or it's because of polling used (zmq_poll())?

Example results from my test :

avg lag =    28    [us]
max lag =  5221    [us]
std dev =    25.85 [us]
big lag =   180    x above 200 [us]

"big lag" means number of cases where latency was over 200 [us]. In my tests there are 500 000 messages sent so the value 180 means that latency over 200 [us] was recorded in 180 / 500000 = 0,036%. It's a quite low number but I'd like it to be zero. Even on the expense of average latency.

The test source code is below :

#include <stdlib.h>
#include <math.h>
#include <zmq.h>
#include <pthread.h>

#define SOCKETS_NUM 5
#define RUNS 100000

void *context;
int numbers[SOCKETS_NUM];
struct {
    struct timespec send_time;
    struct timespec receive_time;
} times[SOCKETS_NUM * RUNS], *ptimes;

static void * worker_thread(void * dummy) {
    int * number = dummy;
    char endpoint[] = "inproc://endpointX";
    endpoint[17] = (char)('0' + *number);
    void * socket = zmq_socket(context, ZMQ_PUSH);
    zmq_connect(socket, endpoint);
    struct timespec sleeptime, remtime;
    int rnd = rand() / 3000;
    sleeptime.tv_sec = 0;
    sleeptime.tv_nsec = rnd;
    nanosleep(&sleeptime, &remtime);
    clock_gettime(CLOCK_REALTIME, &(ptimes[*number].send_time));
    zmq_send(socket, "Hello", 5, 0);
    zmq_close(socket);
    return NULL;
}

static void run_test(zmq_pollitem_t items[]) {
    pthread_t threads[SOCKETS_NUM];
    for (int i = 0; i < SOCKETS_NUM; i++) {
        pthread_create(&threads[i], NULL, worker_thread, &numbers[i]);
    }

    char buffer[10];
    int to_receive = SOCKETS_NUM;
    for (int i = 0; i < SOCKETS_NUM; i++) {
        int rc = zmq_poll(items, SOCKETS_NUM, -1);
        for (int j = 0; j < SOCKETS_NUM; j++) {
            if (items[j].revents & ZMQ_POLLIN) {
                clock_gettime(CLOCK_REALTIME, &(ptimes[j].receive_time));
                zmq_recv(items[j].socket, buffer, 10, 0);
            }
        }
        to_receive -= rc;
        if (to_receive == 0) break;
    }

    for (int i = 0; i < SOCKETS_NUM; i++) {
        pthread_join(threads[i], NULL);
    }
}

int main(void)
{
    context = zmq_ctx_new();
    zmq_ctx_set(context, ZMQ_THREAD_SCHED_POLICY, SCHED_FIFO);
    zmq_ctx_set(context, ZMQ_THREAD_PRIORITY, 99);
    void * responders[SOCKETS_NUM];
    char endpoint[] = "inproc://endpointX";
    for (int i = 0; i < SOCKETS_NUM; i++) {
        responders[i] = zmq_socket(context, ZMQ_PULL);
        endpoint[17] = (char)('0' + i);
        zmq_bind(responders[i], endpoint);
        numbers[i] = i;
    }

    time_t tt;
    time_t t = time(&tt);
    srand((unsigned int)t);

    zmq_pollitem_t poll_items[SOCKETS_NUM];
    for (int i = 0; i < SOCKETS_NUM; i++) {
        poll_items[i].socket = responders[i];
        poll_items[i].events = ZMQ_POLLIN;
    }

    ptimes = times;
    for (int i = 0; i < RUNS; i++) {
        run_test(poll_items);
        ptimes += SOCKETS_NUM;
    }

    long int lags[SOCKETS_NUM * RUNS];
    long int total_lag = 0;
    long int max_lag = 0;
    long int big_lag = 0;
    for (int i = 0; i < SOCKETS_NUM * RUNS; i++) {
        lags[i] = (times[i].receive_time.tv_nsec - times[i].send_time.tv_nsec + (times[i].receive_time.tv_sec - times[i].send_time.tv_sec) * 1000000000) / 1000;
        if (lags[i] > max_lag) max_lag = lags[i];
        total_lag += lags[i];
        if (lags[i] > 200) big_lag++;
    }
    long int avg_lag = total_lag / SOCKETS_NUM / RUNS;
    double SD = 0.0;
    for (int i = 0; i < SOCKETS_NUM * RUNS; ++i) {
        SD += pow((double)(lags[i] - avg_lag), 2);
    }
    double std_lag = sqrt(SD / SOCKETS_NUM / RUNS);
    printf("avg lag = %l5d    [us]\n", avg_lag);
    printf("max lag = %l5d    [us]\n", max_lag);
    printf("std dev = %8.2f [us]\n", std_lag);
    printf("big lag = %l5d    x above 200 [us]\n", big_lag);

    for (int i = 0; i < SOCKETS_NUM; i++) {
        zmq_close(responders[i]);
    }
    zmq_ctx_destroy(context);
    return 0;
}

Upvotes: 3

Views: 3002

Answers (1)

user3666197
user3666197

Reputation: 1

Q : "...I'd like it to be zero."

Cool to say, yet hard to make.

As you run an ultra-fast, memory-mapped inproc:// Transport Class, the main focus will be performance tweaking of the Context()-processing. Here, you spend so awfully much setup-overhead & straight termination-overhead operations to send 1E5-times just a 5 [B], so I guess there will never be a queue-management related issue, as there will never be any "stack-growing" at all.

1 ) ( suppose we let the code as-is ) it would be a natural step for the performance tuning to at least set the ZeroMQ mapping of a socket-CPU_core ZMQ_AFFINITY ( not jumping or wandering from core to core ). It may be interesting to see, if that many ~ 5E5 socket setups/terminations on the PUSH-er side, each without ever sending more than a single shot of 5 [B] over the memory-mapped line, could get some help (for those large overheads & maintenance) from configuring the context-instance with SOCKETS_NUM I/O-threads, using the ZMQ_IO_THREADS setting ( fighting for a "RealTime"-ness, using the SCHED_FIFO, having only one I/O-thread does not help much, does it? )

2 ) next level of experimentation is to re-balance the ZMQ_THREAD_AFFINITY_CPU_ADD maps (the global context's I/O-threads onto CPU-cores) and the per-socket setup of the ZMQ_AFFINITY maps onto the context's I/O-thread(s). Having sufficient amount of CPU-cores, there may be some performance / ultra-low latency benefits from making several gangs-of-I/O-threads serving one socket-instance stay "together", on a same CPU-core, yet here we get into territory, where the actual hardware and the real-system's background workloads & still-"spare"-resources for this "RealTime"-ambition motivated experimenting start to become hard to predict without any in-vivo testing & validation.

3 ) tweaking per-socket zmq_setsockopt() parameters may help, yet unless a nano-scaled socket-lifetime ( rather an expensive one-time used "consumable-disposable" ), do not expect any breakthrough from here.

4 ) trying to measure with a nanosecond resolution, the more if used for "durations" of something, ought be used by CLOCK_MONOTONIC_RAW, that avoids ntp-injected adjustments, astronomy-correcting leap seconds injections et al.

5 ) the zmq_poll()-strategy: I would no go this way. Using the timeout == -1 is blocking the whole circus. A thing I strongly discourage in any distributed-computing system, the more in one, that has a "RealTime" ambition. Spinning the PULL-side to a max performance may go via having a 1:1 PUSH/PULL threads on either side, or if trying to challenge the grooming, have 5-PUSH-er threads, as you have it, and collect all ingress messages on a just single, Zero-Copy well oiled PULL-er ( easier polling, may use a payload-based index-helper to which send-side timestamp to put the receive-side timestamp ), anyway, the blocking poller is almost the anti-pattern for challenging any low-latency soft-realtime toys.

Anyway, do not hesistate to refactor the code and to use profiling tools to better see, where you "acquire" the big_lag-s ( my guesses are above )

#include <stdlib.h>
#include <math.h>
#include <zmq.h>
#include <pthread.h>

#define SOCKETS_NUM      5
#define        RUNS 100000

void *context;
int   numbers[SOCKETS_NUM];
struct {
    struct timespec send_time;
    struct timespec recv_time;
} times[SOCKETS_NUM * RUNS],
 *ptimes;

static void *worker_thread( void *dummy ) { //-------------------------- an ovehead expensive one-shot PUSH-based "Hello"-sender & .close()
    
    int   *number       = dummy;
    char   endpoint[]   = "inproc://endpointX";
           endpoint[17] = (char)( '0' + *number );
    int    rnd          = rand() / 3000;
    void  *socket       = zmq_socket( context, ZMQ_PUSH );
            
    struct timespec   remtime,
                    sleeptime;
                    sleeptime.tv_sec  = 0;
                    sleeptime.tv_nsec = rnd;
                    
    zmq_connect( socket, endpoint );
    
    nanosleep( &sleeptime, &remtime ); // anything betweed < 0 : RAND_MAX/3000 > [ns] ... easily >> 32, as #define RAND_MAX    2147483647 ~ 715 827 [ns]
    
    clock_gettime( CLOCK_REALTIME, &( ptimes[*number].send_time) ); //............................................................................ CLK_set_NEAR_SEND
                                                                    // any CLOCK re-adjustments may and will skew any non-MONOTONIC_CLOCK
    
    zmq_send(  socket, "Hello", 5, 0 );
    zmq_close( socket );
    
    return NULL;
}

static void run_test( zmq_pollitem_t items[] ) { //--------------------- zmq_poll()-blocked zmq_recv()-orchestrator ( called ~ 1E5 x !!! resources' nano-use & setup + termination overheads matter )
    
    char      buffer[10];
    int       to_receive = SOCKETS_NUM;
    pthread_t threads[SOCKETS_NUM];
    
    for ( int i = 0; i < SOCKETS_NUM; i++ ) { //------------------------ thread-maker ( a per-socket PUSH-er[]-s )
        pthread_create( &threads[i], NULL, worker_thread, &numbers[i] );
    }
    
    for ( int i = 0; i < SOCKETS_NUM; i++ ) { //------------------------ [SERIAL]-------- [i]-stepping
        
        int rc = zmq_poll( items, SOCKETS_NUM, -1 ); //----------------- INFINITE ??? --- blocks /\/\/\/\/\/\/\/\/\/\/\ --- several may flag ZMQ_POLLIN
        
        for ( int j = 0; j < SOCKETS_NUM; j++ ) { //-------------------- ALL-CHECKED in a loop for an items[j].revents
            
            if ( items[j].revents & ZMQ_POLLIN ) { //------------------- FIND IF IT WAS THIS ONE
                
                clock_gettime( CLOCK_REALTIME, &( ptimes[j].recv_time ) );//...................................................................... CLK_set_NEAR_poll()_POSACK'd R2recv
                
                zmq_recv( items[j].socket, buffer, 10, 0 ); //---------- READ-IN from any POSACK'd by zmq_poll()-er flag(s)
            }
        }
        to_receive -= rc; // ---------------------------------------------------------------------------------------------- SUB rc
        if (to_receive == 0) break;
    }

    for ( int i = 0; i < SOCKETS_NUM; i++ ) { //------------------------ thread-killer
        
        pthread_join( threads[i], NULL );
    }
}

int main( void ) {
    
                 context = zmq_ctx_new();
    zmq_ctx_set( context, ZMQ_THREAD_SCHED_POLICY, SCHED_FIFO );
    zmq_ctx_set( context, ZMQ_THREAD_PRIORITY, 99 );
    
    void *responders[SOCKETS_NUM];
    char  endpoint[] = "inproc://endpointX";
    
    for ( int i = 0; i < SOCKETS_NUM; i++ ) {
        
        responders[i] = zmq_socket( context, ZMQ_PULL ); // ------------ PULL instances into []
        endpoint[17] = (char)( '0' + i );
        zmq_bind( responders[i], endpoint ); //------------------------- .bind()
        numbers[i] = i;
    }

    time_t tt;
    time_t t = time(&tt);
    srand( (unsigned int)t );

    zmq_pollitem_t poll_items[SOCKETS_NUM];
    
    for ( int i = 0; i < SOCKETS_NUM; i++ ) { //------------------------ zmq_politem_t array[] ---pre-fill---
        poll_items[i].socket = responders[i];
        poll_items[i].events = ZMQ_POLLIN;
    }

    ptimes = times;
    
    for ( int i = 0; i < RUNS; i++ ) { //------------------------------- 1E5 RUNs
        run_test( poll_items ); // -------------------------------------     RUN TEST
        ptimes += SOCKETS_NUM;
    }

    long int lags[SOCKETS_NUM * RUNS];
    long int total_lag = 0;
    long int   max_lag = 0;
    long int   big_lag = 0;
    
    for ( int i = 0; i < SOCKETS_NUM * RUNS; i++ ) {
        lags[i] = (   times[i].recv_time.tv_nsec
                  -   times[i].send_time.tv_nsec
                  + ( times[i].recv_time.tv_sec
                    - times[i].send_time.tv_sec
                      ) * 1000000000
                    ) / 1000; // --------------------------------------- [us]
        if ( lags[i] > max_lag ) max_lag = lags[i];
        total_lag += lags[i];
        if ( lags[i] > 200 )     big_lag++;
    }
    
    long int avg_lag = total_lag / SOCKETS_NUM / RUNS;
    double        SD = 0.0;
    
    for ( int i = 0; i < SOCKETS_NUM * RUNS; ++i ) {
        SD += pow( (double)( lags[i] - avg_lag ), 2 );
    }
    
    double std_lag = sqrt( SD / SOCKETS_NUM / RUNS );
    
    printf("avg lag = %l5d    [us]\n", avg_lag);
    printf("max lag = %l5d    [us]\n", max_lag);
    printf("std dev = %8.2f [us]\n", std_lag);
    printf("big lag = %l5d    x above 200 [us]\n", big_lag);

    for ( int i = 0; i < SOCKETS_NUM; i++ ) {
        zmq_close( responders[i] );
    }
    zmq_ctx_destroy( context );
    
    return 0;
}

Using nanosleep for a random (not cardinal, safely outside of any control-loop(s) activity) sleep is rather a risky luxury, as in earlier kernels caused problems:

In order to support applications requiring much more precise pauses (e.g., in order to control some time-critical hardware), nanosleep() would handle pauses of up to 2 ms by busy waiting with microsecond precision when called from a thread scheduled under a real-time policy like SCHED_FIFO or SCHED_RR. This special extension was removed in kernel 2.5.39, hence is still present in current 2.4 kernels, but not in 2.6 kernels.

Upvotes: 2

Related Questions