Why does TCP write latency get worse when work is interleaved?

Question · 14 votes · 3 answers

I have been profiling TCP latency (specifically, the write from user space to kernel space for a small message) to build some intuition for the latency of a write (acknowledging that this may be context-specific). I noticed substantial inconsistency between tests that look similar to me, and I'm curious where the difference comes from. I understand that microbenchmarks can be problematic, but I still feel I'm missing some fundamental understanding (since the latency differences are ~10x).

The setup is a C++ TCP server that accepts one client connection (from another process on the same CPU) and, on connecting with the client, makes 20 system calls to write to the socket, sending one byte at a time. The full code for the server is copied at the end of this post. Here is the output, timing each write with boost/timer (which adds ~1 mic of noise):

$ clang++ -std=c++11 -stdlib=libc++ tcpServerStove.cpp -O3; ./a.out
18 mics
3 mics
3 mics
4 mics
3 mics
3 mics
4 mics
3 mics
5 mics
3 mics
...

I reliably find that the first write is significantly slower than the rest. If I wrap 10,000 write calls in a timer, the average is 2 microseconds per write, yet the first call always takes 15+ mics. Why is there this "warming up" phenomenon?
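For concreteness, here is a minimal sketch of that averaged measurement (assuming the same connected new_socket as in the full server code at the end of this post, with std::chrono standing in for boost::timer):

#include <chrono>

// Drop-in fragment for the server's main(), after accept():
// time 10,000 one-byte writes in one interval and report the mean.
char oneByte[1] = {0};
auto start = std::chrono::steady_clock::now();
for (int i = 0; i < 10000; i++) {
    write(new_socket, oneByte, 1);
}
auto stop = std::chrono::steady_clock::now();
double mics = std::chrono::duration<double, std::micro>(stop - start).count();
printf("%f mics per write on average\n", mics / 10000);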

Relatedly, I ran an experiment in which I do some blocking CPU work between each write call (computing a large prime). This causes all of the write calls to be slow:

$ clang++ -std=c++11 -stdlib=libc++ tcpServerStove.cpp -O3; ./a.out
20 mics
23 mics
23 mics
30 mics
23 mics
21 mics
21 mics
22 mics
22 mics
...

Given these results, I'm wondering whether there is some kind of batching in the process of copying bytes from the user buffer to the kernel buffer. If multiple write calls happen in quick succession, do they get coalesced into a single kernel interrupt?

In particular, I'm looking for some notion of how long write takes to copy a buffer from user space to kernel space. If there is some coalescing effect that lets the average write take only 2 mics when I do 10,000 in a row, then it would be unfairly optimistic to conclude that write latency is 2 mics; it seems my intuition should be that each write takes 20 microseconds. That seems surprisingly slow for the lowest latency you can get without kernel bypass (a raw write of one byte).

One final data point: when I set up a ping-pong test between two processes on my machine (a TCP server and a TCP client), I average 6 mics per round trip (which includes a read, a write, and the trip through the localhost network). This seems inconsistent with the 20 mic latency for a single write seen above.

The full code for the TCP server:

// Server side C/C++ program to demonstrate Socket programming
// #include <iostream>
#include <unistd.h>
#include <stdio.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <string.h>
#include <boost/timer.hpp>
#include <unistd.h>

// Set up some blocking work.
bool isPrime(int n) {
    if (n < 2) {
        return false;
    }
    for (int i = 2; i < n; i++) {
        if (n % i == 0) {
            return false;
        }
    }
    return true;
}

// Compute the nth largest prime. Takes ~1 sec for n = 10,000
int getPrime(int n) {
    int numPrimes = 0;
    int i = 0;
    while (true) {
        if (isPrime(i)) {
            numPrimes++;
            if (numPrimes >= n) {
                return i;
            }
        }
        i++;
    }
}

int main(int argc, char const *argv[])
{
    int server_fd, new_socket, valread;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);

    // Create socket for TCP server
    server_fd = socket(AF_INET, SOCK_STREAM, 0);

    // Prevent writes from being batched
    setsockopt(server_fd, SOL_SOCKET, TCP_NODELAY, &opt, sizeof(opt));
    setsockopt(server_fd, SOL_SOCKET, TCP_NOPUSH, &opt, sizeof(opt));
    setsockopt(server_fd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(opt));
    setsockopt(server_fd, SOL_SOCKET, SO_SNDLOWAT, &opt, sizeof(opt));

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(8080);

    bind(server_fd, (struct sockaddr *)&address, sizeof(address));
    listen(server_fd, 3);

    // Accept one client connection
    new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen);

    char sendBuffer[1] = {0};
    int primes[20] = {0};

    // Make 20 sequential writes to kernel buffer.
    for (int i = 0; i < 20; i++) {
        sendBuffer[0] = i;
        boost::timer t;
        write(new_socket, sendBuffer, 1);
        printf("%d mics\n", int(1e6 * t.elapsed()));

        // For some reason, doing some blocking work between the writes
        // slows down the writes by a factor of 10.
        // primes[i] = getPrime(10000 + i);
    }

    // Print a prime to make sure the compiler doesn't optimize
    // away the computations.
    printf("prime: %d\n", primes[8]);
}

TCP client code:

// Client side C/C++ program to demonstrate Socket programming
// #include <iostream>
#include <unistd.h>
#include <stdio.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <string.h>
#include <unistd.h>

int main(int argc, char const *argv[])
{
    int sock, valread;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);

    // We'll be passing uint32's back and forth
    unsigned char recv_buffer[1024] = {0};

    // Create socket for TCP server
    sock = socket(AF_INET, SOCK_STREAM, 0);

    setsockopt(sock, SOL_SOCKET, TCP_NODELAY, &opt, sizeof(opt));

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(8080);

    // Connect to the server
    if (connect(sock, (struct sockaddr *)&address, (socklen_t)addrlen) != 0) {
        throw("connect failed");
    }

    // Read all 20 bytes; a single read may return fewer bytes than
    // requested, so loop until everything has arrived.
    unsigned char *buffer_pointer = recv_buffer;
    int num_left = 20;
    while (num_left > 0) {
        valread = read(sock, buffer_pointer, num_left);
        if (valread <= 0) {
            break;
        }
        buffer_pointer += valread;
        num_left -= valread;
    }

    for (int i = 0; i < 10; i++) {
        printf("%d\n", recv_buffer[i]);
    }
}

I have tried with and without the flags TCP_NODELAY, TCP_NOPUSH, SO_SNDBUF, and SO_SNDLOWAT, thinking these might prevent batching (but my understanding is that this batching occurs between the kernel buffer and the network, not between the user buffer and the kernel buffer).

Here is the server code for the ping-pong test:

// Server side C/C++ program to demonstrate Socket programming
// #include <iostream>
#include <unistd.h>
#include <stdio.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <string.h>
#include <boost/timer.hpp>
#include <unistd.h>

__inline__ uint64_t rdtsc(void) {
    uint32_t lo, hi;
    __asm__ __volatile__ (
        "xorl %%eax,%%eax \n cpuid"
        ::: "%rax", "%rbx", "%rcx", "%rdx");
    __asm__ __volatile__ ("rdtsc" : "=a" (lo), "=d" (hi));
    return (uint64_t)hi << 32 | lo;
}

// Big Endian (network order)
unsigned int fromBytes(unsigned char b[4]) {
    return b[3] | b[2]<<8 | b[1]<<16 | b[0]<<24;
}

void toBytes(unsigned int x, unsigned char (&b)[4]) {
    b[3] = x;
    b[2] = x>>8;
    b[1] = x>>16;
    b[0] = x>>24;
}

int main(int argc, char const *argv[])
{
    int server_fd, new_socket, valread;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    unsigned char recv_buffer[4] = {0};
    unsigned char send_buffer[4] = {0};

    // Create socket for TCP server
    server_fd = socket(AF_INET, SOCK_STREAM, 0);

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(8080);

    bind(server_fd, (struct sockaddr *)&address, sizeof(address));
    listen(server_fd, 3);

    // Accept one client connection
    new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen);
    printf("Connected with client!\n");

    int counter = 0;
    unsigned int x = 0;
    auto start = rdtsc();
    boost::timer t;
    int n = 10000;
    while (counter < n) {
        valread = read(new_socket, recv_buffer, 4);
        x = fromBytes(recv_buffer);
        toBytes(x+1, send_buffer);
        write(new_socket, send_buffer, 4);
        ++counter;
    }
    printf("%f clock cycles per round trip (rdtsc)\n", (rdtsc() - start) / double(n));
    printf("%f mics per round trip (boost timer)\n", 1e6 * t.elapsed() / n);
}

And here is the client code for the ping-pong test:

// #include <iostream>
#include <unistd.h>
#include <stdio.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <string.h>
#include <boost/timer.hpp>
#include <unistd.h>

// Big Endian (network order)
unsigned int fromBytes(unsigned char b[4]) {
    return b[3] | b[2]<<8 | b[1]<<16 | b[0]<<24;
}

void toBytes(unsigned int x, unsigned char (&b)[4]) {
    b[3] = x;
    b[2] = x>>8;
    b[1] = x>>16;
    b[0] = x>>24;
}

int main(int argc, char const *argv[])
{
    int sock, valread;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);

    // We'll be passing uint32's back and forth
    unsigned char recv_buffer[4] = {0};
    unsigned char send_buffer[4] = {0};

    // Create socket for TCP server
    sock = socket(AF_INET, SOCK_STREAM, 0);

    // Set TCP_NODELAY so that writes won't be batched
    setsockopt(sock, SOL_SOCKET, TCP_NODELAY, &opt, sizeof(opt));

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(8080);

    // Connect to the server
    if (connect(sock, (struct sockaddr *)&address, (socklen_t)addrlen) != 0) {
        throw("connect failed");
    }

    unsigned int lastReceived = 0;
    while (true) {
        toBytes(++lastReceived, send_buffer);
        write(sock, send_buffer, 4);
        valread = read(sock, recv_buffer, 4);
        lastReceived = fromBytes(recv_buffer);
    }
}

Tags: c, performance, tcp, linux-kernel
3 Answers
3 votes

There are a few issues here.

To get closer to an answer, you need your client side to do two things: 1. receive all of the data, and 2. keep track of how big each read was. I did it like this:

int loc[N+1];
int nloc, curloc;
for (nloc = curloc = 0; curloc < N; nloc++) {
    int n = read(sock, recv_buffer + curloc, sizeof recv_buffer - curloc);
    if (n <= 0) {
        break;
    }
    curloc += n;
    loc[nloc] = curloc;
}
int last = 0;
for (int i = 0; i < nloc; i++) {
    printf("%*.*s ", loc[i] - last, loc[i] - last, recv_buffer + last);
    last = loc[i];
}
printf("\n");

and defining N to 20 (sorry, upbringing), and changing your server to write the bytes a-z one at a time. Now, when this prints out something like:

 a b c d e f g h i j k l m n o p q r s 

we know the server is sending 1-byte packets; but when it prints out something like:

 a bcde fghi jklm nop qrs 

we suspect the server is mainly sending 4-byte packets.

The root problem is that TCP_NODELAY does not do what you suspect. Nagle's algorithm accumulates output while there is an unacknowledged sent packet; TCP_NODELAY controls whether this accumulation is applied.
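To make that rule concrete, here is a toy model of the decision as just described (an illustration only, not the kernel's actual implementation):

#include <stdbool.h>
#include <stdio.h>

typedef struct {
    int unacked_bytes;  // data sent but not yet ACKed
    bool nodelay;       // is TCP_NODELAY set?
} Conn;

// Would a write of `len` bytes be transmitted immediately under
// Nagle's rule, given an `mss`-byte maximum segment size?
bool sends_immediately(const Conn *c, int len, int mss) {
    if (len >= mss) return true;             // full segment: always send
    if (c->unacked_bytes == 0) return true;  // nothing in flight: send now
    return c->nodelay;                       // otherwise only with TCP_NODELAY
}

int main(void) {
    Conn c;
    c.unacked_bytes = 0;
    c.nodelay = false;
    printf("first 1-byte write sent now? %d\n", sends_immediately(&c, 1, 1448));
    c.unacked_bytes = 1;  // one byte now in flight, no ACK yet
    printf("next 1-byte write sent now? %d\n", sends_immediately(&c, 1, 1448));
    return 0;
}

This prints 1 then 0: the first small write goes straight out, and the next one queues behind it until the ACK returns.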

Regardless of TCP_NODELAY, you still have a STREAM_SOCKET, which means that N writes can be combined into one. The socket is feeding the device, but you are feeding the socket at the same time. Once a packet [mbuf, skbuff, ...] has been committed to the device, the socket needs to create a new packet on the next write()s. As soon as the device is ready for a new packet, the socket can provide it, but until then the packet serves as a buffer. In buffering mode, writes are very fast, since all of the necessary data structures are available [as mentioned in the comments and other answers].
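(An editorial aside: if the goal were a single user-to-kernel copy for many small pieces, scatter/gather I/O does that combining explicitly. A sketch, assuming a connected descriptor new_socket as in the question:)

#include <sys/uio.h>

// One writev() call hands all 20 one-byte chunks to the kernel at once,
// instead of 20 separate write() calls.
char bytes[20];
struct iovec iov[20];
for (int i = 0; i < 20; i++) {
    bytes[i] = 'a' + i;
    iov[i].iov_base = &bytes[i];
    iov[i].iov_len = 1;
}
writev(new_socket, iov, 20);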

You can control this buffering with the SO_SNDBUF and SO_SNDLOWAT socket options. Note, however, that the socket returned by accept does not inherit the buffer sizes of the listening socket. By reducing SNDBUF down to 1, the output below:

abcdefghijklmnopqrst 
a bcdefgh ijkl mno pqrst 
a b cdefg hij klm nop qrst 
a b c d e f g h i j k l m n o p q r s t 

corresponds to going from the default, then successively adding TCP_NODELAY, TCP_NOPUSH, SO_SNDBUF (= 1), and SO_SNDLOWAT (= 1) on subsequent connections to the server side. Each iteration has flatter time deltas than the previous one.

Your mileage may vary; this was on MacOS 10.12, and I changed your programs to C with rdtsc() because I have trust issues.

/* srv.c */
// Server side C/C++ program to demonstrate Socket programming
// #include <iostream>
#include <unistd.h>
#include <stdio.h>
#include <sys/socket.h>
#include <stdbool.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <string.h>
#include <unistd.h>

#ifndef N
#define N 20
#endif
int nap = 0;
int step = 0;
extern long rdtsc(void);

void xerror(char *f) {
    perror(f);
    exit(1);
}
#define Z(x)   if ((x) == -1) { xerror(#x); }

void sopt(int fd, int opt, int val) {
    Z(setsockopt(fd, SOL_SOCKET, opt, &val, sizeof(val)));
}
int gopt(int fd, int opt) {
    int val;
    socklen_t r = sizeof(val);
    Z(getsockopt(fd, SOL_SOCKET, opt, &val, &r));
    return val;
}

#define POPT(fd, x)  printf("%s %d ", #x, gopt(fd, x))
void popts(char *tag, int fd) {
    printf("%s: ", tag);
    POPT(fd, SO_SNDBUF);
    POPT(fd, SO_SNDLOWAT);
    POPT(fd, TCP_NODELAY);
    POPT(fd, TCP_NOPUSH);
    printf("\n");
}

void stepsock(int fd) {
    switch (step++) {
    case 7:
        step = 2;
    case 6:
        sopt(fd, SO_SNDLOWAT, 1);
    case 5:
        sopt(fd, SO_SNDBUF, 1);
    case 4:
        sopt(fd, TCP_NOPUSH, 1);
    case 3:
        sopt(fd, TCP_NODELAY, 1);
    case 2:
        break;
    }
}

int main(int argc, char const *argv[])
{
    int server_fd, new_socket, valread;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);



    // Create socket for TCP server
    server_fd = socket(AF_INET, SOCK_STREAM, 0);

    popts("original", server_fd);
    // Parse command-line flags: -s steps socket options per connection,
    // -n <sec> naps between writes, -o <opts> sets options up front.
    while ((opt = getopt(argc, argv, "sn:o:")) != -1) {
    switch (opt) {
    case 's': step = ! step; break;
    case 'n': nap = strtol(optarg, NULL, 0); break;
    case 'o':
        for (int i = 0; optarg[i]; i++) {
            switch (optarg[i]) {
            case 't': sopt(server_fd, TCP_NODELAY, 1); break;
            case 'p': sopt(server_fd, TCP_NOPUSH, 0); break;
            case 's': sopt(server_fd, SO_SNDBUF, 1); break;
            case 'l': sopt(server_fd, SO_SNDLOWAT, 1); break;
            default:
                exit(1);
            }
        }
    }
    }
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(8080);

    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) == -1) {
    xerror("bind");
    }
    popts("ready", server_fd);
    while (1) {
        if (listen(server_fd, 3) == -1) {
        xerror("listen");
        }

        // Accept one client connection
        new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen);
        if (new_socket == -1) {
        xerror("accept");
        }
            popts("accepted: ", new_socket);
        sopt(new_socket, SO_SNDBUF, gopt(server_fd, SO_SNDBUF));
        sopt(new_socket, SO_SNDLOWAT, gopt(server_fd, SO_SNDLOWAT));
        if (step) {
                stepsock(new_socket);
            }
        long tick[N+1];
        tick[0] = rdtsc();
        // Make N sequential writes to kernel buffer.
        for (int i = 0; i < N; i++) {
                char ch = 'a' + i;

        write(new_socket, &ch, 1);
        tick[i+1] = rdtsc();

        // Optionally nap between writes (-n <seconds>), giving the
        // sends a chance to catch up before the next write.
        if (nap) {
            sleep(nap);
        }
        }
        for (int i = 1; i < N+1; i++) {
        printf("%ld\n", tick[i] - tick[i-1]);
        }
        printf("_\n");

        close(new_socket);
    }
}

clnt.c:

#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <string.h>
#include <unistd.h>

#ifndef N
#define N 20
#endif
int nap = 0;

int main(int argc, char const *argv[])
{
    int sock, valread;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);

    // We'll be passing uint32's back and forth
    unsigned char recv_buffer[1024] = {0};

    // Create socket for TCP server
    sock = socket(AF_INET, SOCK_STREAM, 0);

    // Set TCP_NODELAY so that writes won't be batched
    setsockopt(sock, SOL_SOCKET, TCP_NODELAY, &opt, sizeof(opt));

    while ((opt = getopt(argc,argv,"n:")) != -1) {
        switch (opt) {
        case 'n': nap = strtol(optarg, NULL, 0); break;
        default:
            exit(1);
        }
    }
    opt = 1;
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(8080);

    // Accept one client connection
    if (connect(sock, (struct sockaddr *)&address, (socklen_t)addrlen) != 0) {
        perror("connect failed");
    exit(1);
    }
    if (nap) {
    sleep(nap);
    }
    int loc[N+1];
    int nloc, curloc; 
    for (nloc = curloc = 0; curloc < N; nloc++) {
    int n = read(sock, recv_buffer + curloc, sizeof recv_buffer-curloc);
        if (n <= 0) {
        perror("read");
        break;
    }
    curloc += n;
    loc[nloc] = curloc;
    }
    int last = 0;
    for (int i = 0; i < nloc; i++) {
    int t = loc[i] - last;
    printf("%*.*s ", t, t, recv_buffer + last);
    last = loc[i];
    }
    printf("\n");
    return 0;
}

rdtsc.s:

.globl _rdtsc
_rdtsc:
    rdtsc
    shl $32, %rdx
    or  %rdx,%rax
    ret
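
(The answer doesn't give build commands; something along these lines should work, since cc assembles .s files directly - the exact flags here are an assumption:)

$ cc -O2 srv.c rdtsc.s -o srv
$ cc -O2 clnt.c -o clnt
$ ./srv -s    # -s steps through the socket options on each new connection
$ ./clnt      # run in another terminal; -n <sec> naps before reading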


4 votes

(Not an answer, but it needs more room than a comment...)

This sounds like Nagle's algorithm, or a variant of it, controlling when TCP packets are actually sent.

For the first write, when there is no unacknowledged data in the "pipe", it will be sent immediately, which takes a moment. For subsequent writes soon after that, there will still be unacknowledged data in the pipe, so a small amount of data can simply be queued in the send buffer, which is quicker.

After a break in transmission, once all the sends have had a chance to catch up, the pipe will be ready to send immediately again.

You could confirm this using something like Wireshark to look at the actual TCP packets - that would show how the writes are being grouped together.

To be fair, I would expect the TCP_NODELAY flag to bypass this, resulting in the more even spread of timings you describe. If you can check the TCP packets, it would also be worth looking at whether they have the PSH flag set, to force an immediate send.
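(A note from editing, not from the original answer: TCP_NODELAY and TCP_NOPUSH are IPPROTO_TCP-level options, but the code in the question passes level SOL_SOCKET and never checks the return value, so those setsockopt calls most likely fail or set an unrelated option. A minimal corrected sketch:)

#include <sys/socket.h>
#include <netinet/tcp.h>  // TCP_NODELAY is a TCP-level option
#include <stdio.h>

// Enable TCP_NODELAY at the correct level and check the result.
int one = 1;
if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) == -1) {
    perror("setsockopt(TCP_NODELAY)");
}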


1 vote

(Not sure whether this helps, but I don't have enough reputation to post a comment)

Micro-benchmarking is tricky, especially with OS calls - in my experience, a number of factors have to be considered and filtered out or measured before the numbers can be taken conclusively.

Some of these factors are:

  1. cache hits/misses
  2. multitasking preemption
  3. the OS allocating memory at certain moments of API calls (memory allocation can easily cost microseconds)
  4. lazy loading (some APIs may not do much during the connect call, for example, until real data comes in)
  5. the actual clock speed of the CPU at that moment (dynamic clock scaling happens all the time)
  6. recently executed instructions on this or adjacent cores (heavy AVX512 instructions, for example, may switch the CPU into L2 (license 2) mode, which slows the clock down to avoid overheating)
  7. with virtualization, whatever else may be running on the same physical CPU.

You can try to mitigate the influence of factors 1, 2, 6, and 7 by running the same command repeatedly in a loop. In your case, though, that may mean you need to open several sockets at once and measure the first write to each of them in one loop. That way the caches involved in entering the kernel get pre-warmed by the first call, and subsequent calls have "cleaner" times that you can average out.
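(A sketch of that idea; connect_to_server() is a hypothetical helper that makes one connection like the client code in the question, and rdtsc() is as defined earlier in the post:)

#include <stdint.h>
#include <stdio.h>
#include <unistd.h>

// Open several fresh connections up front, then time only the first
// write on each, so the per-connection warm-up cost is what's averaged.
enum { NUM_SOCKS = 16 };
int fds[NUM_SOCKS];
for (int i = 0; i < NUM_SOCKS; i++) {
    fds[i] = connect_to_server("127.0.0.1", 8080);  // hypothetical helper
}
char byte = 0;
uint64_t total = 0;
for (int i = 0; i < NUM_SOCKS; i++) {
    uint64_t t0 = rdtsc();
    write(fds[i], &byte, 1);
    total += rdtsc() - t0;
}
printf("%f cycles per first write, on average\n", total / (double)NUM_SOCKS);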

To help with factor 5, you can try to "pre-warm" the CPU clock - run a long blocking loop right before the test and inside the test loop, but don't do anything fancy in that loop, to avoid overheating - the safest thing is to call __asm("nop") inside it.
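(For example - the iteration count here is a machine-dependent guess:)

// Spin long enough for dynamic frequency scaling to bring the core up
// to speed; a bare nop avoids heavy units that could cause throttling.
for (volatile long i = 0; i < 500000000L; i++) {
    __asm__ __volatile__("nop");
}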

At first, I didn't notice that you only send 1 byte, and thought this might be due to TCP slow start. But your second test with the prime numbers doesn't support that either. So it sounds more like factors 1, 5, or 6 from my list.
