while(1) {
char message_buffer[SIZE];
ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);
if(message_len == -1) { /* error handling... */}
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if(ret) { /* error handling ... */}
}
void * handle_message (void * message) {
puts((char *) message);
return NULL;
}
上面的示例不是MRE,但是非常简单:
我有一个带有循环的主线程,该线程不断消耗消息队列中的消息。一旦收到新消息,它将存储在本地message_buffer
缓冲区中。然后,产生一个新线程来“照顾”所述新消息,因此消息缓冲区的地址被传递到handle_message
中,新线程随后执行该操作。
通常,即使我可以100%地确定队列中的消息不相同,两个线程也会打印相同的消息。
我不确定,但我想我理解为什么会这样:
说我将2条不同的消息发送到队列,然后才开始使用它们。
在while
循环的第一次迭代中,消息将从队列中使用并保存到message_buffer
。将产生一个新线程,并将message_length
的地址传递给它。但是该线程可能不够快,无法将缓冲区的内容打印到流before上(在循环的下一次迭代中)使用下一条消息,并且随后覆盖message_buffer
的内容。因此,第一个线程和第二个线程现在输出相同的值。
我的问题是:最有效的解决方法是什么?我对并行编程和线程/ pthreads相当陌生,并且对不同的同步原语不知所措。
static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
while(1) {
char message_buffer[SIZE];
pthread_mutex_lock(&m);
ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);
pthred_mutex_unlock(&m);
if(message_len == -1) { /* error handling... */}
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if(ret) { /* error handling ... */}
}
void * handle_message (void * message) {
char own_buffer[SIZE];
pthread_mutex_lock(&m);
strncpy(own_buffer, (char *) message, SIZE);
pthread_mutex_unlock(&m);
puts(own_buffer);
return NULL;
}
我认为我当前的互斥锁实现不正确,因为线程仍在接收重复的消息。主线程可以锁定互斥锁,将消息消耗到缓冲区中,解锁互斥锁,生成线程,但是该线程可能仍挂起,而主线程可以再次重写缓冲区(因为缓冲区互斥锁从未被新锁住)线程),有效地使当前的互斥体实现无效?我该如何克服?
问题是,在确保线程已使用该内存结束之前,先结束包含message_buffer
的循环。
while (1) {
char message_buffer[SIZE];
ssize_t message_length = mq_receive(...);
if (message_len == -1) { /* error handling */ }
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if (ret) { /* error handling */ }
/****** Can't go beyond here until thread is done with message_buffer. ******/
}
void * handle_message (void * message) {
char own_buffer[SIZE];
strncpy(own_buffer, (char *) message, SIZE);
/******* Only now can the caller loop back. ******/
puts(own_buffer);
return NULL;
}
您可以使用信号灯或类似的信号。
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int copied = 0;
while (1) {
char message_buffer[SIZE];
ssize_t message_length = mq_receive(...);
if (message_len == -1) { /* error handling */ }
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if (ret) { /* error handling */ }
// Wait until threads is done with message_buffer.
pthread_mutex_lock(&mutex);
while (!copied) pthread_cond_wait(&cond, &mutex);
copied = 0;
pthread_mutex_unlock(&mutex);
}
void * handle_message (void * message) {
char own_buffer[SIZE];
strncpy(own_buffer, (char *) message, SIZE);
// Done with caller's buffer.
// Signal caller to continue.
pthread_mutex_lock(&mutex);
copied = 1;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
puts(own_buffer);
return NULL;
}
(添加的块有效地执行了信号量操作。有关更通用的实现,请参见this answer的最后一个片段。)
但是有一个更简单的解决方案:在创建线程之前进行复制。
while (1) {
char message_buffer[SIZE];
ssize_t message_length = mq_receive(...);
if (message_len == -1) { /* error handling */ }
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, strdup(message_buffer));
if (ret) { /* error handling */ }
}
void * handle_message (void * message) {
char * own_buffer = message;
puts(own_buffer);
free(own_buffer);
return NULL;
}