大量请求后线程池后端出现管道错误

问题描述 投票:0回答:1

我的问题与这篇文章类似:《在 C 中使用 Pthreads 和 sleep 时出现分段错误》。我已经按照其中的建议做了修改,但仍然出现堆栈溢出和管道损坏(Broken pipe)错误。我正在修改一个简单的网络服务器,让它使用线程池作为后端;实际上我只修改了两个函数。首先是 main:

/*
 * Program entry point.
 *
 * Parses the command line, initialises the two counting semaphores that guard
 * the request queue, starts the worker threads and the listener thread, and
 * then blocks until the listener thread returns.
 *
 *   argv[1]  TCP port; values below 2000 or above 50000 are rejected
 *   argv[2]  number of worker threads (optional, default 1)
 *   argv[3]  simulated crash rate in percent (optional, capped at 50)
 *
 * Returns 0 on the usage-error path and after the listener exits, 1 when a
 * thread could not be created.
 */
int main(int argc, char *argv[])
{
    pthread_t listener;
    int requested_port = (argc >= 2) ? atoi(argv[1]) : 0;
    int rc;
    int t;

    index = 0;

    if (argc < 2 || requested_port < 2000 || requested_port > 50000) {
        // A literal percent sign must be written "%%" inside a format string.
        fprintf(stderr, "./webserver PORT(2001 ~ 49999) (#_of_threads) (crash_rate(%%))\n");
        return 0;
    }

    // port number
    port = requested_port;

    // # of worker thread
    if (argc > 2)
        numThread = atoi(argv[2]);
    else
        numThread = 1;
    // atoi() yields 0 for non-numeric input; without at least one worker the
    // listener would queue connections that nobody ever serves.
    if (numThread < 1)
        numThread = 1;
    // NOTE(review): numThread is not checked against the capacity of
    // thread_pool, which is declared outside this block -- confirm.

    // crash rate
    if (argc > 3)
        CRASH = atoi(argv[3]);
    if (CRASH > 50)
        CRASH = 50;

    // sem_empty counts free queue slots, sem_full counts queued requests.
    sem_init(&sem_empty, 0, MAX_REQUEST);
    sem_init(&sem_full, 0, 0);

    for (t = 0; t < numThread; t++) {
        rc = pthread_create(&thread_pool[t], NULL, thread_function, NULL);
        if (rc != 0) {
            fprintf(stderr, "pthread_create failed for worker %d (error %d)\n", t, rc);
            return 1;
        }
    }
    printf("[pid %d] CRASH RATE = %d%%\n", (int)getpid(), CRASH);

    rc = pthread_create(&listener, NULL, req_handler, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create failed for listener (error %d)\n", rc);
        return 1;
    }
    pthread_join(listener, NULL);
    return 0;
}

这使用

pthread_create
来调用我的监听器,其中还包含我没有接触的套接字代码:

/*
 * Listener thread: creates the listening socket on `port`, then accepts
 * connections forever and places each accepted descriptor into the shared
 * `request` ring buffer for the worker threads.
 *
 * arg is unused. Returns NULL once accept() fails; terminates the whole
 * process with a non-zero status when the socket cannot be set up.
 */
void * req_handler(void *arg)
{
        struct sockaddr_in sin = {0};   // zeroed so the padding bytes are defined
        int queClock = 0;               // next slot of request[] to fill
        int r;

        (void)arg;

        sock = socket(AF_INET, SOCK_STREAM, 0);
        if(sock < 0) {
                perror("Error creating socket:");
                exit(1);
        }

        sin.sin_family = AF_INET;
        sin.sin_addr.s_addr = INADDR_ANY;
        sin.sin_port = htons(port);
        r = bind(sock, (struct sockaddr *) &sin, sizeof(sin));
        if(r < 0) {
                perror("Error binding socket:");
                exit(1);        // a setup failure must not be reported as success
        }

        r = listen(sock, 10);
        if(r < 0) {
                perror("Error listening socket:");
                exit(1);
        }

        printf("HTTP server listening on port %d\n", port);

        while (1)
        {
                int s = accept(sock, NULL, NULL);
                if (s < 0) {
                        printf("sock error\n");
                        break;
                }

                // Wait for a free queue slot before storing the descriptor.
                sem_wait(&sem_empty);
                printf("listener in mutex\n");
                request[queClock] = s;
                queClock ++;
                printf("listener exit mutex\n counter: %d",queClock);
                // Wrap-around point of the ring buffer.
                // NOTE(review): the size of request[] and the value of
                // MAX_REQUEST are declared outside this block -- confirm that
                // both agree with this hard-coded 100.
                if (queClock == 100) queClock = 0;
                // Tell one worker that a request is available.
                sem_post(&sem_full);
        }

        close(sock);
        return NULL;
}

我的每个线程池线程的线程函数:

/*
 * Worker thread body: repeatedly takes one accepted socket descriptor out of
 * the shared `request` ring buffer and hands it to process().
 *
 * arg is unused. The loop never terminates on its own; the thread ends only
 * if process() calls pthread_exit().
 */
void * thread_function(void *arg){
    pid_t x = syscall(__NR_gettid);
    int temp;       // descriptor taken from the queue
    int next;       // value of index observed while the mutex was held

    (void)arg;

    while(1){

        // Block until the listener has queued at least one request.
        sem_wait(&sem_full);
        pthread_mutex_lock(&mutex);
        printf("pool thread %d in mutex\n",(int)x);
        temp = request[index];
        index++;
        if(index == 100) index = 0;
        next = index;   // snapshot under the lock; index is shared between workers
        pthread_mutex_unlock(&mutex);

        // The queue slot is free as soon as the descriptor has been copied
        // out. Releasing it before process() also keeps the slot from being
        // lost when process() ends this thread through pthread_exit().
        sem_post(&sem_empty);

        printf("pool thread %d after queue\n temp: %d index: %d\n",(int)x,temp,next);

        process(temp);

        printf("pool thread %d after process call\n",(int)x);

        // NOTE(review): requestCount is declared outside this block and is
        // written here by every worker without a lock -- confirm whether
        // anything else reads it.
        sem_getvalue(&sem_empty, &requestCount);
        printf("sem empty value: %d\n",requestCount);

    }
}

我在后台运行服务器,然后发送多组请求(每组 100 个客户端请求),通常在第 4 或第 5 组请求时收到这个错误。代码总是在文件中调用

usleep
的那一行崩溃;那个文件我没有修改过,我的线程是通过线程函数中的 process 函数调用进入其中的:

/*
 * Serves one HTTP request on the connected socket `fd`.
 *
 * Reads the request line, maps the requested path onto the current working
 * directory and replies with the file, a redirect for a directory without a
 * trailing slash, the directory's index.html, a generated directory listing,
 * or an error page. Only GET is supported.
 *
 * With probability CRASH percent the calling thread is terminated through
 * pthread_exit() before the request is read (simulated crash).
 *
 * The descriptor is always consumed: it is closed directly or through
 * fclose() on every path.
 *
 * Returns 0 when a reply (including an error page) was written, -1 when the
 * request could not be read or parsed.
 *
 * NOTE(review): the request path is appended to the working directory without
 * rejecting ".." components, so a client can name files outside the served
 * tree; directory entry names are also written into the HTML unescaped.
 */
int process(int fd) {
        char buf[4096];
        char *method;
        char *_path;
        char *protocol;
        char *saveptr = NULL;
        char path[4096];
        char pathbuf[4096];
        char cwd[1024];
        struct stat statbuf;
        struct stat entry_stat;
        struct sockaddr_in peer;
        socklen_t peer_len = sizeof(peer);
        size_t len;
        int n;
        FILE *f;

        srand(syscall(__NR_gettid) + time(NULL));
        if(CRASH > 0 && rand() % 100 < CRASH) {
                printf("Thread [pid %d, tid %d] terminated!\n", getpid(), gettid());
                close(fd);
                pthread_exit(NULL);
        }

        f = fdopen(fd, "a+");
        printf("after fdopen\n");
        usleep(100000);
        printf("before get peer name\n");
        if(getpeername(fd, (struct sockaddr*) &peer, &peer_len) != -1) {
                printf("[pid %d, tid %d] Received a request from %s:%d\n", getpid(), gettid(), inet_ntoa(peer.sin_addr), (int)ntohs(peer.sin_port));
        }
        printf("after get peer name\n");
        if(f == NULL) {
                // fd is an integer; it is still owned here because fdopen failed.
                printf("fileopen error: %d\n", fd);
                close(fd);
                return -1;
        }

        if (!fgets(buf, sizeof(buf), f)) {
                fclose(f);
                return -1;
        }

        peer_len = sizeof(peer);        // value-result argument, reset before reuse
        if(getpeername(fileno(f), (struct sockaddr*) &peer, &peer_len) != -1) {
                printf("[pid %d, tid %d] (from %s:%d) URL: %s", getpid(), gettid(),inet_ntoa(peer.sin_addr), (int)ntohs(peer.sin_port), buf);
        } else {
                printf("[pid %d, tid %d] URL: %s", getpid(), gettid(), buf);
        }

        // strtok() keeps hidden static state and this function runs in several
        // threads at once; strtok_r() keeps the state in saveptr instead.
        method = strtok_r(buf, " ", &saveptr);
        _path = strtok_r(NULL, " ", &saveptr);
        protocol = strtok_r(NULL, "\r", &saveptr);
        if (!method || !_path || !protocol) {
                fclose(f);
                return -1;
        }

        if (getcwd(cwd, sizeof(cwd)) == NULL) {
                fclose(f);
                return -1;
        }
        // Bounded: cwd plus the request path can exceed sizeof(path).
        n = snprintf(path, sizeof(path), "%s%s", cwd, _path);

        fseek(f, 0, SEEK_CUR); // Force change of stream direction

        if (strcasecmp(method, "GET") != 0) {
                send_error(f, 501, "Not supported", NULL, "Method is not supported.");
                printf("[pid %d, tid %d] Reply: %s", getpid(), gettid(), "Method is not supported.\n");
        } else if (n < 0 || (size_t)n >= sizeof(path)) {
                send_error(f, 414, "Request-URI Too Long", NULL, "Requested path is too long.");
                printf("[pid %d, tid %d] Reply: %s", getpid(), gettid(), "Requested path is too long.\n");
        } else if (stat(path, &statbuf) < 0) {
                send_error(f, 404, "Not Found", NULL, "File not found.");
                printf("[pid %d, tid %d] Reply: File not found - %s", getpid(), gettid(), path);
        } else if (S_ISDIR(statbuf.st_mode)) {
                len = strlen(path);
                if (len == 0 || path[len - 1] != '/') {
                        // Redirect to the URL path, not the filesystem path:
                        // the latter would get cwd prepended again on the
                        // follow-up request.
                        snprintf(pathbuf, sizeof(pathbuf), "Location: %s/", _path);
                        send_error(f, 302, "Found", pathbuf, "Directories must end with a slash.");
                        printf("[pid %d, tid %d] Reply: %s", getpid(), gettid(), "Directories mush end with a slash.\n");
                } else {
                        // path already starts with cwd, so it must not be
                        // prepended a second time.
                        n = snprintf(pathbuf, sizeof(pathbuf), "%sindex.html", path);
                        // entry_stat keeps statbuf describing the directory itself.
                        if (n >= 0 && (size_t)n < sizeof(pathbuf) && stat(pathbuf, &entry_stat) >= 0) {
                                send_file(f, pathbuf, &entry_stat);
                                printf("[pid %d, tid %d] Reply: filesend %s\n", getpid(), gettid(), pathbuf);
                        } else {
                                struct dirent *de;
                                // Open before sending headers so a failure can
                                // still be reported with a proper status.
                                DIR *dir = opendir(path);

                                if (dir == NULL) {
                                        send_error(f, 403, "Forbidden", NULL, "Access denied.");
                                        printf("[pid %d, tid %d] Reply: %s", getpid(), gettid(), "Access denied.\n");
                                } else {
                                        send_headers(f, 200, "OK", NULL, "text/html", -1, statbuf.st_mtime);
                                        fprintf(f, "<HTML><HEAD><TITLE>Index of %s</TITLE></HEAD>\r\n<BODY>", path);
                                        fprintf(f, "<H4>Index of %s</H4>\r\n<PRE>\n", path);
                                        fprintf(f, "Name                             Last Modified              Size\r\n");
                                        fprintf(f, "<HR>\r\n");
                                        if (len > 1) fprintf(f, "<A HREF=\"..\">..</A>\r\n");

                                        while ((de = readdir(dir)) != NULL) {
                                                char timebuf[32];
                                                struct tm tm_buf;
                                                size_t name_len = strlen(de->d_name);
                                                int is_dir;

                                                n = snprintf(pathbuf, sizeof(pathbuf), "%s%s", path, de->d_name);
                                                if (n < 0 || (size_t)n >= sizeof(pathbuf))
                                                        continue;       // name does not fit, skip entry
                                                if (stat(pathbuf, &entry_stat) < 0)
                                                        continue;       // no usable metadata for this entry
                                                // gmtime() returns a shared static buffer; use the
                                                // reentrant variant in threaded code.
                                                if (gmtime_r(&entry_stat.st_mtime, &tm_buf) == NULL)
                                                        continue;
                                                strftime(timebuf, sizeof(timebuf), "%d-%b-%Y %H:%M:%S", &tm_buf);
                                                is_dir = S_ISDIR(entry_stat.st_mode);

                                                fprintf(f, "<A HREF=\"%s%s\">", de->d_name, is_dir ? "/" : "");
                                                fprintf(f, "%s%s", de->d_name, is_dir ? "/</A>" : "</A> ");
                                                // "%*s" takes its width as int, not size_t.
                                                if (name_len < 32) fprintf(f, "%*s", (int)(32 - name_len), "");
                                                if (is_dir) {
                                                        fprintf(f, "%s\r\n", timebuf);
                                                } else {
                                                        // st_size is off_t; print it through long long.
                                                        fprintf(f, "%s %10lld\r\n", timebuf, (long long)entry_stat.st_size);
                                                }
                                        }
                                        closedir(dir);

                                        fprintf(f, "</PRE>\r\n<HR>\r\n<ADDRESS>%s</ADDRESS>\r\n</BODY></HTML>\r\n", SERVER);
                                        printf("[pid %d, tid %d] Reply: SUCCEED\n", getpid(), gettid());
                                }
                        }
                }
        } else {
                send_file(f, path, &statbuf);
                printf("[pid %d, tid %d] Reply: filesend %s\n", getpid(), gettid(), path);
        }

        fclose(f);
        return 0;
}

然后调用另一个函数,该函数具有导致管道损坏错误的写入调用。

/*
 * Sends the file at `path` to the client stream `f`: response headers first,
 * then the file contents in 4 KiB chunks. If the file cannot be opened a 403
 * error page is sent instead.
 *
 *   f        client stream to write to
 *   path     filesystem path of the file to send
 *   statbuf  stat() result for `path`; supplies size and modification time
 *
 * Copying stops at the first short write, because once the peer has closed
 * the connection every further write would fail as well. Note that a write to
 * a closed socket raises SIGPIPE first; unless that signal is ignored or
 * blocked by the program, its default action ends the process before fwrite()
 * gets the chance to report the failure.
 */
void send_file(FILE *f, char *path, struct stat *statbuf) {
        char data[4096];
        size_t n;
        int length = -1;
        FILE *file = fopen(path, "r");

        if (!file) {
                send_error(f, 403, "Forbidden", NULL, "Access denied.");
                return;
        }

        // Pass the size only when it is a regular file and the value survives
        // the conversion to int; otherwise pass -1, as the original does for
        // non-regular files (presumably "length unknown" -- confirm against
        // send_headers).
        if (S_ISREG(statbuf->st_mode) && (off_t)(int)statbuf->st_size == statbuf->st_size)
                length = (int)statbuf->st_size;
        send_headers(f, 200, "OK", NULL, get_mime_type(path), length, statbuf->st_mtime);

        while ((n = fread(data, 1, sizeof(data), file)) > 0) {
                if (fwrite(data, 1, n, f) != n)
                        break;  // client is gone or the write failed; stop copying
        }
        fclose(file);
}

我只是用信号量控制我的生产者线程的事实有问题吗?

/*
 * Shared declarations for the web server: the debug() logging macro, the
 * simulated crash rate and the request-processing entry points.
 */
#ifndef __WEBSERVER
#define __WEBSERVER

/* Defining NDEBUG here compiles every debug() call away. */
#define NDEBUG

#ifdef NDEBUG
#define debug(M, ...)
#else
/* Prints "DEBUG file:line: message" to stderr; M must be a string literal. */
#define debug(M, ...) fprintf(stderr, "DEBUG %s:%d: " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)
#endif

/* Simulated crash rate in percent. */
extern int CRASH;

/* Serves one HTTP request on the connected socket fd; returns 0 or -1. */
int process(int fd);

/* Returns the calling thread's id (defined elsewhere in the project). */
int gettid(void);

#endif

添加头文件以获取额外信息。

我很确定我没有传递任何未初始化的东西,因为它大部分时间都能正常工作。如果有人能指出我哪里做错了,或者给我一些关于如何对无限循环的后台进程使用 valgrind 的提示,那将是很大的帮助。错误总是发生在一批请求进行到 1/3 到 1/2 的时候,所以我认为它与《Socket:send() 函数返回 'Broken Pipe' 错误》中描述的情况不同。

==29486==
==29486== Process terminating with default action of signal 13 (SIGPIPE)
==29486==    at 0x5142BBD: ??? (in /usr/lib64/libc-2.17.so)
==29486==    by 0x50CD2F2: _IO_file_write@@GLIBC_2.2.5 (in /usr/lib64/libc-2.17.so)
==29486==    by 0x50CEB0D: _IO_do_write@@GLIBC_2.2.5 (in /usr/lib64/libc-2.17.so)
==29486==    by 0x50CDA4F: _IO_file_xsputn@@GLIBC_2.2.5 (in /usr/lib64/libc-2.17.so)
==29486==    by 0x50C27E1: fwrite (in /usr/lib64/libc-2.17.so)
==29486==    by 0x401E4B: send_file (net.c:78)
==29486==    by 0x402732: process (net.c:197)
==29486==    by 0x401589: thread_function (webserver.c:37)
==29486==    by 0x4E3EEA4: start_thread (in /usr/lib64/libpthread-2.17.so)
==29486==    by 0x5151B0C: clone (in /usr/lib64/libc-2.17.so)
[pid 29486, tid 65] Received a request from 127.0.0.1:35764
==29486==
==29486== HEAP SUMMARY:
==29486==     in use at exit: 12,408 bytes in 22 blocks
==29486==   total heap usage: 170 allocs, 148 frees, 94,882 bytes allocated
==29486==
==29486== 560 bytes in 1 blocks are possibly lost in loss record 1 of 4
==29486==    at 0x4C2C089: calloc (vg_replace_malloc.c:762)
==29486==    by 0x4012784: _dl_allocate_tls (in /usr/lib64/ld-2.17.so)
==29486==    by 0x4E3F87B: pthread_create@@GLIBC_2.2.5 (in /usr/lib64/libpthread-2.17.so)
==29486==    by 0x4018C0: main (webserver.c:146)
==29486==
==29486== 5,600 bytes in 10 blocks are possibly lost in loss record 3 of 4
==29486==    at 0x4C2C089: calloc (vg_replace_malloc.c:762)
==29486==    by 0x4012784: _dl_allocate_tls (in /usr/lib64/ld-2.17.so)
==29486==    by 0x4E3F87B: pthread_create@@GLIBC_2.2.5 (in /usr/lib64/libpthread-2.17.so)
==29486==    by 0x401878: main (webserver.c:143)
==29486==
==29486== LEAK SUMMARY:
==29486==    definitely lost: 0 bytes in 0 blocks
==29486==    indirectly lost: 0 bytes in 0 blocks
==29486==      possibly lost: 6,160 bytes in 11 blocks
==29486==    still reachable: 6,248 bytes in 11 blocks
==29486==         suppressed: 0 bytes in 0 blocks
==29486== Reachable blocks (those to which a pointer was found) are not shown.
==29486== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==29486==
==29486== For lists of detected and suppressed errors, rerun with: -s
==29486== ERROR SUMMARY: 2 errors from 2 contexts (suppressed: 0 from 0)

Segmentation fault (core dumped)
[mzy22580@csci-odin project2]$  counter: 55before get peer name
[pid 12112, tid 27] Received a request from 127.0.0.1:36512
after get peer name
[pid 12112, tid 27] (from 127.0.0.1:36512) URL: GET /index.html HTTP/1.0
==12112==
==12112== Process terminating with default action of signal 13 (SIGPIPE)
==12112==    at 0x5142BBD: ??? (in /usr/lib64/libc-2.17.so)
==12112==    by 0x50CD2F2: _IO_file_write@@GLIBC_2.2.5 (in /usr/lib64/libc-2.17.so)
==12112==    by 0x50CEB0D: _IO_do_write@@GLIBC_2.2.5 (in /usr/lib64/libc-2.17.so)
==12112==    by 0x50CDA4F: _IO_file_xsputn@@GLIBC_2.2.5 (in /usr/lib64/libc-2.17.so)
==12112==    by 0x50C27E1: fwrite (in /usr/lib64/libc-2.17.so)
==12112==    by 0x401E4B: send_file (net.c:78)
==12112==    by 0x402732: process (net.c:197)
==12112==    by 0x401589: thread_function (webserver.c:37)
==12112==    by 0x4E3EEA4: start_thread (in /usr/lib64/libpthread-2.17.so)
==12112==    by 0x5151B0C: clone (in /usr/lib64/libc-2.17.so)
==12112==
==12112== HEAP SUMMARY:
==12112==     in use at exit: 12,408 bytes in 22 blocks
==12112==   total heap usage: 298 allocs, 276 frees, 167,586 bytes allocated
==12112==
==12112== 560 bytes in 1 blocks are possibly lost in loss record 1 of 4
==12112==    at 0x4C2C089: calloc (vg_replace_malloc.c:762)
==12112==    by 0x4012784: _dl_allocate_tls (in /usr/lib64/ld-2.17.so)
==12112==    by 0x4E3F87B: pthread_create@@GLIBC_2.2.5 (in /usr/lib64/libpthread-2.17.so)
==12112==    by 0x4018C0: main (webserver.c:146)
==12112==
==12112== 568 bytes in 1 blocks are still reachable in loss record 2 of 4
==12112==    at 0x4C29F73: malloc (vg_replace_malloc.c:309)
==12112==    by 0x50C1C4C: __fopen_internal (in /usr/lib64/libc-2.17.so)
==12112==    by 0x401D81: send_file (net.c:71)
==12112==    by 0x402732: process (net.c:197)
==12112==    by 0x401589: thread_function (webserver.c:37)
==12112==    by 0x4E3EEA4: start_thread (in /usr/lib64/libpthread-2.17.so)
==12112==    by 0x5151B0C: clone (in /usr/lib64/libc-2.17.so)
==12112==
==12112== 5,600 bytes in 10 blocks are possibly lost in loss record 3 of 4
==12112==    at 0x4C2C089: calloc (vg_replace_malloc.c:762)
==12112==    by 0x4012784: _dl_allocate_tls (in /usr/lib64/ld-2.17.so)
==12112==    by 0x4E3F87B: pthread_create@@GLIBC_2.2.5 (in /usr/lib64/libpthread-2.17.so)
==12112==    by 0x401878: main (webserver.c:143)
==12112==
==12112== 5,680 bytes in 10 blocks are still reachable in loss record 4 of 4
==12112==    at 0x4C29F73: malloc (vg_replace_malloc.c:309)
==12112==    by 0x50C14C4: fdopen@@GLIBC_2.2.5 (in /usr/lib64/libc-2.17.so)
==12112==    by 0x401F6E: process (net.c:104)
==12112==    by 0x401589: thread_function (webserver.c:37)
==12112==    by 0x4E3EEA4: start_thread (in /usr/lib64/libpthread-2.17.so)
==12112==    by 0x5151B0C: clone (in /usr/lib64/libc-2.17.so)
==12112==
==12112== LEAK SUMMARY:
==12112==    definitely lost: 0 bytes in 0 blocks
==12112==    indirectly lost: 0 bytes in 0 blocks
==12112==      possibly lost: 6,160 bytes in 11 blocks
==12112==    still reachable: 6,248 bytes in 11 blocks
==12112==         suppressed: 0 bytes in 0 blocks
==12112==
==12112== ERROR SUMMARY: 2 errors from 2 contexts (suppressed: 0 from 0)
[mzy22580@csci-odin project2]$
c pthreads
1个回答
0
投票

这个问题似乎与线程或信号量和互斥锁的使用没有直接关系。

代码相当可靠,但它确实有一些弱点,使其不如预期的健壮。其中一些弱点在错误处理方面,而这正是服务器实际崩溃的地方。与您的

SIGPIPE
关联的堆栈跟踪显示,它是由您的
send_file()
函数中的
fwrite()
调用触发的。由于您正在写入套接字,这应该被解释为:在执行
fwrite()
之前或期间,与远程对等方的连接已被关闭。

这正是 Socket: send() 函数返回的 'Broken Pipe' 错误所描述的。在这一点上,我看不出为什么它在一批请求中途发生应该反驳这种解释。

目前还不清楚为什么在您的特定情况下会发生这种情况,* 但这是任何网络服务器都不能安全忽略的一种可能性。请参阅 How to prevent SIGPIPEs (or handle them properly),了解如何防止 SIGPIPE 杀死您的程序;但还要注意,一旦远程对等方关闭了套接字,就没有必要继续尝试向其发送数据。此后的每次
fwrite()
都应该失败,但代码既没有检查也没有处理这种失败(也没有检查或处理由其他原因导致的写入失败)。


*推测:客户端请求会超时,有时写入之间的时间足够长,导致客户端中止请求。使服务器多线程增加了在服务器处理请求时发生此类故障的可能性,而不是在它开始之前。请求处理开始之前的客户端中止可能会出现不同的表现。

© www.soinside.com 2019 - 2024. All rights reserved.