所以我必须在 C 中实现一个队列,并使用一个系统来保护它并使其可以通过多个线程访问。 为此,我使用了生产者-消费者逻辑:
#include <stdlib.h>
#include <threads.h>
#include <stdbool.h>
#include "queue.h"
#include <stdio.h>
int metidos=1;
int quitados=1;
// circular array
typedef struct _queue {
int size;
int used;
int first;
void **data;
mtx_t * mutex;
cnd_t * full;
cnd_t * empty;
bool terminado;
} _queue;
void q_terminar(queue q){
printf("Entra en q_terminar\n");
mtx_lock(q->mutex);
q->terminado=true;
mtx_unlock(q->mutex);
cnd_broadcast(q->empty);
}
queue q_create(int size) {
queue q = malloc(sizeof(_queue));
q->size = size;
q->used = 0;
q->first = 0;
q->data = malloc(size * sizeof(void *));
q->mutex = malloc(sizeof (mtx_t));
q->full = malloc(sizeof(cnd_t));
q->empty = malloc(sizeof(cnd_t));
q->terminado=false;
mtx_init(q->mutex, mtx_plain);
cnd_init(q->full);
cnd_init(q->empty);
return q;
}
int q_elements(queue q) {
mtx_lock(q->mutex);
int res= q->used;
mtx_unlock(q->mutex);
return res;
}
int q_insert(queue q, void *elem) {
if(q->terminado==true){
return 1;
}
printf("Entra en insert\n");
mtx_lock(q->mutex);
while(q->used == q->size){
printf("ESperando para insertar\n");
cnd_wait(q->full, q->mutex);
printf("Recibiendo señal para insertar\n");
}
//if(q->size == q->used) return -1;
q->data[(q->first + q->used) % q->size] = elem;
q->used++;
printf("Insertado, este es el elemento %d en ser insertado\n",metidos);
printf("En la cola hay %d elementos\n",q->used);
metidos++;
if(q->used == 1){
cnd_broadcast(q->empty);
printf("Enviando señal para despertar a los que borran\n");
}
mtx_unlock(q->mutex);
return 0;
}
void *q_remove(queue q) {
printf("Entra en remove\n");
void *res;
mtx_lock(q->mutex);
if(q->terminado == true){
mtx_unlock(q->mutex);
return NULL;
}
while(q->used ==0 && q->terminado==false){
printf("Esperando para quitar\n");
cnd_wait(q->empty, q->mutex);
printf("Recibiendo señal para quitar\n");
}
if(q->used == 0) {
mtx_unlock(q->mutex);
return NULL;
}
res = q->data[q->first];
q->first = (q->first + 1) % q->size;
q->used--;
cnd_signal(q->full);
printf("Quitado, este es el elemento %d en ser quitado\n", quitados);
printf("En la cola hay %d elementos\n",q->used);
quitados++;
if(q->used == q->size-1){
cnd_broadcast(q->full);
printf("Enviando señal para despertar a los que insertan\n");
}
mtx_unlock(q->mutex);
return res;
}
void q_destroy(queue q) {
mtx_destroy(q->mutex);
cnd_destroy(q->full);
cnd_destroy(q->empty);
free(q->full);
free(q->empty);
free(q->mutex);
free(q->data);
free(q);
}
现在,在主文件中,我必须在单独的线程中将函数“sum”对 get_entries 的调用分开,所以我创建了一个名为 get_entries_thread 的函数
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <dirent.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <openssl/evp.h>
#include <threads.h>
#include "options.h"
#include "queue.h"
#define MAX_PATH 1024
#define BLOCK_SIZE (10*1024*1024)
#define MAX_LINE_LENGTH (MAX_PATH * 2)
struct file_md5 {
char *file;
unsigned char *hash;
unsigned int hash_size;
};
struct thread_get_entries_args{
int id;
char *dir;
queue q;
};
struct thread_get_entries_info {
thrd_t id;
struct thread_get_entries_args * entries_args;
cnd_t condicion;
};
thrd_t global;
void get_entries(char *dir, queue q);
void print_hash(struct file_md5 *md5) {
for(int i = 0; i < md5->hash_size; i++) {
printf("%02hhx", md5->hash[i]);
}
}
void read_hash_file(char *file, char *dir, queue q) {
FILE *fp;
char line[MAX_LINE_LENGTH];
char *file_name, *hash;
int hash_len;
if((fp = fopen(file, "r")) == NULL) {
printf("Could not open %s : %s\n", file, strerror(errno));
exit(0);
}
while(fgets(line, MAX_LINE_LENGTH, fp) != NULL) {
char *field_break;
struct file_md5 *md5 = malloc(sizeof(struct file_md5));
if((field_break = strstr(line, ": ")) == NULL) {
printf("Malformed md5 file\n");
exit(0);
}
*field_break = '\0';
file_name = line;
hash = field_break + 2;
hash_len = strlen(hash);
md5->file = malloc(strlen(file_name) + strlen(dir) + 2);
sprintf(md5->file, "%s/%s", dir, file_name);
md5->hash = malloc(hash_len / 2);
md5->hash_size = hash_len / 2;
for(int i = 0; i < hash_len; i+=2)
sscanf(hash + i, "%02hhx", &md5->hash[i / 2]);
printf("Se llama a q_insert\n");
q_insert(q, md5);
}
fclose(fp);
}
void sum_file(struct file_md5 *md5) {
EVP_MD_CTX *mdctx;
int nbytes;
FILE *fp;
char *buf;
if((fp = fopen(md5->file, "r")) == NULL) {
printf("Could not open %s\n", md5->file);
return;
}
buf = malloc(BLOCK_SIZE);
const EVP_MD *md = EVP_get_digestbyname("md5");
mdctx = EVP_MD_CTX_create();
EVP_DigestInit_ex(mdctx, md, NULL);
while((nbytes = fread(buf, 1, BLOCK_SIZE, fp)) >0)
EVP_DigestUpdate(mdctx, buf, nbytes);
md5->hash = malloc(EVP_MAX_MD_SIZE);
EVP_DigestFinal_ex(mdctx, md5->hash, &md5->hash_size);
EVP_MD_CTX_destroy(mdctx);
free(buf);
fclose(fp);
}
void recurse(char *entry, void *arg) {
queue q = * (queue *) arg;
struct stat st;
stat(entry, &st);
if(S_ISDIR(st.st_mode))/////
get_entries(entry, q);
}
void add_files(char *entry, void *arg) {
queue q = * (queue *) arg;
struct stat st;
stat(entry, &st);
if(S_ISREG(st.st_mode)) {
printf("Se llama a q_insert\n");
q_insert(q, strdup(entry));
}
}
void walk_dir(char *dir, void (*action)(char *entry, void *arg), void *arg) {
DIR *d;
struct dirent *ent;
char full_path[MAX_PATH];
if((d = opendir(dir)) == NULL) {
printf("Could not open dir %s\n", dir);
return;
}
while((ent = readdir(d)) != NULL) {
if(strcmp(ent->d_name, ".") == 0 || strcmp(ent->d_name, "..") ==0)
continue;
snprintf(full_path, MAX_PATH, "%s/%s", dir, ent->d_name);
action(full_path, arg);
}
closedir(d);
}
void get_entries(char *dir, queue q) {
walk_dir(dir, add_files, &q);
walk_dir(dir, recurse, &q);
}
void check(struct options opt) {
queue in_q;
struct file_md5 *md5_in, md5_file;
in_q = q_create(opt.queue_size);
read_hash_file(opt.file, opt.dir, in_q);
while((md5_in = q_remove(in_q))) {
printf("Se llama a q_remove in en check\n");
md5_file.file = md5_in->file;
sum_file(&md5_file);
if(memcmp(md5_file.hash, md5_in->hash, md5_file.hash_size)!=0) {
printf("File %s doesn't match.\nFound: ", md5_file.file);
print_hash(&md5_file);
printf("\nExpected: ");
print_hash(md5_in);
printf("\n");
}
free(md5_file.hash);
free(md5_in->file);
free(md5_in->hash);
free(md5_in);
}
q_destroy(in_q);
}
int get_entries_cast(void*ptr){
struct thread_get_entries_args * entries_args = ptr;
get_entries(entries_args->dir, entries_args->q);
printf("Llamada a q_terminar\n");
q_terminar(entries_args->q);
return 0;
}
void start_get_entries_thread(char *dir, queue in_q){
struct thread_get_entries_info* thread;
thread = malloc(sizeof(struct thread_get_entries_info));
if(thread == NULL){
printf("Not enough memory available.\n");
exit(1);
}
thread->entries_args = malloc(sizeof(struct thread_get_entries_args));
thread->entries_args->dir=dir;
thread->entries_args->q=in_q;
thread->entries_args->id=0;
if(0!= thrd_create(&thread->id, get_entries_cast, thread->entries_args)){
printf("FALLO AL CREAR\n");
}
global = thread->id;
}
void sum(struct options opt) {
queue in_q, out_q;
char *ent;
FILE *out;
struct file_md5 *md5;
int dirname_len;
in_q = q_create(opt.queue_size);
out_q = q_create(opt.queue_size);
start_get_entries_thread(opt.dir, in_q); //Use thread here instead of calling get_entries
printf("Va a entrar en remove\n");
while((ent = q_remove(in_q)) != NULL) {
md5 = malloc(sizeof(struct file_md5));
md5->file = ent;
sum_file(md5);
printf("Se llama a q_insert\n");
q_insert(out_q, md5);
}
printf("Llamada a q_terminar\n");
q_terminar(out_q);
if((out = fopen(opt.file, "w")) == NULL) {
printf("Could not open output file\n");
exit(0);
}
dirname_len = strlen(opt.dir) + 1; // length of dir + /
while((md5 = q_remove(out_q)) != NULL) {
printf("Se llama a q_remove out\n");
fprintf(out, "%s: ", md5->file + dirname_len);
for(int i = 0; i < md5->hash_size; i++)
fprintf(out, "%02hhx", md5->hash[i]);
fprintf(out, "\n");
free(md5->file);
free(md5->hash);
free(md5);
}
//if(thrd_join(thread->id, NULL)){
//printf("FALLO AL UNIR\n");
//}
fclose(out);
q_destroy(in_q);
q_destroy(out_q);
}
int main(int argc, char *argv[]) {
struct options opt;
opt.num_threads = 5;
opt.queue_size = 1;
opt.check = true;
opt.file = NULL;
opt.dir = NULL;
read_options (argc, argv, &opt);
if(opt.check)
check(opt);
else
sum(opt);
}
所以事情是,起初它有效,但随后它在尝试插入时会无限卡住。而且我现在也不知道是否使用 thrd_join。提前致谢。
Queue 代码对同步对象的使用存在几个弱点。其中:
q_terminar()
只唤醒等待q->empty
CV 的线程,而不是那些等待q->full
CV 的线程。也许您预计只有在队列为空时才会调用此函数,但是将该函数额外广播到 q->full
. 会更安全且成本更低
q_insert()
读取 q->terminado
而无需首先获取队列的互斥量。这通常会造成数据竞争。除了条件变量和互斥锁本身之外,队列结构的任何成员都不应被任何线程访问,而无需锁定互斥锁,或依赖其他一些有效的同步机制。
q->insert()
仅在函数入口处检查一次 q->terminado
。每次从 CV 等待中醒来时,它都应该再次检查,如果发现该成员是真实的,则采取适当的行动。
q_insert()
仅在插入项目后队列大小恰好为 1 时才向 q->empty
CV 广播。这 probably 足够了,但我建议无条件地执行该广播,因为这样更容易推理并确信它在所有情况下都是正确的。
在
q_remove()
中,我建议在if(q->terminado == true)
之后移动while
块并移除if (q->used == 0)
块(控制可以用q->used == 0
到达那个点的唯一方法是如果q->termindo == true
)。目前的代码在这方面没有错误,但多余。如果你愿意,在assert(q->used == 0)
块中添加一个if(q->terminado == true)
(我愿意)。
q_remove()
同时向q->full
CV发出信号并向其广播是多余的。广播可能是要保留的那个。但是,最好无条件地执行该广播,而不是仅在项目删除后的队列大小恰好为q->size - 1
.
另外,
sum()
在没有先加入第二个线程的情况下拆除队列。仅当没有线程将其锁定或在销毁后可以尝试锁定它时,销毁互斥锁才是安全的。销毁 CV 只有在没有线程在等待它,或者在销毁后可以尝试等待、发信号或广播给它时才是安全的。在确认其他线程已终止之前,我不明白您如何确信您的队列销毁在这些方面是安全的,并且加入该线程似乎是获得此类确认的最佳方式。
作为更一般的经验法则,您应该加入或分离除初始线程之外的每个线程。
正如@StephanSchlecht 在评论中观察到的那样,初始线程将项目排入
out_q
,但没有其他线程将这些项目出列。初始线程仅在从 in_q
中出列所有项目后才尝试将它们出列。因此,如果项目多于 out_q
一次可以容纳的数量,那么初始线程最终将填满该队列并阻止尝试添加另一个项目。在那之后,它不会从 in_q
中出列任何项目,可能会导致另一个线程将该队列填满并阻塞。可能的解决方案包括:
out_q
,或者