我的程序模拟 grep 命令的行为。也就是说,当执行
./main grep <pattern> <file.txt>
时,它将使用缓冲区逐行搜索并写入找到匹配项的行号以及该行的内容。该程序在没有线程的情况下运行正常,但在有线程的情况下执行时,它会写入在无限循环中找到的匹配项,并且线程计数器会无限增长。
只需要用
gcc -o main main.c
编译程序,然后用./main grep <pattern> <file.txt>
执行,其中pattern就是你要查找的单词或者其中的一部分,看它出现在第几行和第几行,以及文件可以是任何包含信息的.txt文件,例如sample file.
#include "regex.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#define BUFFER_SIZE 100
#define NUM_THREADS 5
typedef struct {
char *pattern;
char *filename;
} Parameter_Handler;
typedef struct {
char buffer[BUFFER_SIZE];
int current_offset;
FILE *file;
int num_line;
int char_count;
regex_t regex;
Parameter_Handler ph;
pthread_mutex_t mutex;
int stop;
} File_Handler;
typedef struct {
int id;
File_Handler *fh;
} Thread_manager;
void init_parameter_handler(Parameter_Handler *ph, char *args[], int n) {
if (n < 4) {
printf("Parameters number is inconsistent");
exit(1);
}
if (strcmp(args[1], "grep")) {
printf("Invalid command %s", args[1]);
exit(1);
}
ph->pattern = args[2];
ph->filename = args[3];
}
void init_file_handler(File_Handler *fh, int argc, char *argv[]) {
fh->current_offset = fh->num_line = fh->char_count = fh->stop = 0;
fh->ph = (Parameter_Handler){"", ""};
init_parameter_handler(&fh->ph, argv, argc);
pthread_mutex_init(&fh->mutex, NULL);
fh->file = fopen(fh->ph.filename, "r");
if (fh->file == NULL) {
printf("Could not open the file\n");
exit(1);
}
}
void check_match(File_Handler *fh) {
int ret = regcomp(&fh->regex, fh->ph.pattern, REG_EXTENDED);
if (ret != 0) {
printf("Failed to compile regex.\n");
exit(1);
}
char *token = strtok(fh->buffer, "\n");
while (token != NULL) {
fh->num_line++;
ret = regexec(&fh->regex, token, 0, NULL, 0);
if (ret == 0)
printf("[%d] %s\n", fh->num_line, token);
token = strtok(NULL, "\n");
}
regfree(&fh->regex);
}
void restore_offset(File_Handler *fh, int offset) {
do {
offset--;
} while (fh->buffer[offset - 1] != '\n');
fh->current_offset += offset;
strrchr(fh->buffer, '\n')[1] = '\0';
fseek(fh->file, fh->current_offset, SEEK_SET);
}
int read_fragment(File_Handler *fh) {
memset(fh->buffer, 0, sizeof(fh->buffer));
int offset = fread(fh->buffer, sizeof(char), BUFFER_SIZE, fh->file);
if (offset == BUFFER_SIZE) {
if (fh->buffer[offset - 1] != '\n') {
restore_offset(fh, offset);
} else
fh->current_offset += offset;
check_match(fh);
return 0;
} else {
check_match(fh);
return 1;
}
}
void *func(void *arg) {
File_Handler *fh = (File_Handler *)arg;
int status;
while (1) {
pthread_mutex_lock(&fh->mutex);
status = read_fragment(fh);
pthread_mutex_unlock(&fh->mutex);
if (status) {
break;
}
}
}
int main(int argc, char *argv[]) {
File_Handler fh;
init_file_handler(&fh, argc, argv);
pthread_t thread[NUM_THREADS];
int t_id[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++) {
pthread_create(&thread[i], NULL, func, (void *)&fh);
}
for (int i = 0; i < NUM_THREADS; i++)
pthread_join(thread[i], NULL);
pthread_mutex_destroy(&fh.mutex);
return 0;
}
我想让线程访问File_Handler,每个线程处理文件的一部分,当文件完成时,所有线程一起生成并显示结果。
一些问题...
File_Handler
实例,而不是 each 线程。File_Handler
结构数组。对于数组,需要 no 互斥锁。strtok
not 线程安全而不是 strtok_r
fread
中对 read_fragment
的使用是半损坏的。最好使用 fgets
[最初]。使用fgets
,无需使用strtok
。下面有更多内容。regcomp
,而不是每个线程只调用一次。File_Handler
的设置方式,all 线程读取all 行。这是重复的努力。read_fragment
和 restore_offset
来缓解这种情况。 [我认为] 你试图让每个线程扫描文件的一个独特片段。这个概念很好,但实现不正确。再次,见下文。这里是 [first pass of] 更正后的代码。它用错误和修复注释:
#include "regex.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#define BUFFER_SIZE 100
#ifndef NUM_THREADS
#define NUM_THREADS 5
#endif
typedef struct {
char *pattern;
char *filename;
} Parameter_Handler;
typedef struct {
char buffer[BUFFER_SIZE];
int current_offset;
FILE *file;
int num_line;
int char_count;
regex_t regex;
Parameter_Handler ph;
pthread_mutex_t mutex;
int stop;
#if 1
int match_count;
#endif
} File_Handler;
typedef struct {
int id;
File_Handler *fh;
} Thread_manager;
#if DEBUG
#define dbgprt(_fmt...) \
printf(_fmt)
#else
#define dbgprt(_fmt...) \
do { } while (0)
#endif
void
init_parameter_handler(Parameter_Handler *ph, char *args[], int n)
{
if (n < 4) {
printf("Parameters number is inconsistent");
exit(1);
}
if (strcmp(args[1], "grep")) {
printf("Invalid command %s", args[1]);
exit(1);
}
ph->pattern = args[2];
ph->filename = args[3];
}
void
init_file_handler(File_Handler *fh, int argc, char *argv[])
{
fh->current_offset = fh->num_line = fh->char_count = fh->stop = 0;
#if 0
// NOTE/BUG: this is extraneous -- all values replaced by init_parameter_handler
fh->ph = (Parameter_Handler) {
"", ""};
#endif
init_parameter_handler(&fh->ph, argv, argc);
#if 0
// NOTE/BUG: not needed with array of File_Handler
pthread_mutex_init(&fh->mutex, NULL);
#endif
fh->file = fopen(fh->ph.filename, "r");
if (fh->file == NULL) {
printf("Could not open the file\n");
exit(1);
}
#if 1
// NOTE/FIX -- this only needs to be done once per thread
int ret = regcomp(&fh->regex, fh->ph.pattern, REG_EXTENDED);
if (ret != 0) {
printf("Failed to compile regex.\n");
exit(1);
}
#endif
}
void
check_match(File_Handler *fh)
{
#if 0
// NOTE/BUG: this is expensive -- it should be moved to init_file_handler
// function
int ret = regcomp(&fh->regex, fh->ph.pattern, REG_EXTENDED);
if (ret != 0) {
printf("Failed to compile regex.\n");
exit(1);
}
#endif
#if 0
// NOTE/BUG: strtok is _not_ thread-safe -- use strtok_r
char *token = strtok(fh->buffer, "\n");
#else
char *save;
char *token = strtok_r(fh->buffer, "\n", &save);
#endif
while (token != NULL) {
fh->num_line++;
int ret = regexec(&fh->regex, token, 0, NULL, 0);
if (ret == 0) {
printf("[%d] %s\n", fh->num_line, token);
fh->match_count += 1;
}
#if 0
token = strtok(NULL, "\n");
#else
token = strtok_r(NULL, "\n", &save);
#endif
}
#if 0
regfree(&fh->regex);
#endif
}
void
restore_offset(File_Handler *fh, int offset)
{
do {
offset--;
} while (fh->buffer[offset - 1] != '\n');
fh->current_offset += offset;
strrchr(fh->buffer, '\n')[1] = '\0';
fseek(fh->file, fh->current_offset, SEEK_SET);
}
int
read_fragment(File_Handler *fh)
{
memset(fh->buffer, 0, sizeof(fh->buffer));
int offset = fread(fh->buffer, sizeof(char), BUFFER_SIZE, fh->file);
if (offset == BUFFER_SIZE) {
if (fh->buffer[offset - 1] != '\n') {
restore_offset(fh, offset);
}
else
fh->current_offset += offset;
check_match(fh);
return 0;
}
else {
check_match(fh);
return 1;
}
}
int
read_line(File_Handler *fh)
{
int eof = 0;
do {
char *cp = fgets(fh->buffer, BUFFER_SIZE, fh->file);
eof = (cp == NULL);
if (eof)
break;
cp = strchr(fh->buffer,'\n');
if (cp != NULL)
*cp = 0;
check_match(fh);
} while (0);
return eof;
}
void *
func(void *arg)
{
#if 0
File_Handler *fh = (File_Handler *) arg;
#else
File_Handler *fh = arg;
#endif
int status;
while (1) {
#if 0
// NOTE/BUG: doing this just masks the problem that only one File_Handler is
// used by all threads
pthread_mutex_lock(&fh->mutex);
#endif
#if 0
// NOTE/BUG: read_fragment hangs
status = read_fragment(fh);
#else
status = read_line(fh);
#endif
#if 0
pthread_mutex_unlock(&fh->mutex);
#endif
if (status) {
break;
}
}
#if 1
return (void *) 0;
#endif
}
int
main(int argc, char *argv[])
{
#if 0
// NOTE/BUG: we need one of these for each thread
File_Handler fh;
init_file_handler(&fh, argc, argv);
#else
File_Handler *fh;
File_Handler fharray[NUM_THREADS];
#endif
pthread_t thread[NUM_THREADS];
#if 0
// NOTE/BUG: this is unused
int t_id[NUM_THREADS];
#endif
for (int i = 0; i < NUM_THREADS; i++) {
#if 0
pthread_create(&thread[i], NULL, func, (void *) &fh);
#else
fh = &fharray[i];
init_file_handler(fh, argc, argv);
pthread_create(&thread[i], NULL, func, fh);
#endif
}
for (int i = 0; i < NUM_THREADS; i++)
pthread_join(thread[i], NULL);
for (int i = 0; i < NUM_THREADS; i++) {
fh = &fharray[i];
regfree(&fh->regex);
printf("%d: %d matches\n",i,fh->match_count);
}
#if 0
pthread_mutex_destroy(&fh.mutex);
#endif
return 0;
}
在上面的代码中,我使用了
cpp
条件来表示旧代码与新代码:
#if 0
// old code
#else
// new code
#endif
#if 1
// new code
#endif
注意:这可以通过运行文件来清理
unifdef -k
做
fread
然后备份找到换行边界的想法是一个合理的想法。
但是,使用
open
一次会好很多。然后,使用mmap
映射整个文件。
有关背景,请参阅我的答案:
在每个线程的设置过程中,块是文件大小除以线程数。并且,每个线程的起始偏移量类似。
这里是一些用于获取段偏移量和长度的伪代码:
// do a stat to get the file size ...
struct stat st;
// map the entire file
char *bigbuffer = mmap(...);
off_t chunksize;
off_t offset = 0;
for (int i = 0; i < NUM_THREADS; ++i) {
fh = &fhlist[i];
// last thread must use the remainder of the file
if (i == (NUM_THREADS - 1)) {
chunksize = st.st_size - offset;
fh->starting_offset = offset;
fh->chunk_size = chunksize;
break;
}
// get the chunk size
chunksize = st.st_size / NUM_Threads;
// back up integral line boundary
char *cp = bigbuffer + offset + chunksize - 1;
for (; cp >= bigbuffer; --cp, --chunksize) {
if (*cp == '\n')
break;
}
// set starting offset and length for this thread
fh->starting_offset = offset;
fh->chunk_size = chunksize;
// get starting offset for _next_ thread
offset += chunksize;
}