我用C编写了一个多线程生产者 - 消费者程序。我的初始程序不是Minimal / Complete / Verifiable,所以我从头开始重写它,而且bug仍在发生。我发现多个生产者线程正在尝试生成相同的数据(来自一系列数据),我不知道如何阻止它。
我尝试过的一些事情: - 我创建了一个与缓冲区大小相同的bool数组,以确保线程只将数据放入空槽,并从中获取数据,这解决了一个问题 - 我尝试移动输入/输出槽增量到生产者/消费者的开头,同时循环(分别),但它会导致死锁
另一个不相关的问题(明天我很可能能够自己解决,但现在就是这样)是序列中的许多值被跳过,尽管我希望它们都能被生成。
请注意:我已经学习了一个月的C,这也是我在这里发表的第一篇文章。在过去的几年里,我从这个网站学到了很多东西,我知道有时候这里的回答会非常苛刻,所以请善待那些真心想要学习和提高的人。谢谢! :)
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
char * Buffer;
int nBufferSlots = 10;
int nThreads = 2;
pthread_t Producers, Consumers;
sem_t Empty, Full, Mutex;
int inputSlot;
int outputSlot;
char value = 0;
char SlotOccupationBoolArray[10] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
void * producer(void *args)
{
while(1)
{
sleep( rand() % 3);
value++;
sem_wait(&Empty);
sem_wait(&Mutex);
if (SlotOccupationBoolArray[inputSlot] == 0)
{
Buffer[inputSlot] = value;
SlotOccupationBoolArray[inputSlot] = 1;
}
else
{
value--;
}
inputSlot = (inputSlot + 1) % nBufferSlots;
sem_post(&Mutex);
sem_post(&Full);
printf("\n[ ");
for (int i=0;i<10;i++)
{
printf("%d ", Buffer[i]);
}
printf("]\n");
}
}
void * consumer(void *args)
{
while(1)
{
sleep( rand() % 5);
sem_wait(&Full);
sem_wait(&Mutex);
if (SlotOccupationBoolArray[outputSlot] == 1)
{
char ValueConsumed = Buffer[outputSlot];
printf("\nConsumed value: %d\n", ValueConsumed);
Buffer[outputSlot] = 0;
SlotOccupationBoolArray[outputSlot] = 0;
}
outputSlot = (outputSlot + 1) % nBufferSlots;
sem_post(&Mutex);
sem_post(&Empty);
}
}
int main(void)
{
sem_open("/Mutex", O_CREAT, S_IRUSR | S_IWUSR, 1);
sem_open("/Full", O_CREAT, S_IRUSR | S_IWUSR, 0);
Buffer = (char*) malloc(nBufferSlots*sizeof(char));
sem_open("/Empty", O_CREAT, S_IRUSR | S_IWUSR, nBufferSlots);
Producers = (pthread_t) malloc(nThreads*sizeof(pthread_t));
Consumers = (pthread_t) malloc(nThreads*sizeof(pthread_t));
// set charBuffer elements to 0
for (char i=0;i<10;i++)
{
Buffer[i] = 0;
}
for (int i = 0; i < nThreads; i++)
{
pthread_create(&Producers, NULL, producer, NULL);
pthread_create(&Consumers, NULL, consumer, NULL);
}
for(int i=0;i<nThreads;i++){
int err = pthread_create(&Producers,NULL,producer,NULL);
if(err != 0){
printf("Error creating producer %d\n",i+1);
}else{
printf("Successfully created producer %d\n",i+1);
}
}
for(int i=0;i<nThreads;i++){
int err = pthread_create(&Consumers,NULL,consumer,NULL);
if(err != 0){
printf("Error creating consumer %d\n",i+1);
}else{
printf("Successfully created consumer %d\n",i+1);
}
}
pthread_exit(NULL);
}
我打印到输出的所有内容都是具有当前值的缓冲区数组(每次运行生产者线程时)以及已消耗的每个值。
我希望在输出中看到的是这样的:
但是,相反,我看到这样的事情(重复的2/4是问题):
(格式化道歉)
值被跳过和/或重复,因为在producer()
函数中,value++
是在互斥量/信号量之外完成的,值的读取以及value--
都是在内部完成的。
您应该记住的一件事是,线程可以通过操作系统以任意随机的方式进行调度。因此,您应该使用互斥锁/信号量或任何其他机制来保护多个线程之间共享的资源。
在您的程序中,可能会发生以下顺序。
value++
。现在值= 1。它得到Empty
和Mutex
。value++
。现在值= 2。 Producer-2将获得Empty
但它必须等待Mutex
,因为它被Producer-1锁定。value=2
开始,在这个时间点,它将相同的内容写入缓冲槽。写完后,它解锁了Empty
和Mutex
。Mutex
。仍然value=2
所以它将写入相同的缓冲槽。value
可能仍会被更改。由于值在Mutex
之外更新,因此可以更改次数/跳过次数没有限制。那么,这个问题的解决方案是什么?
将代码更新值(或任何共享资源是通用的)移动到producer()
中的互斥锁中。在您的代码中,您不必首先执行value++
,如果插槽不可用,请执行value--
。最佳解决方案是在写入缓冲区插槽时更新值,如下所示。
//value++; //Don't update it here. Move it to critical section of the code, inside mutex.
sem_wait(&Empty);
sem_wait(&Mutex);
if (SlotOccupationBoolArray[inputSlot] == 0)
{
Buffer[inputSlot] = ++value; //Update and assign value, Read about pre-increment
SlotOccupationBoolArray[inputSlot] = 1;
}
//else //This section is not required.
//{
// value--;
//}
除此之外,还有一个问题。 sem_post
的Full
应该在Producer写入缓冲区中的插槽时发生。类似地,当消费者从缓冲区中的一个槽读取时,sem_post
的Empty
应该发生。
但是你在每个循环中都在做sem_post
。这会引起一些问题。所以也要照顾好这一点。只有在读/写发生时才对sem_post
/ Full
执行Empty
。
PS:互斥和信号量是不同的概念。根据用例使用它们对于良好,有效和更易读的编程至关重要。