Reputation: 340
I try to use ZeroMQ for fast message passing. Messages need to be delivered in less than 1 [ms]
. I did some testing (inproc
, single process on Linux, no TCP) and see that usually there is no problem with that. The latency is about 10 - 100 [us]
, depending on how often the messages are sent (why?). Sometimes however messages are received after 6 [ms]
which is unacceptable.
What can be the cause that some messages are delayed?
Maybe the process is preempted?
Or it's because of polling used (zmq_poll()
)?
Example results from my test :
avg lag = 28 [us]
max lag = 5221 [us]
std dev = 25.85 [us]
big lag = 180 x above 200 [us]
"big lag" means number of cases where latency was over 200 [us]
. In my tests there are 500 000 messages sent so the value 180 means that latency over 200 [us]
was recorded in 180 / 500000 = 0,036%
. It's a quite low number but I'd like it to be zero. Even on the expense of average latency.
The test source code is below :
#include <stdlib.h>
#include <math.h>
#include <zmq.h>
#include <pthread.h>
#define SOCKETS_NUM 5
#define RUNS 100000
void *context;
int numbers[SOCKETS_NUM];
struct {
struct timespec send_time;
struct timespec receive_time;
} times[SOCKETS_NUM * RUNS], *ptimes;
static void * worker_thread(void * dummy) {
int * number = dummy;
char endpoint[] = "inproc://endpointX";
endpoint[17] = (char)('0' + *number);
void * socket = zmq_socket(context, ZMQ_PUSH);
zmq_connect(socket, endpoint);
struct timespec sleeptime, remtime;
int rnd = rand() / 3000;
sleeptime.tv_sec = 0;
sleeptime.tv_nsec = rnd;
nanosleep(&sleeptime, &remtime);
clock_gettime(CLOCK_REALTIME, &(ptimes[*number].send_time));
zmq_send(socket, "Hello", 5, 0);
zmq_close(socket);
return NULL;
}
static void run_test(zmq_pollitem_t items[]) {
pthread_t threads[SOCKETS_NUM];
for (int i = 0; i < SOCKETS_NUM; i++) {
pthread_create(&threads[i], NULL, worker_thread, &numbers[i]);
}
char buffer[10];
int to_receive = SOCKETS_NUM;
for (int i = 0; i < SOCKETS_NUM; i++) {
int rc = zmq_poll(items, SOCKETS_NUM, -1);
for (int j = 0; j < SOCKETS_NUM; j++) {
if (items[j].revents & ZMQ_POLLIN) {
clock_gettime(CLOCK_REALTIME, &(ptimes[j].receive_time));
zmq_recv(items[j].socket, buffer, 10, 0);
}
}
to_receive -= rc;
if (to_receive == 0) break;
}
for (int i = 0; i < SOCKETS_NUM; i++) {
pthread_join(threads[i], NULL);
}
}
int main(void)
{
context = zmq_ctx_new();
zmq_ctx_set(context, ZMQ_THREAD_SCHED_POLICY, SCHED_FIFO);
zmq_ctx_set(context, ZMQ_THREAD_PRIORITY, 99);
void * responders[SOCKETS_NUM];
char endpoint[] = "inproc://endpointX";
for (int i = 0; i < SOCKETS_NUM; i++) {
responders[i] = zmq_socket(context, ZMQ_PULL);
endpoint[17] = (char)('0' + i);
zmq_bind(responders[i], endpoint);
numbers[i] = i;
}
time_t tt;
time_t t = time(&tt);
srand((unsigned int)t);
zmq_pollitem_t poll_items[SOCKETS_NUM];
for (int i = 0; i < SOCKETS_NUM; i++) {
poll_items[i].socket = responders[i];
poll_items[i].events = ZMQ_POLLIN;
}
ptimes = times;
for (int i = 0; i < RUNS; i++) {
run_test(poll_items);
ptimes += SOCKETS_NUM;
}
long int lags[SOCKETS_NUM * RUNS];
long int total_lag = 0;
long int max_lag = 0;
long int big_lag = 0;
for (int i = 0; i < SOCKETS_NUM * RUNS; i++) {
lags[i] = (times[i].receive_time.tv_nsec - times[i].send_time.tv_nsec + (times[i].receive_time.tv_sec - times[i].send_time.tv_sec) * 1000000000) / 1000;
if (lags[i] > max_lag) max_lag = lags[i];
total_lag += lags[i];
if (lags[i] > 200) big_lag++;
}
long int avg_lag = total_lag / SOCKETS_NUM / RUNS;
double SD = 0.0;
for (int i = 0; i < SOCKETS_NUM * RUNS; ++i) {
SD += pow((double)(lags[i] - avg_lag), 2);
}
double std_lag = sqrt(SD / SOCKETS_NUM / RUNS);
printf("avg lag = %l5d [us]\n", avg_lag);
printf("max lag = %l5d [us]\n", max_lag);
printf("std dev = %8.2f [us]\n", std_lag);
printf("big lag = %l5d x above 200 [us]\n", big_lag);
for (int i = 0; i < SOCKETS_NUM; i++) {
zmq_close(responders[i]);
}
zmq_ctx_destroy(context);
return 0;
}
Upvotes: 3
Views: 3002
Reputation: 1
Q : "...I'd like it to be zero."
Cool to say, yet hard to make.
As you run an ultra-fast, memory-mapped inproc://
Transport Class, the main focus will be performance tweaking of the Context()
-processing. Here, you spend so awfully much setup-overhead & straight termination-overhead operations to send 1E5
-times just a 5 [B]
, so I guess there will never be a queue-management related issue, as there will never be any "stack-growing" at all.
1 ) ( suppose we let the code as-is ) it would be a natural step for the performance tuning to at least set the ZeroMQ mapping of a socket-CPU_core ZMQ_AFFINITY ( not jumping or wandering from core to core ). It may be interesting to see, if that many ~ 5E5
socket setups/terminations on the PUSH
-er side, each without ever sending more than a single shot of 5 [B]
over the memory-mapped line, could get some help (for those large overheads & maintenance) from configuring the context
-instance with SOCKETS_NUM
I/O-threads, using the ZMQ_IO_THREADS
setting ( fighting for a "RealTime"-ness, using the SCHED_FIFO
, having only one I/O-thread does not help much, does it? )
2 ) next level of experimentation is to re-balance the ZMQ_THREAD_AFFINITY_CPU_ADD
maps (the global context
's I/O-threads onto CPU-cores) and the per-socket setup of the ZMQ_AFFINITY
maps onto the context
's I/O-thread(s). Having sufficient amount of CPU-cores, there may be some performance / ultra-low latency benefits from making several gangs-of-I/O-threads serving one socket
-instance stay "together", on a same CPU-core, yet here we get into territory, where the actual hardware and the real-system's background workloads & still-"spare"-resources for this "RealTime"-ambition motivated experimenting start to become hard to predict without any in-vivo testing & validation.
3 ) tweaking per-socket zmq_setsockopt()
parameters may help, yet unless a nano-scaled socket-lifetime ( rather an expensive one-time used "consumable-disposable" ), do not expect any breakthrough from here.
4 ) trying to measure with a nanosecond resolution, the more if used for "durations" of something, ought be used by CLOCK_MONOTONIC_RAW
, that avoids ntp
-injected adjustments, astronomy-correcting leap seconds injections et al.
5 ) the zmq_poll()
-strategy: I would no go this way. Using the timeout == -1
is blocking the whole circus. A thing I strongly discourage in any distributed-computing system, the more in one, that has a "RealTime" ambition. Spinning the PULL
-side to a max performance may go via having a 1:1 PUSH/PULL
threads on either side, or if trying to challenge the grooming, have 5-PUSH
-er threads, as you have it, and collect all ingress messages on a just single, Zero-Copy well oiled PULL
-er ( easier polling, may use a payload-based index-helper to which send-side timestamp to put the receive-side timestamp ), anyway, the blocking poller is almost the anti-pattern for challenging any low-latency soft-realtime toys.
Anyway, do not hesistate to refactor the code and to use profiling tools to better see, where you "acquire" the big_lag
-s ( my guesses are above )
#include <stdlib.h>
#include <math.h>
#include <zmq.h>
#include <pthread.h>
#define SOCKETS_NUM 5
#define RUNS 100000
void *context;
int numbers[SOCKETS_NUM];
struct {
struct timespec send_time;
struct timespec recv_time;
} times[SOCKETS_NUM * RUNS],
*ptimes;
static void *worker_thread( void *dummy ) { //-------------------------- an ovehead expensive one-shot PUSH-based "Hello"-sender & .close()
int *number = dummy;
char endpoint[] = "inproc://endpointX";
endpoint[17] = (char)( '0' + *number );
int rnd = rand() / 3000;
void *socket = zmq_socket( context, ZMQ_PUSH );
struct timespec remtime,
sleeptime;
sleeptime.tv_sec = 0;
sleeptime.tv_nsec = rnd;
zmq_connect( socket, endpoint );
nanosleep( &sleeptime, &remtime ); // anything betweed < 0 : RAND_MAX/3000 > [ns] ... easily >> 32, as #define RAND_MAX 2147483647 ~ 715 827 [ns]
clock_gettime( CLOCK_REALTIME, &( ptimes[*number].send_time) ); //............................................................................ CLK_set_NEAR_SEND
// any CLOCK re-adjustments may and will skew any non-MONOTONIC_CLOCK
zmq_send( socket, "Hello", 5, 0 );
zmq_close( socket );
return NULL;
}
static void run_test( zmq_pollitem_t items[] ) { //--------------------- zmq_poll()-blocked zmq_recv()-orchestrator ( called ~ 1E5 x !!! resources' nano-use & setup + termination overheads matter )
char buffer[10];
int to_receive = SOCKETS_NUM;
pthread_t threads[SOCKETS_NUM];
for ( int i = 0; i < SOCKETS_NUM; i++ ) { //------------------------ thread-maker ( a per-socket PUSH-er[]-s )
pthread_create( &threads[i], NULL, worker_thread, &numbers[i] );
}
for ( int i = 0; i < SOCKETS_NUM; i++ ) { //------------------------ [SERIAL]-------- [i]-stepping
int rc = zmq_poll( items, SOCKETS_NUM, -1 ); //----------------- INFINITE ??? --- blocks /\/\/\/\/\/\/\/\/\/\/\ --- several may flag ZMQ_POLLIN
for ( int j = 0; j < SOCKETS_NUM; j++ ) { //-------------------- ALL-CHECKED in a loop for an items[j].revents
if ( items[j].revents & ZMQ_POLLIN ) { //------------------- FIND IF IT WAS THIS ONE
clock_gettime( CLOCK_REALTIME, &( ptimes[j].recv_time ) );//...................................................................... CLK_set_NEAR_poll()_POSACK'd R2recv
zmq_recv( items[j].socket, buffer, 10, 0 ); //---------- READ-IN from any POSACK'd by zmq_poll()-er flag(s)
}
}
to_receive -= rc; // ---------------------------------------------------------------------------------------------- SUB rc
if (to_receive == 0) break;
}
for ( int i = 0; i < SOCKETS_NUM; i++ ) { //------------------------ thread-killer
pthread_join( threads[i], NULL );
}
}
int main( void ) {
context = zmq_ctx_new();
zmq_ctx_set( context, ZMQ_THREAD_SCHED_POLICY, SCHED_FIFO );
zmq_ctx_set( context, ZMQ_THREAD_PRIORITY, 99 );
void *responders[SOCKETS_NUM];
char endpoint[] = "inproc://endpointX";
for ( int i = 0; i < SOCKETS_NUM; i++ ) {
responders[i] = zmq_socket( context, ZMQ_PULL ); // ------------ PULL instances into []
endpoint[17] = (char)( '0' + i );
zmq_bind( responders[i], endpoint ); //------------------------- .bind()
numbers[i] = i;
}
time_t tt;
time_t t = time(&tt);
srand( (unsigned int)t );
zmq_pollitem_t poll_items[SOCKETS_NUM];
for ( int i = 0; i < SOCKETS_NUM; i++ ) { //------------------------ zmq_politem_t array[] ---pre-fill---
poll_items[i].socket = responders[i];
poll_items[i].events = ZMQ_POLLIN;
}
ptimes = times;
for ( int i = 0; i < RUNS; i++ ) { //------------------------------- 1E5 RUNs
run_test( poll_items ); // ------------------------------------- RUN TEST
ptimes += SOCKETS_NUM;
}
long int lags[SOCKETS_NUM * RUNS];
long int total_lag = 0;
long int max_lag = 0;
long int big_lag = 0;
for ( int i = 0; i < SOCKETS_NUM * RUNS; i++ ) {
lags[i] = ( times[i].recv_time.tv_nsec
- times[i].send_time.tv_nsec
+ ( times[i].recv_time.tv_sec
- times[i].send_time.tv_sec
) * 1000000000
) / 1000; // --------------------------------------- [us]
if ( lags[i] > max_lag ) max_lag = lags[i];
total_lag += lags[i];
if ( lags[i] > 200 ) big_lag++;
}
long int avg_lag = total_lag / SOCKETS_NUM / RUNS;
double SD = 0.0;
for ( int i = 0; i < SOCKETS_NUM * RUNS; ++i ) {
SD += pow( (double)( lags[i] - avg_lag ), 2 );
}
double std_lag = sqrt( SD / SOCKETS_NUM / RUNS );
printf("avg lag = %l5d [us]\n", avg_lag);
printf("max lag = %l5d [us]\n", max_lag);
printf("std dev = %8.2f [us]\n", std_lag);
printf("big lag = %l5d x above 200 [us]\n", big_lag);
for ( int i = 0; i < SOCKETS_NUM; i++ ) {
zmq_close( responders[i] );
}
zmq_ctx_destroy( context );
return 0;
}
Using nanosleep
for a random (not cardinal, safely outside of any control-loop(s) activity) sleep is rather a risky luxury, as in earlier kernels caused problems:
In order to support applications requiring much more precise pauses (e.g., in order to control some time-critical hardware),
nanosleep()
would handle pauses of up to 2 ms by busy waiting with microsecond precision when called from a thread scheduled under a real-time policy likeSCHED_FIFO
orSCHED_RR
. This special extension was removed in kernel 2.5.39, hence is still present in current 2.4 kernels, but not in 2.6 kernels.
Upvotes: 2