为什么我的程序在没有线程的情况下运行良好,但在添加线程时却表现异常?

问题描述 投票:0回答:1

我的程序模拟 grep 命令的行为。也就是说,当执行

./main grep <pattern> <file.txt>
时,它会借助缓冲区逐行搜索,并输出匹配行的行号及该行内容。该程序在不使用线程时运行正常,但使用线程执行时,它会陷入无限循环、不断重复输出找到的匹配项,并且线程计数器会无限增长。

只需要用

gcc -o main main.c
编译程序,然后用
./main grep <pattern> <file.txt>
执行,其中 pattern 是你要查找的单词或单词的一部分,程序会显示它出现在哪些行;file.txt 可以是任何包含文本内容的 .txt 文件,例如 sample file。

#include "regex.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#define BUFFER_SIZE 100
#define NUM_THREADS 5

// Command-line parameters: the search pattern and the file to search.
typedef struct {
  char *pattern;  // set to args[2] by init_parameter_handler
  char *filename; // set to args[3] by init_parameter_handler
} Parameter_Handler;

// Scanning state. main creates a single instance and passes the same pointer
// to every worker thread.
typedef struct {
  char buffer[BUFFER_SIZE]; // fragment most recently read by read_fragment
  int current_offset;       // file position that restore_offset seeks to
  FILE *file;               // stream opened by init_file_handler
  int num_line;             // incremented for every token check_match visits
  int char_count;           // zeroed by init_file_handler; not used elsewhere here
  regex_t regex;            // compiled and freed inside each check_match call
  Parameter_Handler ph;     // pattern and file name from the command line
  pthread_mutex_t mutex;    // held by func around each read_fragment call
  int stop;                 // zeroed by init_file_handler; not used elsewhere here
} File_Handler;

// Pairs an id with a File_Handler pointer. Not referenced by any function in
// this listing.
typedef struct {
  int id;
  File_Handler *fh;
} Thread_manager;

// Validate the command line and store the pattern and file name in *ph.
//
// ph   - receives args[2] as pattern and args[3] as filename
// args - the program's argument vector
// n    - number of entries in args
//
// Exits with status 1 when n < 4 or when args[1] is not "grep".
void init_parameter_handler(Parameter_Handler *ph, char *args[], int n) {
  if (n < 4) {
    // NOTE(review): message goes to stdout and has no trailing newline.
    printf("Parameters number is inconsistent");
    exit(1);
  }

  // strcmp returns non-zero when the strings differ.
  if (strcmp(args[1], "grep")) {
    printf("Invalid command %s", args[1]);
    exit(1);
  }

  // The pointers alias the argument vector; nothing is copied.
  ph->pattern = args[2];
  ph->filename = args[3];
}

// Prepare *fh for scanning: zero the counters, parse the command line,
// initialise the mutex and open the input file for reading.
//
// fh   - handler to initialise
// argc - argument count forwarded to init_parameter_handler
// argv - argument vector forwarded to init_parameter_handler
//
// Exits with status 1 if the file cannot be opened (or if
// init_parameter_handler rejects the arguments).
void init_file_handler(File_Handler *fh, int argc, char *argv[]) {
  fh->current_offset = fh->num_line = fh->char_count = fh->stop = 0;

  // Both fields are overwritten by init_parameter_handler just below.
  fh->ph = (Parameter_Handler){"", ""};

  init_parameter_handler(&fh->ph, argv, argc);

  pthread_mutex_init(&fh->mutex, NULL);

  fh->file = fopen(fh->ph.filename, "r");

  if (fh->file == NULL) {
    printf("Could not open the file\n");
    exit(1);
  }
}

// Split fh->buffer on '\n' and print every piece that matches the pattern,
// prefixed with the running line counter.
//
// The regex is compiled at the start of every call and freed at the end.
// Exits with status 1 if the pattern does not compile.
void check_match(File_Handler *fh) {
  int ret = regcomp(&fh->regex, fh->ph.pattern, REG_EXTENDED);

  if (ret != 0) {
    printf("Failed to compile regex.\n");
    exit(1);
  }
  // strtok writes NUL bytes into fh->buffer and keeps its position in static
  // state, so it is not safe to call from several threads at once; here the
  // caller (func) holds fh->mutex around the call.
  // NOTE(review): strtok skips runs of delimiters, so empty lines produce no
  // token and do not advance num_line.
  char *token = strtok(fh->buffer, "\n");

  while (token != NULL) {
    fh->num_line++;

    // regexec returns 0 on a match.
    ret = regexec(&fh->regex, token, 0, NULL, 0);

    if (ret == 0)
      printf("[%d] %s\n", fh->num_line, token);

    token = strtok(NULL, "\n");
  }

  regfree(&fh->regex);
}

// Called when a full buffer ends in the middle of a line: cut the buffer
// after its last '\n' and reposition the file so the partial line is read
// again by the next fread.
//
// fh     - handler whose buffer, current_offset and file are updated
// offset - number of bytes the last fread stored in fh->buffer
void restore_offset(File_Handler *fh, int offset) {
  // Walk backwards until the byte before `offset` is a newline, i.e. until
  // `offset` is the index just past a '\n'.
  // NOTE(review): there is no lower bound; if those bytes contain no '\n'
  // the loop indexes before the start of fh->buffer.
  do {
    offset--;
  } while (fh->buffer[offset - 1] != '\n');

  fh->current_offset += offset;
  // Terminate the string right after the last newline.
  // NOTE(review): strrchr returns NULL when the buffer holds no '\n'.
  strrchr(fh->buffer, '\n')[1] = '\0';

  // NOTE(review): the fseek result is not checked.
  fseek(fh->file, fh->current_offset, SEEK_SET);
}

// Read the next fragment of the file into fh->buffer and report matches.
//
// Returns 0 when a full BUFFER_SIZE fragment was read (more data may follow)
// and 1 when fread returned fewer bytes (end of file or read error).
int read_fragment(File_Handler *fh) {
  memset(fh->buffer, 0, sizeof(fh->buffer));

  // NOTE(review): fread may fill all BUFFER_SIZE bytes, which leaves no
  // terminating NUL for the string functions used by check_match and
  // restore_offset.
  int offset = fread(fh->buffer, sizeof(char), BUFFER_SIZE, fh->file);

  if (offset == BUFFER_SIZE) {
    if (fh->buffer[offset - 1] != '\n') {
      // Fragment ends mid-line: back up to the last complete line.
      restore_offset(fh, offset);
    } else
      fh->current_offset += offset;
    check_match(fh);
    return 0;
  } else {
    // Short read: process what was read and tell the caller to stop.
    check_match(fh);
    return 1;
  }
}

// Thread entry point: repeatedly call read_fragment on the shared
// File_Handler, holding fh->mutex around each call, until read_fragment
// returns non-zero.
//
// arg - pointer to the File_Handler passed to pthread_create
//
// NOTE(review): control reaches the end of this non-void function without a
// return statement.
void *func(void *arg) {
  File_Handler *fh = (File_Handler *)arg;

  int status;

  while (1) {
    pthread_mutex_lock(&fh->mutex);

    status = read_fragment(fh);

    pthread_mutex_unlock(&fh->mutex);

    if (status) {
      break;
    }
  }
}

// Program entry point: initialise one File_Handler, start NUM_THREADS
// threads that all receive a pointer to it, wait for them, then destroy the
// mutex.
int main(int argc, char *argv[]) {
  File_Handler fh;

  init_file_handler(&fh, argc, argv);

  pthread_t thread[NUM_THREADS];

  // Declared but never used in this function.
  int t_id[NUM_THREADS];

  for (int i = 0; i < NUM_THREADS; i++) {
    // Every thread is handed the same &fh.
    // NOTE(review): the pthread_create result is not checked.
    pthread_create(&thread[i], NULL, func, (void *)&fh);
  }

  for (int i = 0; i < NUM_THREADS; i++)
    pthread_join(thread[i], NULL);

  pthread_mutex_destroy(&fh.mutex);

  // NOTE(review): fh.file is never passed to fclose.
  return 0;
}

我想让线程访问File_Handler,每个线程处理文件的一部分,当文件完成时,所有线程一起生成并显示结果。

c multithreading mutex
1个回答
0
投票

一些问题...

  1. 你只有一个 File_Handler 实例,而不是每个线程各有一个。
  2. 您试图用互斥锁来掩盖这个问题,而不是使用 File_Handler 结构数组。使用数组就不需要互斥锁。
  3. 您使用了非线程安全的 strtok,而不是 strtok_r。
  4. 您在 read_fragment 中对 fread 的使用存在部分缺陷。[起初]最好使用 fgets。使用 fgets 就无需使用 strtok。下面有更多内容。
  5. 使用互斥量,性能并不比单线程好(甚至可能更差)。
  6. 你反复调用
    regcomp
    ,而不是每个线程只调用一次。
  7. 由于 File_Handler 的设置方式,所有线程都会读取所有行。这是重复劳动。
  8. 您尝试使用 read_fragment 和 restore_offset 来缓解这种情况。[我认为] 你是想让每个线程扫描文件中各不相同的一段。这个概念很好,但实现不正确。同样,见下文。

下面是更正后代码的[初版]。代码中用注释标出了错误和修复:

#include "regex.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#define BUFFER_SIZE 100

#ifndef NUM_THREADS
#define NUM_THREADS 5
#endif

// Command-line parameters: the search pattern and the file to search.
typedef struct {
    char *pattern;                      // set to args[2] by init_parameter_handler
    char *filename;                     // set to args[3] by init_parameter_handler
} Parameter_Handler;

// Per-thread scanning state. main keeps one element per thread in fharray,
// so no field is shared between threads.
typedef struct {
    char buffer[BUFFER_SIZE];           // line (or fragment) most recently read
    int current_offset;                 // file position used by restore_offset
    FILE *file;                         // stream opened by init_file_handler
    int num_line;                       // incremented for every token check_match visits
    int char_count;                     // zeroed by init_file_handler; not used elsewhere here
    regex_t regex;                      // compiled in init_file_handler, freed in main
    Parameter_Handler ph;               // pattern and file name from the command line
    pthread_mutex_t mutex;              // only touched by code that is now under #if 0
    int stop;                           // zeroed by init_file_handler; not used elsewhere here
#if 1
    int match_count;                    // incremented by check_match for each matching line
#endif
} File_Handler;

// Pairs an id with a File_Handler pointer. Not referenced by any function in
// this listing.
typedef struct {
    int id;
    File_Handler *fh;
} Thread_manager;

// Debug trace macro: forwards its arguments to printf when DEBUG is defined
// to a non-zero value, and expands to an empty statement otherwise.
// NOTE/FIX: use the standard C99 variadic form (... / __VA_ARGS__) instead of
// the GNU-only named form (_fmt...), so any conforming compiler accepts it.
#if DEBUG
#define dbgprt(...) \
    printf(__VA_ARGS__)
#else
#define dbgprt(...) \
    do { } while (0)
#endif

// Validate the command line and store the pattern and file name in *ph.
//
// ph   -- receives args[2] as pattern and args[3] as filename (the pointers
//         alias the argument vector; nothing is copied)
// args -- the program's argument vector
// n    -- number of entries in args
//
// Exits with status 1 when n < 4 or when args[1] is not "grep".
void
init_parameter_handler(Parameter_Handler *ph, char *args[], int n)
{
// NOTE/FIX: diagnostics now go to stderr and end with a newline, so they are
// not mixed into (or glued onto) the match output on stdout
    if (n < 4) {
        fprintf(stderr, "Parameters number is inconsistent\n");
        exit(1);
    }

    if (strcmp(args[1], "grep") != 0) {
        fprintf(stderr, "Invalid command %s\n", args[1]);
        exit(1);
    }

    ph->pattern = args[2];
    ph->filename = args[3];
}

// Prepare *fh for scanning: zero the counters, parse the command line, open
// the input file and compile the search pattern.
//
// fh   -- handler to initialise (one per thread)
// argc -- argument count forwarded to init_parameter_handler
// argv -- argument vector forwarded to init_parameter_handler
//
// Exits with status 1 if the file cannot be opened or the pattern does not
// compile (or if init_parameter_handler rejects the arguments).
void
init_file_handler(File_Handler *fh, int argc, char *argv[])
{
    fh->current_offset = fh->num_line = fh->char_count = fh->stop = 0;

#if 1
// NOTE/FIX: match_count must start at zero -- the handlers live in an
// automatic array in main, so the field is indeterminate until it is set
    fh->match_count = 0;
#endif

#if 0
// NOTE/BUG: this is extraneous -- all values replaced by init_parameter_handler
    fh->ph = (Parameter_Handler) {
    "", ""};
#endif

    init_parameter_handler(&fh->ph, argv, argc);

#if 0
// NOTE/BUG: not needed with array of File_Handler
    pthread_mutex_init(&fh->mutex, NULL);
#endif

    fh->file = fopen(fh->ph.filename, "r");
    if (fh->file == NULL) {
        printf("Could not open the file\n");
        exit(1);
    }

#if 1
// NOTE/FIX -- this only needs to be done once per thread
    int ret = regcomp(&fh->regex, fh->ph.pattern, REG_EXTENDED);
    if (ret != 0) {
        printf("Failed to compile regex.\n");
        exit(1);
    }
#endif
}

// Split fh->buffer into lines and print every line that matches fh->regex,
// prefixed with the running line number; each match also bumps
// fh->match_count.
//
// fh->buffer must be NUL-terminated and is modified in place (each '\n' that
// ends a line is overwritten with NUL). fh->regex must already be compiled.
// An empty buffer contributes no lines.
void
check_match(File_Handler *fh)
{
#if 0
// NOTE/BUG: this is expensive -- it should be moved to init_file_handler
// function
    int ret = regcomp(&fh->regex, fh->ph.pattern, REG_EXTENDED);
    if (ret != 0) {
        printf("Failed to compile regex.\n");
        exit(1);
    }
#endif

#if 0
// NOTE/BUG: strtok is _not_ thread-safe -- use strtok_r
    char *token = strtok(fh->buffer, "\n");
#else
// NOTE/FIX: strtok and strtok_r both collapse runs of delimiters, so blank
// lines were never counted and the printed line numbers drifted -- split on
// '\n' by hand instead (this keeps no hidden state, so it is thread-safe too)
    char *token = fh->buffer;
#endif

    while (*token != '\0') {
        // terminate the current line at its newline, if it has one
        char *nl = strchr(token, '\n');
        if (nl != NULL)
            *nl = '\0';

        fh->num_line++;

        // regexec returns 0 on a match
        int ret = regexec(&fh->regex, token, 0, NULL, 0);
        if (ret == 0) {
            printf("[%d] %s\n", fh->num_line, token);
            fh->match_count += 1;
        }

        // no newline means this was the last (unterminated) line
        if (nl == NULL)
            break;
        token = nl + 1;
    }

#if 0
    regfree(&fh->regex);
#endif
}

// Called when a fragment ends in the middle of a line: cut fh->buffer after
// its last complete line and reposition the file so the partial line is read
// again by the next read.
//
// fh     -- handler whose buffer, current_offset and file are updated
// offset -- number of bytes the last read stored in fh->buffer
//
// Exits with status 1 if the file cannot be repositioned.
void
restore_offset(File_Handler *fh, int offset)
{
    if (offset <= 0)
        return;
    if (offset > BUFFER_SIZE)
        offset = BUFFER_SIZE;

// NOTE/FIX: the backward scan is now bounded -- the old loop read
// buffer[-1] and beyond when the fragment held no newline, and strrchr could
// return NULL
    // index just past the last '\n' that precedes the final byte
    int keep = offset - 1;
    while (keep > 0 && fh->buffer[keep - 1] != '\n')
        keep--;

    // no newline at all (line longer than the buffer): keep as much as fits
    // with a terminator so the scan still makes forward progress
    if (keep == 0)
        keep = (offset < BUFFER_SIZE) ? offset : BUFFER_SIZE - 1;

    fh->current_offset += keep;
    fh->buffer[keep] = '\0';

// NOTE/FIX: a failed seek would make every later fragment start at the wrong
// place, so treat it as fatal
    if (fseek(fh->file, fh->current_offset, SEEK_SET) != 0) {
        printf("Could not seek in the file\n");
        exit(1);
    }
}

// Read the next fragment of the file into fh->buffer and report matches.
//
// Returns 0 when a full fragment was read (more data may follow) and 1 when
// the read came up short (end of file or read error).
int
read_fragment(File_Handler *fh)
{
    memset(fh->buffer, 0, sizeof(fh->buffer));

// NOTE/FIX: read at most BUFFER_SIZE - 1 bytes so the zeroed last byte always
// terminates the string -- filling the whole array left the string functions
// in check_match/restore_offset running past the end of the buffer
    const int want = BUFFER_SIZE - 1;
    int offset = (int) fread(fh->buffer, sizeof(char), want, fh->file);

    if (offset == want) {
        if (fh->buffer[offset - 1] != '\n') {
            // fragment ends mid-line: back up to the last complete line
            restore_offset(fh, offset);
        }
        else
            fh->current_offset += offset;
        check_match(fh);
        return 0;
    }

    // short read: process what was read and tell the caller to stop
    check_match(fh);
    return 1;
}

// Read one line with fgets, strip its newline and print it if it matches
// fh->regex (which must already be compiled).
//
// Returns 0 after a line was processed and 1 when fgets returned NULL (end
// of file or read error).
//
// NOTE(review): fgets stores at most BUFFER_SIZE - 1 characters, so a longer
// line arrives in several pieces and each piece is counted as its own line.
int
read_line(File_Handler *fh)
{
    if (fgets(fh->buffer, BUFFER_SIZE, fh->file) == NULL)
        return 1;

    // cut the string at the newline (no-op if the piece has none)
    fh->buffer[strcspn(fh->buffer, "\n")] = '\0';

// NOTE/FIX: match the line directly instead of re-tokenising it -- the
// tokeniser in check_match yields nothing for an empty string, so blank
// lines were never counted and later line numbers were too small
    fh->num_line++;

    // regexec returns 0 on a match
    if (regexec(&fh->regex, fh->buffer, 0, NULL, 0) == 0) {
        printf("[%d] %s\n", fh->num_line, fh->buffer);
        fh->match_count += 1;
    }

    return 0;
}

// Thread entry point: process the file attached to this thread's
// File_Handler one line at a time until read_line reports that it is done.
//
// arg -- the File_Handler handed to pthread_create for this thread
//
// Always returns a null pointer.
void *
func(void *arg)
{
#if 0
    File_Handler *fh = (File_Handler *) arg;
#else
    File_Handler *fh = arg;
#endif

    int done;

    do {
#if 0
// NOTE/BUG: doing this just masks the problem that only one File_Handler is
// used by all threads
        pthread_mutex_lock(&fh->mutex);
#endif

#if 0
// NOTE/BUG: read_fragment hangs
        done = read_fragment(fh);
#else
        done = read_line(fh);
#endif

#if 0
        pthread_mutex_unlock(&fh->mutex);
#endif
    } while (!done);

#if 1
    return NULL;
#endif
}

int
main(int argc, char *argv[])
{
#if 0
// NOTE/BUG: we need one of these for each thread
    File_Handler fh;
    init_file_handler(&fh, argc, argv);
#else
    File_Handler *fh;
    File_Handler fharray[NUM_THREADS];
#endif

    pthread_t thread[NUM_THREADS];

#if 0
// NOTE/BUG: this is unused
    int t_id[NUM_THREADS];
#endif

    for (int i = 0; i < NUM_THREADS; i++) {
#if 0
        pthread_create(&thread[i], NULL, func, (void *) &fh);
#else
        fh = &fharray[i];
        init_file_handler(fh, argc, argv);
        pthread_create(&thread[i], NULL, func, fh);
#endif
    }

    for (int i = 0; i < NUM_THREADS; i++)
        pthread_join(thread[i], NULL);

    for (int i = 0; i < NUM_THREADS; i++) {
        fh = &fharray[i];
        regfree(&fh->regex);
        printf("%d: %d matches\n",i,fh->match_count);
    }

#if 0
    pthread_mutex_destroy(&fh.mutex);
#endif

    return 0;
}

在上面的代码中,我使用了

cpp
条件来表示旧代码与新代码:

#if 0
// old code
#else
// new code
#endif

#if 1
// new code
#endif

注意:可以对该文件运行 unifdef -k 来清理这些条件编译块。


先用 fread 读取、再回退到换行边界的想法是合理的。

但是,只调用一次 open,然后使用 mmap 映射整个文件,会好得多。

有关背景,请参阅我的答案:

  1. 以最有效的方式逐行阅读平台特定
  2. 在 c 中将文件中的行向后复制
  3. 与子进程共享内存排序
  4. mmap是如何提高文件读取速度的?

在每个线程的设置过程中,块是文件大小除以线程数。并且,每个线程的起始偏移量类似。

这里是一些用于获取段偏移量和长度的伪代码:

// Pseudo-code: split the mapped file into NUM_THREADS chunks that each end
// on a line boundary, recording a starting offset and length per thread.

// do a stat to get the file size ...
struct stat st;

// map the entire file
char *bigbuffer = mmap(...);

off_t chunksize;
off_t offset = 0;

for (int i = 0;  i < NUM_THREADS;  ++i) {
    fh = &fhlist[i];

    // last thread must use the remainder of the file
    if (i == (NUM_THREADS - 1)) {
        chunksize = st.st_size - offset;
        fh->starting_offset = offset;
        fh->chunk_size = chunksize;
        break;
    }

    // get the chunk size
    // NOTE/FIX: the macro is spelled NUM_THREADS (was NUM_Threads)
    chunksize = st.st_size / NUM_THREADS;

    // back up to an integral line boundary
    // NOTE/FIX: stop at the start of _this_ chunk, not the start of the file;
    // otherwise a chunk without a newline would back up into the previous
    // thread's data and chunksize would go negative. If no newline is found
    // the chunk ends up empty and its bytes fall to the next thread.
    char *cp = bigbuffer + offset + chunksize - 1;
    for (;  cp >= bigbuffer + offset;  --cp, --chunksize) {
        if (*cp == '\n')
            break;
    }

    // set starting offset and length for this thread
    fh->starting_offset = offset;
    fh->chunk_size = chunksize;

    // get starting offset for _next_ thread
    offset += chunksize;
}
© www.soinside.com 2019 - 2024. All rights reserved.