C 互斥量被无限阻塞

问题描述 投票:0回答:1

所以我必须在 C 中实现一个队列,并使用一个系统来保护它并使其可以通过多个线程访问。 为此,我使用了生产者-消费者逻辑:

#include <stdlib.h>
#include <threads.h>
#include <stdbool.h>
#include "queue.h"
#include <stdio.h>

int metidos=1;
int quitados=1;


// circular array
typedef struct _queue {
    int size;
    int used;
    int first;
    void **data;
    mtx_t * mutex;
    cnd_t * full;
    cnd_t * empty;
    bool terminado;
} _queue;

void q_terminar(queue q){
    printf("Entra en q_terminar\n");
    mtx_lock(q->mutex);
    q->terminado=true;
    mtx_unlock(q->mutex);
    cnd_broadcast(q->empty);
}

queue q_create(int size) {
    queue q = malloc(sizeof(_queue));

    q->size  = size;
    q->used  = 0;
    q->first = 0;
    q->data  = malloc(size * sizeof(void *));
    q->mutex = malloc(sizeof (mtx_t));
    q->full = malloc(sizeof(cnd_t));
    q->empty = malloc(sizeof(cnd_t));
    q->terminado=false;
    mtx_init(q->mutex, mtx_plain);
    cnd_init(q->full);
    cnd_init(q->empty);

    return q;
}

int q_elements(queue q) {
    mtx_lock(q->mutex);
    int res= q->used;
    mtx_unlock(q->mutex);
    return res;
}

int q_insert(queue q, void *elem) {
    if(q->terminado==true){
        return 1;
    }
    printf("Entra en insert\n");
    mtx_lock(q->mutex);
    while(q->used == q->size){
        printf("ESperando para insertar\n");
        cnd_wait(q->full, q->mutex);
        printf("Recibiendo señal para insertar\n");
    }
    //if(q->size == q->used) return -1;
    q->data[(q->first + q->used) % q->size] = elem;
    q->used++;

    printf("Insertado, este es el elemento %d en ser insertado\n",metidos);
    printf("En la cola hay %d elementos\n",q->used);
    metidos++;
    if(q->used == 1){
        cnd_broadcast(q->empty);
        printf("Enviando señal para despertar a los que borran\n");
    }

    mtx_unlock(q->mutex);

    return 0;
}

void *q_remove(queue q) {
    printf("Entra en remove\n");
    void *res;
    mtx_lock(q->mutex);
    if(q->terminado == true){
        mtx_unlock(q->mutex);
        return NULL;
    }
    while(q->used ==0 && q->terminado==false){
        printf("Esperando para quitar\n");
        cnd_wait(q->empty, q->mutex);
        printf("Recibiendo señal para quitar\n");
    }

    if(q->used == 0) {
        mtx_unlock(q->mutex);
        return NULL;
    }
    res = q->data[q->first];
    q->first = (q->first + 1) % q->size;
    q->used--;
    cnd_signal(q->full);
    printf("Quitado, este es el elemento %d en ser quitado\n", quitados);
    printf("En la cola hay %d elementos\n",q->used);
    quitados++;
    if(q->used == q->size-1){
        cnd_broadcast(q->full);
        printf("Enviando señal para despertar a los que insertan\n");
    }

    mtx_unlock(q->mutex);
    return res;
}

void q_destroy(queue q) {
    mtx_destroy(q->mutex);
    cnd_destroy(q->full);
    cnd_destroy(q->empty);
    free(q->full);
    free(q->empty);
    free(q->mutex);
    free(q->data);
    free(q);
}

现在,在主文件中,我必须在单独的线程中将函数“sum”对 get_entries 的调用分开,所以我创建了一个名为 get_entries_thread 的函数

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <dirent.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <openssl/evp.h>
#include <threads.h>

#include "options.h"
#include "queue.h"


#define MAX_PATH 1024
#define BLOCK_SIZE (10*1024*1024)
#define MAX_LINE_LENGTH (MAX_PATH * 2)


struct file_md5 {
    char *file;
    unsigned char *hash;
    unsigned int hash_size;
};

struct thread_get_entries_args{
    int id;
    char *dir;
    queue q;
};

struct thread_get_entries_info {
    thrd_t id;
    struct thread_get_entries_args * entries_args;
    cnd_t condicion;
};

thrd_t global;


void get_entries(char *dir, queue q);


void print_hash(struct file_md5 *md5) {
    for(int i = 0; i < md5->hash_size; i++) {
        printf("%02hhx", md5->hash[i]);
    }
}


void read_hash_file(char *file, char *dir, queue q) {
    FILE *fp;
    char line[MAX_LINE_LENGTH];
    char *file_name, *hash;
    int hash_len;

    if((fp = fopen(file, "r")) == NULL) {
        printf("Could not open %s : %s\n", file, strerror(errno));
        exit(0);
    }

    while(fgets(line, MAX_LINE_LENGTH, fp) != NULL) {
        char *field_break;
        struct file_md5 *md5 = malloc(sizeof(struct file_md5));

        if((field_break = strstr(line, ": ")) == NULL) {
            printf("Malformed md5 file\n");
            exit(0);
        }
        *field_break = '\0';

        file_name = line;
        hash      = field_break + 2;
        hash_len  = strlen(hash);

        md5->file      = malloc(strlen(file_name) + strlen(dir) + 2);
        sprintf(md5->file, "%s/%s", dir, file_name);
        md5->hash      = malloc(hash_len / 2);
        md5->hash_size = hash_len / 2;


        for(int i = 0; i < hash_len; i+=2)
            sscanf(hash + i, "%02hhx", &md5->hash[i / 2]);
        printf("Se llama a q_insert\n");
        q_insert(q, md5);
    }

    fclose(fp);
}


void sum_file(struct file_md5 *md5) {
    EVP_MD_CTX *mdctx;
    int nbytes;
    FILE *fp;
    char *buf;

    if((fp = fopen(md5->file, "r")) == NULL) {
        printf("Could not open %s\n", md5->file);
        return;
    }

    buf = malloc(BLOCK_SIZE);
    const EVP_MD *md = EVP_get_digestbyname("md5");

    mdctx = EVP_MD_CTX_create();
    EVP_DigestInit_ex(mdctx, md, NULL);

    while((nbytes = fread(buf, 1, BLOCK_SIZE, fp)) >0)
        EVP_DigestUpdate(mdctx, buf, nbytes);

    md5->hash = malloc(EVP_MAX_MD_SIZE);
    EVP_DigestFinal_ex(mdctx, md5->hash, &md5->hash_size);

    EVP_MD_CTX_destroy(mdctx);
    free(buf);
    fclose(fp);
}


void recurse(char *entry, void *arg) {
    queue q = * (queue *) arg;
    struct stat st;

    stat(entry, &st);

    if(S_ISDIR(st.st_mode))/////

        get_entries(entry, q);
}


void add_files(char *entry, void *arg) {
    queue q = * (queue *) arg;
    struct stat st;

    stat(entry, &st);

    if(S_ISREG(st.st_mode)) {
        printf("Se llama a q_insert\n");
        q_insert(q, strdup(entry));

    }
}


void walk_dir(char *dir, void (*action)(char *entry, void *arg), void *arg) {
    DIR *d;
    struct dirent *ent;
    char full_path[MAX_PATH];

    if((d = opendir(dir)) == NULL) {
        printf("Could not open dir %s\n", dir);
        return;
    }

    while((ent = readdir(d)) != NULL) {
        if(strcmp(ent->d_name, ".") == 0 || strcmp(ent->d_name, "..") ==0)
            continue;

        snprintf(full_path, MAX_PATH, "%s/%s", dir, ent->d_name);

        action(full_path, arg);
    }

    closedir(d);
}


void get_entries(char *dir, queue q) {
    walk_dir(dir, add_files, &q);
    walk_dir(dir, recurse, &q);
}


void check(struct options opt) {
    queue in_q;
    struct file_md5 *md5_in, md5_file;

    in_q  = q_create(opt.queue_size);

    read_hash_file(opt.file, opt.dir, in_q);

    while((md5_in = q_remove(in_q))) {
        printf("Se llama a q_remove in en check\n");
        md5_file.file = md5_in->file;

        sum_file(&md5_file);

        if(memcmp(md5_file.hash, md5_in->hash, md5_file.hash_size)!=0) {
            printf("File %s doesn't match.\nFound:    ", md5_file.file);
            print_hash(&md5_file);
            printf("\nExpected: ");
            print_hash(md5_in);
            printf("\n");
        }

        free(md5_file.hash);

        free(md5_in->file);
        free(md5_in->hash);
        free(md5_in);
    }

    q_destroy(in_q);
}

int get_entries_cast(void*ptr){
    struct thread_get_entries_args * entries_args = ptr;
    get_entries(entries_args->dir, entries_args->q);
    printf("Llamada a q_terminar\n");
    q_terminar(entries_args->q);
    return 0;
}

void start_get_entries_thread(char *dir, queue in_q){
    struct thread_get_entries_info* thread;
    thread = malloc(sizeof(struct thread_get_entries_info));
    if(thread == NULL){
        printf("Not enough memory available.\n");
        exit(1);
    }

    thread->entries_args = malloc(sizeof(struct thread_get_entries_args));
    thread->entries_args->dir=dir;
    thread->entries_args->q=in_q;
    thread->entries_args->id=0;
    if(0!= thrd_create(&thread->id, get_entries_cast, thread->entries_args)){
        printf("FALLO AL CREAR\n");
    }
    global = thread->id;
}


void sum(struct options opt) {
    queue in_q, out_q;
    char *ent;
    FILE *out;
    struct file_md5 *md5;
    int dirname_len;


    in_q  = q_create(opt.queue_size);
    out_q = q_create(opt.queue_size);
    start_get_entries_thread(opt.dir, in_q); //Use thread here instead of calling get_entries
    printf("Va a entrar en remove\n");
    while((ent = q_remove(in_q)) != NULL) {
        md5 = malloc(sizeof(struct file_md5));

        md5->file = ent;
        sum_file(md5);
        printf("Se llama a q_insert\n");
        q_insert(out_q, md5);
    }
    printf("Llamada a q_terminar\n");
    q_terminar(out_q);

    if((out = fopen(opt.file, "w")) == NULL) {
        printf("Could not open output file\n");
        exit(0);
    }

    dirname_len = strlen(opt.dir) + 1; // length of dir + /

    while((md5 = q_remove(out_q)) != NULL) {
        printf("Se llama a q_remove out\n");
        fprintf(out, "%s: ", md5->file + dirname_len);

        for(int i = 0; i < md5->hash_size; i++)
            fprintf(out, "%02hhx", md5->hash[i]);
        fprintf(out, "\n");

        free(md5->file);
        free(md5->hash);
        free(md5);
    }
    //if(thrd_join(thread->id, NULL)){
      //printf("FALLO AL UNIR\n");
    //}
    fclose(out);
    q_destroy(in_q);
    q_destroy(out_q);
}


int main(int argc, char *argv[]) {
    struct options opt;

    opt.num_threads = 5;
    opt.queue_size  = 1;
    opt.check       = true;
    opt.file        = NULL;
    opt.dir         = NULL;

    read_options (argc, argv, &opt);

    if(opt.check)
        check(opt);
    else
        sum(opt);

}

所以事情是,起初它有效,但随后它在尝试插入时会无限卡住。而且我现在也不知道是否使用 thrd_join。提前致谢。

c multithreading mutex wait blocked
1个回答
1
投票

Queue 代码对同步对象的使用存在几个弱点。其中:

  • q_terminar()
    只唤醒等待
    q->empty
    CV 的线程,而不是那些等待
    q->full
    CV 的线程。也许您预计只有在队列为空时才会调用此函数,但是将该函数额外广播到
    q->full
    .

    会更安全且成本更低
  • q_insert()
    读取
    q->terminado
    而无需首先获取队列的互斥量。这通常会造成数据竞争。除了条件变量和互斥锁本身之外,队列结构的任何成员都不应被任何线程访问,而无需锁定互斥锁,或依赖其他一些有效的同步机制。

  • q->insert()
    仅在函数入口处检查一次
    q->terminado
    。每次从 CV 等待中醒来时,它都应该再次检查,如果发现该成员是真实的,则采取适当的行动。

  • q_insert()
    仅在插入项目后队列大小恰好为 1 时才向
    q->empty
    CV 广播。这 probably 足够了,但我建议无条件地执行该广播,因为这样更容易推理并确信它在所有情况下都是正确的。

  • q_remove()
    中,我建议在
    if(q->terminado == true)
    之后移动
    while
    块并移除
    if (q->used == 0)
    块(控制可以用
    q->used == 0
    到达那个点的唯一方法是如果
    q->termindo == true
    )。目前的代码在这方面没有错误,但多余。如果你愿意,在
    assert(q->used == 0)
    块中添加一个
    if(q->terminado == true)
    (我愿意)。

  • q_remove()
    同时向
    q->full
    CV发出信号并向其广播是多余的。广播可能是要保留的那个。但是,最好无条件地执行该广播,而不是仅在项目删除后的队列大小恰好为
    q->size - 1
    .

另外,

  • sum()
    在没有先加入第二个线程的情况下拆除队列。仅当没有线程将其锁定或在销毁后可以尝试锁定它时,销毁互斥锁才是安全的。销毁 CV 只有在没有线程在等待它,或者在销毁后可以尝试等待、发信号或广播给它时才是安全的。在确认其他线程已终止之前,我不明白您如何确信您的队列销毁在这些方面是安全的,并且加入该线程似乎是获得此类确认的最佳方式。

  • 作为更一般的经验法则,您应该加入或分离除初始线程之外的每个线程。

  • 正如@StephanSchlecht 在评论中观察到的那样,初始线程将项目排入

    out_q
    ,但没有其他线程将这些项目出列。初始线程仅在从
    in_q
    中出列所有项目后才尝试将它们出列。因此,如果项目多于
    out_q
    一次可以容纳的数量,那么初始线程最终将填满该队列并阻止尝试添加另一个项目。在那之后,它不会从
    in_q
    中出列任何项目,可能会导致另一个线程将该队列填满并阻塞。可能的解决方案包括:

    • 另一个线程的任务是与现有的两个同时处理
      out_q
      ,或者
    • 初始线程可以立即处理输出项,而不是将它们排队等待以后处理,或者
    • 初始线程可以为输出项使用不同的数据结构,例如没有固定容量的链表。
© www.soinside.com 2019 - 2024. All rights reserved.