我应在此处使用哪个同步原语?

问题描述 投票:0回答:1
while(1) {
     char message_buffer[SIZE];
     ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);

     if(message_len == -1) { /* error handling... */}

     pthread_t pt1;
     int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
     if(ret) { /* error handling ... */}
}

void * handle_message (void * message) {
    puts((char *) message);
    return NULL;
}

上面的示例不是MRE,但是非常简单:

我有一个带有循环的主线程,该线程不断消耗消息队列中的消息。一旦收到新消息,它将存储在本地message_buffer缓冲区中。然后,产生一个新线程来“照顾”所述新消息,因此消息缓冲区的地址被传递到handle_message中,新线程随后执行该操作。


问题

通常,即使我可以100%地确定队列中的消息不相同,两个线程也会打印相同的消息。


我不确定,但我想我理解为什么会这样:

说我将2条不同的消息发送到队列,然后才开始使用它们。

while循环的第一次迭代中,消息将从队列中使用并保存到message_buffer。将产生一个新线程,并将message_length的地址传递给它。但是该线程可能不够快,无法将缓冲区的内容打印到流before上(在循环的下一次迭代中)使用下一条消息,并且随后覆盖message_buffer的内容。因此,第一个线程和第二个线程现在输出相同的值。


我的问题是:最有效的解决方法是什么?我对并行编程和线程/ pthreads相当陌生,并且对不同的同步原语不知所措。

Mutex麻烦

static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

while(1) {
     char message_buffer[SIZE];
     pthread_mutex_lock(&m);
     ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);
     pthred_mutex_unlock(&m);

     if(message_len == -1) { /* error handling... */}

     pthread_t pt1;
     int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
     if(ret) { /* error handling ... */}
}

void * handle_message (void * message) {
    char own_buffer[SIZE];
    pthread_mutex_lock(&m);
    strncpy(own_buffer, (char *) message, SIZE);
    pthread_mutex_unlock(&m);
    puts(own_buffer);
    return NULL;
}

我认为我当前的互斥锁实现不正确,因为线程仍在接收重复的消息。主线程可以锁定互斥锁,将消息消耗到缓冲区中,解锁互斥锁,生成线程,但是该线程可能仍挂起,而主线程可以再次重写缓冲区(因为缓冲区互斥锁从未被新锁住)线程),有效地使当前的互斥体实现无效?我该如何克服?

c multithreading pthreads race-condition
1个回答
1
投票

问题是,在确保线程已使用该内存结束之前,先结束包含message_buffer的循环。

while (1) {
    char message_buffer[SIZE];
    ssize_t message_length = mq_receive(...);
    if (message_len == -1) { /* error handling */ }

    pthread_t pt1;
    int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
    if (ret) { /* error handling */ }

    /****** Can't go beyond here until thread is done with message_buffer. ******/
}

void * handle_message (void * message) {
    char own_buffer[SIZE];
    strncpy(own_buffer, (char *) message, SIZE);

    /******* Only now can the caller loop back. ******/

    puts(own_buffer);
    return NULL;
}

您可以使用信号灯或类似的信号。

static pthread_mutex_t mutex  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond   = PTHREAD_COND_INITIALIZER;
static int             copied = 0;

while (1) {
    char message_buffer[SIZE];
    ssize_t message_length = mq_receive(...);
    if (message_len == -1) { /* error handling */ }

    pthread_t pt1;
    int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
    if (ret) { /* error handling */ }

    // Wait until threads is done with message_buffer.
    pthread_mutex_lock(&mutex);
    while (!copied) pthread_cond_wait(&cond, &mutex);
    copied = 0;
    pthread_mutex_unlock(&mutex);
}

void * handle_message (void * message) {
    char own_buffer[SIZE];
    strncpy(own_buffer, (char *) message, SIZE);

    // Done with caller's buffer.
    // Signal caller to continue.
    pthread_mutex_lock(&mutex);
    copied = 1;
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);

    puts(own_buffer);
    return NULL;
}

(添加的块有效地执行了信号量操作。有关更通用的实现,请参见this answer的最后一个片段。)

但是有一个更简单的解决方案:在创建线程之前进行复制。

while (1) {
    char message_buffer[SIZE];
    ssize_t message_length = mq_receive(...);
    if (message_len == -1) { /* error handling */ }

    pthread_t pt1;
    int ret = pthread_create(&pt1, NULL, handle_message, strdup(message_buffer));
    if (ret) { /* error handling */ }
}

void * handle_message (void * message) {
    char * own_buffer = message;
    puts(own_buffer);
    free(own_buffer);
    return NULL;
}
© www.soinside.com 2019 - 2024. All rights reserved.