我尝试使用ZeroMQ进行快速消息传递。消息需要在小于 1 [ms]
之内送达。我做了一些测试(inproc
,Linux上的单个进程,没有TCP),发现通常没有任何问题。延迟大约为10 - 100 [us]
,具体取决于发送消息的频率(为什么?)。但是有时6 [ms]
之后会收到消息,这是不可接受的。
也许该进程被抢占了?
或者是由于使用了轮询(zmq_poll()
)?
示例测试结果:
avg lag = 28 [us]
max lag = 5221 [us]
std dev = 25.85 [us]
big lag = 180 x above 200 [us]
“大滞后”是指等待时间超过200 [us]
的情况数。在我的测试中,发送了500 000条消息,因此值180表示有 180 / 500000 = 0.036% 的消息出现了超过 200 [us]
的延迟。这是一个很小的数字,但我希望它为零。即使以平均延迟为代价。测试源代码如下:
#include <stdlib.h> #include <math.h> #include <zmq.h> #include <pthread.h> #define SOCKETS_NUM 5 #define RUNS 100000 void *context; int numbers[SOCKETS_NUM]; struct { struct timespec send_time; struct timespec receive_time; } times[SOCKETS_NUM * RUNS], *ptimes; static void * worker_thread(void * dummy) { int * number = dummy; char endpoint[] = "inproc://endpointX"; endpoint[17] = (char)('0' + *number); void * socket = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket, endpoint); struct timespec sleeptime, remtime; int rnd = rand() / 3000; sleeptime.tv_sec = 0; sleeptime.tv_nsec = rnd; nanosleep(&sleeptime, &remtime); clock_gettime(CLOCK_REALTIME, &(ptimes[*number].send_time)); zmq_send(socket, "Hello", 5, 0); zmq_close(socket); return NULL; } static void run_test(zmq_pollitem_t items[]) { pthread_t threads[SOCKETS_NUM]; for (int i = 0; i < SOCKETS_NUM; i++) { pthread_create(&threads[i], NULL, worker_thread, &numbers[i]); } char buffer[10]; int to_receive = SOCKETS_NUM; for (int i = 0; i < SOCKETS_NUM; i++) { int rc = zmq_poll(items, SOCKETS_NUM, -1); for (int j = 0; j < SOCKETS_NUM; j++) { if (items[j].revents & ZMQ_POLLIN) { clock_gettime(CLOCK_REALTIME, &(ptimes[j].receive_time)); zmq_recv(items[j].socket, buffer, 10, 0); } } to_receive -= rc; if (to_receive == 0) break; } for (int i = 0; i < SOCKETS_NUM; i++) { pthread_join(threads[i], NULL); } } int main(void) { context = zmq_ctx_new(); zmq_ctx_set(context, ZMQ_THREAD_SCHED_POLICY, SCHED_FIFO); zmq_ctx_set(context, ZMQ_THREAD_PRIORITY, 99); void * responders[SOCKETS_NUM]; char endpoint[] = "inproc://endpointX"; for (int i = 0; i < SOCKETS_NUM; i++) { responders[i] = zmq_socket(context, ZMQ_PULL); endpoint[17] = (char)('0' + i); zmq_bind(responders[i], endpoint); numbers[i] = i; } time_t tt; time_t t = time(&tt); srand((unsigned int)t); zmq_pollitem_t poll_items[SOCKETS_NUM]; for (int i = 0; i < SOCKETS_NUM; i++) { poll_items[i].socket = responders[i]; poll_items[i].events = ZMQ_POLLIN; } ptimes = times; for 
(int i = 0; i < RUNS; i++) { run_test(poll_items); ptimes += SOCKETS_NUM; } long int lags[SOCKETS_NUM * RUNS]; long int total_lag = 0; long int max_lag = 0; long int big_lag = 0; for (int i = 0; i < SOCKETS_NUM * RUNS; i++) { lags[i] = (times[i].receive_time.tv_nsec - times[i].send_time.tv_nsec + (times[i].receive_time.tv_sec - times[i].send_time.tv_sec) * 1000000000) / 1000; if (lags[i] > max_lag) max_lag = lags[i]; total_lag += lags[i]; if (lags[i] > 200) big_lag++; } long int avg_lag = total_lag / SOCKETS_NUM / RUNS; double SD = 0.0; for (int i = 0; i < SOCKETS_NUM * RUNS; ++i) { SD += pow((double)(lags[i] - avg_lag), 2); } double std_lag = sqrt(SD / SOCKETS_NUM / RUNS); printf("avg lag = %l5d [us]\n", avg_lag); printf("max lag = %l5d [us]\n", max_lag); printf("std dev = %8.2f [us]\n", std_lag); printf("big lag = %l5d x above 200 [us]\n", big_lag); for (int i = 0; i < SOCKETS_NUM; i++) { zmq_close(responders[i]); } zmq_ctx_destroy(context); return 0; }
问题很酷,但是很难解决。Q:“ ...我希望它为零。”
首先,欢迎来到零之禅的艺术。
<<
接下来,当您运行超快速的,内存映射的ZeroMQ传输类时,主要重点是"ZeroMQ: Principles in less than Five Seconds"处理的性能调整。在这里,您花费了大量的设置开销和直接终止开销操作来发送
inproc://
-times仅Context()
,所以我想永远不会有与队列管理相关的问题,因为根本不会有任何“堆栈增长”。1](假设我们按原样保留代码),这是性能调整的自然步骤,至少要设置socket-CPU_core ZMQ_AFFINITY的ZeroMQ映射(不要在内核之间跳跃或徘徊) )。有趣的是,如果在[[1E5
5 [B]
套接字设置/终止,而每个都没有在内存上发送多于一次的~ 5E5
-mapped行,可以通过使用PUSH
设置(使用5 [B]
I / O线程配置context
-instance来获得一些帮助(针对那些较大的开销和维护)(争取“实时” “ -ness,使用SOCKETS_NUM
,仅具有一个I / O线程没有太大帮助,是吗?]2]下一个实验级别是重新平衡ZMQ_IO_THREADS
映射(全局SCHED_FIFO
的I / O线程到CPU内核上)和ZMQ_THREAD_AFFINITY_CPU_ADD
的每个插槽设置]映射到context
的I / O线程。拥有足够数量的CPU核,通过使服务一个ZMQ_AFFINITY
实例的多个I / O线程在“同一” CPU上保持“在一起”,可能会带来一些性能/超低延迟优势,然而,在这里,我们进入了一个领域,在这种情况下,如果没有任何体内测试和验证,这种“实时”野心驱动的实验的实际硬件和实际系统的后台工作负载以及仍然是“备用”资源的情况将变得难以预测。
context
参数可能会有所帮助,但是除非纳米级套接字生存期(相当昂贵的一次性“耗材”)使用,否则不要指望有任何突破。4]试图以纳秒级的分辨率进行测量,socket
应该使用更多的“持续时间”,以避免zmq_setsockopt()
注入的调整,避免天文学校正的leap秒注入等
5] CLOCK_MONOTONIC_RAW
策略:我不会这样。使用ntp
阻碍了整个马戏团。我强烈反对在任何[[zmq_poll()
timeout == -1
端达到最高性能,可能是通过在任一端设置1:1 distributed-computing线程来实现的,或者,如果您想挑战修饰,请像您一样拥有5-PULL
-er线程,并将所有入口消息收集在一个单一的,零复制的上油的PUSH/PULL
-er上(更容易轮询,可以使用基于有效负载的索引帮助器,将发送方时间戳记放置在该索引帮助符上),无论如何,阻止轮询程序几乎是挑战任何低延迟软实时玩具的反模式。无论如何,不要犹豫,重构代码并使用性能分析工具更好地了解在哪里“获取” PUSH
-s(我的猜测在上面)
PULL
使用big_lag
进行随机(不是基本的,安全地处于任何控制回路活动之外)睡眠是一种冒险的奢侈,因为在较早的内核中会引起问题:
为了支持需要更精确的暂停的应用程序(例如,为了控制一些时间紧迫的硬件),
#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include <time.h>
#include <pthread.h>
#include <zmq.h>
#define SOCKETS_NUM 5      /* number of inproc PUSH/PULL endpoint pairs */
#define RUNS 100000        /* test repetitions; 5E5 messages in total */
void *context;             /* shared ZeroMQ context (set once in main) */
int numbers[SOCKETS_NUM];  /* per-worker endpoint index, handed to each thread */
/* One send/receive timestamp pair per message; ptimes is advanced by
 * SOCKETS_NUM after each run so every run writes its own slice. */
struct {
struct timespec send_time;
struct timespec recv_time;
} times[SOCKETS_NUM * RUNS],
*ptimes;
//-------------------------------------------------------------------------
// worker_thread(): overhead-expensive one-shot PUSH-based "Hello"-sender.
// dummy: pointer to this worker's endpoint index (0 .. SOCKETS_NUM-1).
// Sleeps a random < RAND_MAX/3000 ~ 715827 [ns], stamps send_time just
// before zmq_send(), then closes the socket. Returns NULL.
//-------------------------------------------------------------------------
static void *worker_thread( void *dummy ) {
    int *number = dummy;
    char endpoint[] = "inproc://endpointX";
    endpoint[17] = (char)( '0' + *number );

    // NOTE(review): rand() is not guaranteed thread-safe by ISO C/POSIX
    // when called concurrently from several workers — consider rand_r().
    int rnd = rand() / 3000;

    void *socket = zmq_socket( context, ZMQ_PUSH );
    if ( socket == NULL )                       // context terminated / no resources
        return NULL;
    if ( zmq_connect( socket, endpoint ) != 0 ) {
        zmq_close( socket );                    // don't leak the socket on failure
        return NULL;
    }

    struct timespec remtime,
                    sleeptime;
    sleeptime.tv_sec  = 0;
    sleeptime.tv_nsec = rnd;
    nanosleep( &sleeptime, &remtime );

    // any CLOCK re-adjustments may and will skew any non-MONOTONIC_CLOCK
    clock_gettime( CLOCK_REALTIME, &( ptimes[*number].send_time ) ); // CLK_set_NEAR_SEND
    zmq_send( socket, "Hello", 5, 0 );
    zmq_close( socket );
    return NULL;
}
//-------------------------------------------------------------------------
// run_test(): zmq_poll()-blocked zmq_recv()-orchestrator, called ~1E5 x —
// resources' nano-use & setup + termination overheads matter.
// Spawns one PUSH-er thread per endpoint, polls all PULL sockets until all
// SOCKETS_NUM messages arrived, stamping recv_time right after poll flags
// ZMQ_POLLIN, then joins the workers.
//-------------------------------------------------------------------------
static void run_test( zmq_pollitem_t items[] ) {
    char buffer[10];
    int to_receive = SOCKETS_NUM;
    pthread_t threads[SOCKETS_NUM];

    for ( int i = 0; i < SOCKETS_NUM; i++ ) { //------------------------ thread-maker ( a per-socket PUSH-er[]-s )
        pthread_create( &threads[i], NULL, worker_thread, &numbers[i] );
    }

    for ( int i = 0; i < SOCKETS_NUM; i++ ) { //------------------------ [SERIAL] [i]-stepping
        int rc = zmq_poll( items, SOCKETS_NUM, -1 ); //----------------- INFINITE blocking; several may flag ZMQ_POLLIN
        if ( rc < 0 )  //--------------------------------------------------- EINTR / ETERM: rc == -1 would otherwise
            break;     //                                                    *increase* to_receive via "to_receive -= rc"
        for ( int j = 0; j < SOCKETS_NUM; j++ ) { //-------------------- ALL-CHECKED for items[j].revents
            if ( items[j].revents & ZMQ_POLLIN ) { //------------------- FIND IF IT WAS THIS ONE
                clock_gettime( CLOCK_REALTIME, &( ptimes[j].recv_time ) ); // CLK_set_NEAR_poll()_POSACK'd R2recv
                zmq_recv( items[j].socket, buffer, 10, 0 ); //---------- READ-IN any POSACK'd flag(s)
            }
        }
        to_receive -= rc; //------------------------------------------------ SUB rc
        if ( to_receive == 0 ) break;
    }

    for ( int i = 0; i < SOCKETS_NUM; i++ ) { //------------------------ thread-joiner
        pthread_join( threads[i], NULL );
    }
}
//-------------------------------------------------------------------------
// main(): binds SOCKETS_NUM PULL sockets on inproc endpoints, runs RUNS
// test rounds, then reports avg / max / stddev latency in [us] plus the
// count of "big lags" ( > 200 [us] ).
//-------------------------------------------------------------------------
int main( void ) {
    context = zmq_ctx_new();
    zmq_ctx_set( context, ZMQ_THREAD_SCHED_POLICY, SCHED_FIFO );
    zmq_ctx_set( context, ZMQ_THREAD_PRIORITY, 99 );

    void *responders[SOCKETS_NUM];
    char endpoint[] = "inproc://endpointX";
    for ( int i = 0; i < SOCKETS_NUM; i++ ) {
        responders[i] = zmq_socket( context, ZMQ_PULL ); // ------------ PULL instances into []
        endpoint[17] = (char)( '0' + i );
        zmq_bind( responders[i], endpoint ); //------------------------- .bind()
        numbers[i] = i;
    }

    time_t tt;
    time_t t = time( &tt );
    srand( (unsigned int)t );

    zmq_pollitem_t poll_items[SOCKETS_NUM];
    for ( int i = 0; i < SOCKETS_NUM; i++ ) { //------------------------ zmq_pollitem_t array[] ---pre-fill---
        poll_items[i].socket = responders[i];
        poll_items[i].events = ZMQ_POLLIN;
    }

    ptimes = times;
    for ( int i = 0; i < RUNS; i++ ) { //------------------------------- 1E5 RUNs
        run_test( poll_items ); // ------------------------------------- RUN TEST
        ptimes += SOCKETS_NUM;
    }

    // static: SOCKETS_NUM * RUNS longs (~4 MB) would overflow a default
    // 8 MB stack frame together with the rest of main()'s locals.
    static long int lags[SOCKETS_NUM * RUNS];
    long int total_lag = 0;
    long int max_lag   = 0;
    long int big_lag   = 0;
    for ( int i = 0; i < SOCKETS_NUM * RUNS; i++ ) {
        lags[i] = ( times[i].recv_time.tv_nsec
                  - times[i].send_time.tv_nsec
                  + ( times[i].recv_time.tv_sec
                    - times[i].send_time.tv_sec
                    ) * 1000000000
                  ) / 1000; // --------------------------------------- [us]
        if ( lags[i] > max_lag ) max_lag = lags[i];
        total_lag += lags[i];
        if ( lags[i] > 200 ) big_lag++;
    }
    long int avg_lag = total_lag / SOCKETS_NUM / RUNS;
    double SD = 0.0;
    for ( int i = 0; i < SOCKETS_NUM * RUNS; ++i ) {
        SD += pow( (double)( lags[i] - avg_lag ), 2 );
    }
    double std_lag = sqrt( SD / SOCKETS_NUM / RUNS );

    // "%l5d" is not a valid printf conversion (length modifier must follow
    // the field width): "%5ld" prints a long int right-aligned in width 5.
    printf("avg lag = %5ld [us]\n", avg_lag);
    printf("max lag = %5ld [us]\n", max_lag);
    printf("std dev = %8.2f [us]\n", std_lag);
    printf("big lag = %5ld x above 200 [us]\n", big_lag);

    for ( int i = 0; i < SOCKETS_NUM; i++ ) {
        zmq_close( responders[i] );
    }
    zmq_ctx_destroy( context );
    return 0;
}
nanosleep() 在从按实时策略(例如 SCHED_FIFO 或 SCHED_RR)调度的线程中调用时,会以忙等待的方式、以微秒级精度处理最长 2 ms 的暂停。此特殊扩展在内核 2.5.39 中已被删除,因此在当前的 2.4 内核中仍然存在,但在 2.6 内核中不存在。