What causes high ZeroMQ latency and how can I avoid it?


I am trying to use ZeroMQ for fast message passing. Messages need to be delivered in less than 1 [ms]. I did some testing (inproc, a single process on Linux, no TCP) and in general there is no problem: the latency is around 10 - 100 [us], depending on how often messages are sent (why?). Sometimes, however, a message is received only after 6 [ms], which is unacceptable.

What can cause some messages to be delayed like this?

Maybe the process gets preempted?

Or is it because polling is used (zmq_poll())?

Example test results:

avg lag = 28 [us]
max lag = 5221 [us]
std dev = 25.85 [us]
big lag = 180 x above 200 [us]

"Big lag" is the number of cases in which the latency exceeded 200 [us]. In my tests 500 000 messages were sent, so a value of 180 means that a latency above 200 [us] was recorded in 180 / 500000 = 0.036% of cases. That is a small number, but I would like it to be zero, even at the cost of a higher average latency.

The test source code:

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <time.h>
#include <zmq.h>
#include <pthread.h>

#define SOCKETS_NUM 5
#define RUNS 100000

void *context;
int numbers[SOCKETS_NUM];

struct {
    struct timespec send_time;
    struct timespec receive_time;
} times[SOCKETS_NUM * RUNS], *ptimes;

static void * worker_thread(void * dummy)
{
    int * number = dummy;
    char endpoint[] = "inproc://endpointX";
    endpoint[17] = (char)('0' + *number);
    void * socket = zmq_socket(context, ZMQ_PUSH);
    zmq_connect(socket, endpoint);
    struct timespec sleeptime, remtime;
    int rnd = rand() / 3000;
    sleeptime.tv_sec = 0;
    sleeptime.tv_nsec = rnd;
    nanosleep(&sleeptime, &remtime);
    clock_gettime(CLOCK_REALTIME, &(ptimes[*number].send_time));
    zmq_send(socket, "Hello", 5, 0);
    zmq_close(socket);
    return NULL;
}

static void run_test(zmq_pollitem_t items[])
{
    pthread_t threads[SOCKETS_NUM];
    for (int i = 0; i < SOCKETS_NUM; i++) {
        pthread_create(&threads[i], NULL, worker_thread, &numbers[i]);
    }
    char buffer[10];
    int to_receive = SOCKETS_NUM;
    for (int i = 0; i < SOCKETS_NUM; i++) {
        int rc = zmq_poll(items, SOCKETS_NUM, -1);
        for (int j = 0; j < SOCKETS_NUM; j++) {
            if (items[j].revents & ZMQ_POLLIN) {
                clock_gettime(CLOCK_REALTIME, &(ptimes[j].receive_time));
                zmq_recv(items[j].socket, buffer, 10, 0);
            }
        }
        to_receive -= rc;
        if (to_receive == 0) break;
    }
    for (int i = 0; i < SOCKETS_NUM; i++) {
        pthread_join(threads[i], NULL);
    }
}

int main(void)
{
    context = zmq_ctx_new();
    zmq_ctx_set(context, ZMQ_THREAD_SCHED_POLICY, SCHED_FIFO);
    zmq_ctx_set(context, ZMQ_THREAD_PRIORITY, 99);
    void * responders[SOCKETS_NUM];
    char endpoint[] = "inproc://endpointX";
    for (int i = 0; i < SOCKETS_NUM; i++) {
        responders[i] = zmq_socket(context, ZMQ_PULL);
        endpoint[17] = (char)('0' + i);
        zmq_bind(responders[i], endpoint);
        numbers[i] = i;
    }
    time_t tt;
    time_t t = time(&tt);
    srand((unsigned int)t);
    zmq_pollitem_t poll_items[SOCKETS_NUM];
    for (int i = 0; i < SOCKETS_NUM; i++) {
        poll_items[i].socket = responders[i];
        poll_items[i].events = ZMQ_POLLIN;
    }
    ptimes = times;
    for (int i = 0; i < RUNS; i++) {
        run_test(poll_items);
        ptimes += SOCKETS_NUM;
    }
    long int lags[SOCKETS_NUM * RUNS];
    long int total_lag = 0;
    long int max_lag = 0;
    long int big_lag = 0;
    for (int i = 0; i < SOCKETS_NUM * RUNS; i++) {
        lags[i] = (times[i].receive_time.tv_nsec - times[i].send_time.tv_nsec
                   + (times[i].receive_time.tv_sec - times[i].send_time.tv_sec) * 1000000000) / 1000;
        if (lags[i] > max_lag) max_lag = lags[i];
        total_lag += lags[i];
        if (lags[i] > 200) big_lag++;
    }
    long int avg_lag = total_lag / SOCKETS_NUM / RUNS;
    double SD = 0.0;
    for (int i = 0; i < SOCKETS_NUM * RUNS; ++i) {
        SD += pow((double)(lags[i] - avg_lag), 2);
    }
    double std_lag = sqrt(SD / SOCKETS_NUM / RUNS);
    printf("avg lag = %5ld [us]\n", avg_lag);
    printf("max lag = %5ld [us]\n", max_lag);
    printf("std dev = %8.2f [us]\n", std_lag);
    printf("big lag = %5ld x above 200 [us]\n", big_lag);
    for (int i = 0; i < SOCKETS_NUM; i++) {
        zmq_close(responders[i]);
    }
    zmq_ctx_destroy(context);
    return 0;
}

performance real-time zeromq latency low-latency
1 Answer

Q: "...I would like it to be zero."

Cool to wish for, yet hard to make happen.

First of all, welcome to the art of Zen-of-Zero.



Next, given you run the ultra-fast, memory-mapped inproc:// transport class, the main focus is on performance tuning of the Context()-processing itself (the "ZeroMQ: Principles in less than Five Seconds" overview is a good primer here). In your code, immense setup overheads and direct termination overheads are spent on one-shot sockets just to send 5 [B], and since the transport is inproc://-only, I dare guess there will never be queue-management related issues, as there will be no "stack-growth" at all.

1] (assuming we keep the code as-is) a natural step of performance tuning is to at least set the ZeroMQ socket-to-CPU-core / I/O-thread mapping via ZMQ_AFFINITY (so that the work does not hop or wander from core to core). Interestingly, with ~1E5 runs of the 5 one-shot PUSH-er sockets, i.e. ~5E5 socket setups/terminations, each of which never sends more than a single 5 [B] message over the memory-mapped line, some help against those large setup and maintenance overheads may come from configuring the Context()-instance with more I/O threads; striving for "real-time"-ness with SOCKETS_NUM PUSH-ers and but a single I/O thread does not help much, does it?

2] The next level of experimentation is to re-balance the ZMQ_THREAD_AFFINITY_CPU_ADD mapping (the global Context()'s I/O threads onto CPU cores, honouring the SCHED_FIFO policy already requested) and the per-socket ZMQ_AFFINITY mapping of sockets onto the Context()'s I/O threads. Given a sufficient number of CPU cores, keeping the several I/O threads that serve one Context() instance "together" on the "same" CPU core may bring some performance / ultra-low-latency advantage. However, here we enter a territory where, without in-vivo testing and validation on the actual hardware, with the actual system background workloads and the actually still-"spare" resources, the outcome of such "real-time"-ambition-driven experimentation becomes hard to predict.
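A minimal sketch of those knobs, assuming ZeroMQ >= 4.3 (ZMQ_THREAD_AFFINITY_CPU_ADD does not exist in older releases) and that the context options are set before the first socket gets created; the core numbers and the I/O-thread mask are made-up illustrative values, and whether any of this pays off for a purely inproc:// setup is something only measurements on the target box can confirm:

/* affinity_sketch.c -- illustrative only, not a verified tuning recipe */
#include <zmq.h>
#include <stdint.h>

int main( void )
{
    void *ctx = zmq_ctx_new();

    zmq_ctx_set( ctx, ZMQ_IO_THREADS, 2 );                /* more than the default single I/O thread; set before any socket exists */

    zmq_ctx_set( ctx, ZMQ_THREAD_AFFINITY_CPU_ADD, 2 );   /* pin the Context()'s I/O threads onto CPU cores 2 and 3 ( ZeroMQ >= 4.3; */
    zmq_ctx_set( ctx, ZMQ_THREAD_AFFINITY_CPU_ADD, 3 );   /* the core numbers are arbitrary placeholders )                           */

    void *pull = zmq_socket( ctx, ZMQ_PULL );

    uint64_t io_thread_mask = 1;                          /* per-socket ZMQ_AFFINITY: a bitmask of the Context()'s I/O threads,      */
    zmq_setsockopt( pull, ZMQ_AFFINITY,                   /* here keeping this socket's work on I/O thread #0 only                   */
                    &io_thread_mask, sizeof io_thread_mask );

    zmq_bind( pull, "inproc://endpoint0" );
    /* ... the actual test would go here ... */
    zmq_close( pull );
    zmq_ctx_destroy( ctx );
    return 0;
}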

3] Tuning the per-socket zmq_setsockopt() parameters may help too, but as long as the nano-scoped socket lifetimes (rather expensive, one-shot "consumables/disposables") remain in use, do not expect any breakthrough from here.
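For completeness, a tiny illustration of what such per-socket zmq_setsockopt() tuning looks like; the option values below are placeholders, not a recommendation:

#include <zmq.h>

/* illustrative helper: per-socket tuning knobs, values are placeholders only */
static void tune_socket( void *socket )
{
    int hwm    = 0;   /* 0 == unlimited high-water mark for queued messages          */
    int linger = 0;   /* do not block inside zmq_close() waiting for unsent messages */
    zmq_setsockopt( socket, ZMQ_SNDHWM, &hwm,    sizeof hwm );
    zmq_setsockopt( socket, ZMQ_RCVHWM, &hwm,    sizeof hwm );
    zmq_setsockopt( socket, ZMQ_LINGER, &linger, sizeof linger );
}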

4] Trying to measure with nanosecond resolution, the "durations" should be taken from CLOCK_MONOTONIC_RAW, so as to avoid ntp-injected adjustments, astronomy-corrective leap-second injections and the like.
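A sketch of what point 4] means in code, measuring a duration with CLOCK_MONOTONIC_RAW instead of the CLOCK_REALTIME used above (fine here, since the sending and the receiving threads live on the same host):

#include <time.h>

/* elapsed time between two CLOCK_MONOTONIC_RAW readings, in microseconds */
static long elapsed_us( const struct timespec *t0, const struct timespec *t1 )
{
    return ( t1->tv_sec  - t0->tv_sec  ) * 1000000L
         + ( t1->tv_nsec - t0->tv_nsec ) / 1000L;
}

/* usage sketch:
      struct timespec t0, t1;
      clock_gettime( CLOCK_MONOTONIC_RAW, &t0 );   // right before zmq_send()
      ...
      clock_gettime( CLOCK_MONOTONIC_RAW, &t1 );   // right after  zmq_recv()
      long lag = elapsed_us( &t0, &t1 );
*/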

5] The zmq_poll()-strategy: I would not go this way. Blocking on an infinite timeout == -1 stalls the whole circus, and I strongly discourage such a thing in any system with "real-time" ambitions. Getting the PULL-side to top performance could go via a 1:1 PUSH/PULL thread setup on either end or, if you want to keep the current fan-in shape, keep the 5 PUSH-er threads as you have them and collect all ingress messages on a single, zero-copy, well-oiled PULL-er (easier to poll, and a payload-based index helper can carry the sender-side timestamp). Either way, a blocking poller is close to an anti-pattern for any low-latency, soft-real-time toy.
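One possible reading of point 5] in code (a sketch, not a drop-in replacement for the test above): every short-lived PUSH-er stamps its own CLOCK_MONOTONIC_RAW send time into the payload, all of them connect to one shared endpoint, and a single PULL-er computes the lag on arrival without any zmq_poll() at all; the endpoint name and the message layout are made up for illustration:

#include <zmq.h>
#include <pthread.h>
#include <stdio.h>
#include <time.h>

#define N_PUSHERS 5

static void *ctx;

/* each message carries the sender's timestamp, so the receiver needs no side channel;
   a raw binary struct is fine here, as inproc:// never leaves this one process        */
struct stamped_msg { struct timespec sent; int sender_id; };

static void *pusher( void *arg )
{
    int id = *(int *)arg;
    void *push = zmq_socket( ctx, ZMQ_PUSH );
    zmq_connect( push, "inproc://collector" );       /* one shared endpoint, fan-in */

    struct stamped_msg m;
    m.sender_id = id;
    clock_gettime( CLOCK_MONOTONIC_RAW, &m.sent );   /* timestamp goes into the payload */
    zmq_send( push, &m, sizeof m, 0 );

    zmq_close( push );
    return NULL;
}

int main( void )
{
    ctx = zmq_ctx_new();

    void *pull = zmq_socket( ctx, ZMQ_PULL );
    zmq_bind( pull, "inproc://collector" );          /* single PULL-er collects everything */

    pthread_t tid[N_PUSHERS];
    int ids[N_PUSHERS];
    for ( int i = 0; i < N_PUSHERS; i++ )
    {   ids[i] = i;
        pthread_create( &tid[i], NULL, pusher, &ids[i] );
    }

    for ( int received = 0; received < N_PUSHERS; received++ )
    {
        struct stamped_msg m;
        zmq_recv( pull, &m, sizeof m, 0 );           /* plain blocking recv, no poller */

        struct timespec now;
        clock_gettime( CLOCK_MONOTONIC_RAW, &now );
        long lag_us = ( now.tv_sec  - m.sent.tv_sec  ) * 1000000L
                    + ( now.tv_nsec - m.sent.tv_nsec ) / 1000L;
        printf( "sender %d lag = %ld [us]\n", m.sender_id, lag_us );
    }

    for ( int i = 0; i < N_PUSHERS; i++ ) pthread_join( tid[i], NULL );

    zmq_close( pull );
    zmq_ctx_destroy( ctx );
    return 0;
}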

Anyway, do not hesitate to refactor the code and to use profiling tools to get a better view of where the big_lag-s are actually "acquired" (my candidates are above).

Using nanosleep() for a random (non-essential, safely outside of any control-loop activity) sleep is a risky luxury, since in earlier kernels it caused problems:


In order to support applications requiring much more precise pauses (e.g., in order to control some time-critical hardware), nanosleep() would handle pauses of up to 2 ms with microsecond precision when called from a thread scheduled under a real-time policy like SCHED_FIFO or SCHED_RR. This special extension was removed in kernel 2.5.39, hence is still present in current 2.4 kernels, but not in 2.6 kernels.

The code, with comments added, for reference:

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <time.h>
#include <zmq.h>
#include <pthread.h>

#define SOCKETS_NUM 5
#define RUNS 100000

void *context;
int numbers[SOCKETS_NUM];

struct {
    struct timespec send_time;
    struct timespec recv_time;
} times[SOCKETS_NUM * RUNS], *ptimes;

static void *worker_thread( void *dummy )
{   //---------------------------------------------------------------- an overhead-expensive one-shot PUSH-based "Hello"-sender & .close()
    int  *number = dummy;
    char  endpoint[] = "inproc://endpointX";
    endpoint[17] = (char)( '0' + *number );
    int   rnd = rand() / 3000;
    void *socket = zmq_socket( context, ZMQ_PUSH );
    struct timespec remtime, sleeptime;
    sleeptime.tv_sec  = 0;
    sleeptime.tv_nsec = rnd;
    zmq_connect( socket, endpoint );
    nanosleep( &sleeptime, &remtime );                               // anything between < 0 : RAND_MAX/3000 > [ns] ... easily >> 32, as #define RAND_MAX 2147483647 ~ 715 827 [ns]
    clock_gettime( CLOCK_REALTIME, &( ptimes[*number].send_time ) ); //........................................................ CLK_set_NEAR_SEND
                                                                     // any CLOCK re-adjustments may and will skew any non-MONOTONIC_CLOCK
    zmq_send( socket, "Hello", 5, 0 );
    zmq_close( socket );
    return NULL;
}

static void run_test( zmq_pollitem_t items[] )
{   //---------------------------------------------------------------- zmq_poll()-blocked zmq_recv()-orchestrator ( called ~ 1E5 x !!! resources' nano-use & setup + termination overheads matter )
    char buffer[10];
    int  to_receive = SOCKETS_NUM;
    pthread_t threads[SOCKETS_NUM];
    for ( int i = 0; i < SOCKETS_NUM; i++ )
    {   //-------------------------------------------------------------- thread-maker ( a per-socket PUSH-er[]-s )
        pthread_create( &threads[i], NULL, worker_thread, &numbers[i] );
    }
    for ( int i = 0; i < SOCKETS_NUM; i++ )
    {   //-------------------------------------------------------------- [SERIAL]-------- [i]-stepping
        int rc = zmq_poll( items, SOCKETS_NUM, -1 );                  // INFINITE ??? --- blocks /\/\/\/\/\/\ --- several may flag ZMQ_POLLIN
        for ( int j = 0; j < SOCKETS_NUM; j++ )
        {   //---------------------------------------------------------- ALL-CHECKED in a loop for an items[j].revents
            if ( items[j].revents & ZMQ_POLLIN )
            {   //------------------------------------------------------ FIND IF IT WAS THIS ONE
                clock_gettime( CLOCK_REALTIME, &( ptimes[j].recv_time ) ); //............................................ CLK_set_NEAR_poll()_POSACK'd R2recv
                zmq_recv( items[j].socket, buffer, 10, 0 );           // READ-IN from any POSACK'd by zmq_poll()-er flag(s)
            }
        }
        to_receive -= rc;                                             // SUB rc
        if ( to_receive == 0 ) break;
    }
    for ( int i = 0; i < SOCKETS_NUM; i++ )
    {   //-------------------------------------------------------------- thread-killer
        pthread_join( threads[i], NULL );
    }
}

int main( void )
{
    context = zmq_ctx_new();
    zmq_ctx_set( context, ZMQ_THREAD_SCHED_POLICY, SCHED_FIFO );
    zmq_ctx_set( context, ZMQ_THREAD_PRIORITY, 99 );

    void *responders[SOCKETS_NUM];
    char  endpoint[] = "inproc://endpointX";
    for ( int i = 0; i < SOCKETS_NUM; i++ )
    {
        responders[i] = zmq_socket( context, ZMQ_PULL );              // PULL instances into []
        endpoint[17] = (char)( '0' + i );
        zmq_bind( responders[i], endpoint );                          // .bind()
        numbers[i] = i;
    }

    time_t tt;
    time_t t = time(&tt);
    srand( (unsigned int)t );

    zmq_pollitem_t poll_items[SOCKETS_NUM];
    for ( int i = 0; i < SOCKETS_NUM; i++ )
    {   //-------------------------------------------------------------- zmq_pollitem_t array[] ---pre-fill---
        poll_items[i].socket = responders[i];
        poll_items[i].events = ZMQ_POLLIN;
    }

    ptimes = times;
    for ( int i = 0; i < RUNS; i++ )
    {   //-------------------------------------------------------------- 1E5 RUNs
        run_test( poll_items );                                       // RUN TEST
        ptimes += SOCKETS_NUM;
    }

    long int lags[SOCKETS_NUM * RUNS];
    long int total_lag = 0;
    long int max_lag   = 0;
    long int big_lag   = 0;
    for ( int i = 0; i < SOCKETS_NUM * RUNS; i++ )
    {
        lags[i] = (   times[i].recv_time.tv_nsec - times[i].send_time.tv_nsec
                  + ( times[i].recv_time.tv_sec  - times[i].send_time.tv_sec ) * 1000000000
                  ) / 1000;                                           // [us]
        if ( lags[i] > max_lag ) max_lag = lags[i];
        total_lag += lags[i];
        if ( lags[i] > 200 ) big_lag++;
    }

    long int avg_lag = total_lag / SOCKETS_NUM / RUNS;
    double SD = 0.0;
    for ( int i = 0; i < SOCKETS_NUM * RUNS; ++i )
    {
        SD += pow( (double)( lags[i] - avg_lag ), 2 );
    }
    double std_lag = sqrt( SD / SOCKETS_NUM / RUNS );

    printf( "avg lag = %5ld [us]\n", avg_lag );
    printf( "max lag = %5ld [us]\n", max_lag );
    printf( "std dev = %8.2f [us]\n", std_lag );
    printf( "big lag = %5ld x above 200 [us]\n", big_lag );

    for ( int i = 0; i < SOCKETS_NUM; i++ )
    {
        zmq_close( responders[i] );
    }
    zmq_ctx_destroy( context );
    return 0;
}
