C中的生产者 - 消费者w / pthreads +信号量(同时访问的缓冲区槽)

问题描述 投票:1回答:1

我用C编写了一个多线程生产者 - 消费者程序。我的初始程序不是Minimal / Complete / Verifiable,所以我从头开始重写它,而且bug仍在发生。我发现多个生产者线程正在尝试生成相同的数据(来自一系列数据),我不知道如何阻止它。

我尝试过的一些事情: - 我创建了一个与缓冲区大小相同的bool数组,以确保线程只将数据放入空槽,并从中获取数据,这解决了一个问题 - 我尝试移动输入/输出槽增量到生产者/消费者的开头,同时循环(分别),但它会导致死锁

另一个不相关的问题(明天我很可能能够自己解决,但现在就是这样)是序列中的许多值被跳过,尽管我希望它们都能被生成。

请注意:我已经学习了一个月的C,这也是我在这里发表的第一篇文章。在过去的几年里,我从这个网站学到了很多东西,我知道有时候这里的回答会非常苛刻,所以请善待那些真心想要学习和提高的人。谢谢! :)

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>

char * Buffer;
int nBufferSlots = 10;
int nThreads = 2;
pthread_t Producers, Consumers;
sem_t Empty, Full, Mutex;
int inputSlot;
int outputSlot;
char value = 0;
char SlotOccupationBoolArray[10] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};

void * producer(void *args)
{
    while(1)
    {
        sleep( rand() % 3);

        value++;

        sem_wait(&Empty);
        sem_wait(&Mutex);

        if (SlotOccupationBoolArray[inputSlot] == 0)
        {
            Buffer[inputSlot] = value;
            SlotOccupationBoolArray[inputSlot] = 1;
        }
        else
        {
            value--;
        }

        inputSlot = (inputSlot + 1) % nBufferSlots;

        sem_post(&Mutex);
        sem_post(&Full);

        printf("\n[ ");
        for (int i=0;i<10;i++)
        {
            printf("%d ", Buffer[i]);
        }
        printf("]\n");
    }
}
void * consumer(void *args)
{
    while(1)
    {
        sleep( rand() % 5);

        sem_wait(&Full);
        sem_wait(&Mutex);

        if (SlotOccupationBoolArray[outputSlot] == 1)
        {
            char ValueConsumed = Buffer[outputSlot];
            printf("\nConsumed value: %d\n", ValueConsumed);
            Buffer[outputSlot] = 0;
            SlotOccupationBoolArray[outputSlot] = 0;
        }

        outputSlot = (outputSlot + 1) % nBufferSlots;

        sem_post(&Mutex);
        sem_post(&Empty);
    }
}

int main(void)
{
    sem_open("/Mutex", O_CREAT, S_IRUSR | S_IWUSR, 1);
    sem_open("/Full", O_CREAT, S_IRUSR | S_IWUSR, 0);

    Buffer = (char*) malloc(nBufferSlots*sizeof(char));

    sem_open("/Empty", O_CREAT, S_IRUSR | S_IWUSR, nBufferSlots);

    Producers = (pthread_t) malloc(nThreads*sizeof(pthread_t));
    Consumers = (pthread_t) malloc(nThreads*sizeof(pthread_t));

    // set charBuffer elements to 0
    for (char i=0;i<10;i++)
    {
        Buffer[i] = 0;
    }

    for (int i = 0; i < nThreads; i++)
    {
        pthread_create(&Producers, NULL, producer, NULL);
        pthread_create(&Consumers, NULL, consumer, NULL);
    }

    for(int i=0;i<nThreads;i++){
        int err = pthread_create(&Producers,NULL,producer,NULL);
        if(err != 0){
            printf("Error creating producer %d\n",i+1);
        }else{
            printf("Successfully created producer %d\n",i+1);
        }
    }

    for(int i=0;i<nThreads;i++){
        int err = pthread_create(&Consumers,NULL,consumer,NULL);
        if(err != 0){
            printf("Error creating consumer %d\n",i+1);
        }else{
            printf("Successfully created consumer %d\n",i+1);
        }
    }

    pthread_exit(NULL);
}

我打印到输出的所有内容都是具有当前值的缓冲区数组(每次运行生产者线程时)以及已消耗的每个值。

我希望在输出中看到的是这样的:

  • [ 1 0 0 0 0 0 0 0 0 0 ]
  • [ 1 2 0 0 0 0 0 0 0 0 ]
  • [ 1 2 3 0 0 0 0 0 0 0 ]
  • 消费价值:1
  • [ 0 2 3 4 0 0 0 0 0 0 ]
  • [ 0 2 3 4 5 0 0 0 0 0 ]
  • 消费价值:3
  • [ 0 2 0 4 5 0 0 0 0 0 ]
  • 消费价值:4
  • [ 0 2 0 0 5 6 0 0 0 0 ]
  • [ 0 2 0 0 5 6 7 0 0 0 ]

但是,相反,我看到这样的事情(重复的2/4是问题):

  • [ 2 2 4 4 5 6 0 0 0 0 ]
  • [ 2 2 4 4 5 6 7 0 0 0 ]
  • [ 2 2 4 4 5 6 7 8 0 0 ]
  • 消费价值:2
  • 消费价值:2
  • [ 0 0 4 4 5 6 7 8 9 0 ]
  • 消费价值:4

(格式化道歉)

c pthreads buffer semaphore producer-consumer
1个回答
0
投票

值被跳过和/或重复,因为在producer()函数中,value++是在互斥量/信号量之外完成的,值的读取以及value--都是在内部完成的。

您应该记住的一件事是,线程可以通过操作系统以任意随机的方式进行调度。因此,您应该使用互斥锁/信号量或任何其他机制来保护多个线程之间共享的资源。

在您的程序中,可能会发生以下顺序。

  1. 最初值= 0。 Producer-1线程来了并做value++。现在值= 1。它得到EmptyMutex
  2. 在Producer-1将值写入缓冲区槽之前,Producer-2会被调度并且它会执行value++。现在值= 2。 Producer-2将获得Empty但它必须等待Mutex,因为它被Producer-1锁定。
  3. 现在Producer-1再次安排。从value=2开始,在这个时间点,它将相同的内容写入缓冲槽。写完后,它解锁了EmptyMutex
  4. 在此之后,Producer-2得到安排。当制作人1解锁它时,它获得Mutex。仍然value=2所以它将写入相同的缓冲槽。
  5. 请注意,如果在Producer-2写入之前安排了另一个Producer线程,则value可能仍会被更改。由于值在Mutex之外更新,因此可以更改次数/跳过次数没有限制。

那么,这个问题的解决方案是什么?

将代码更新值(或任何共享资源是通用的)移动到producer()中的互斥锁中。在您的代码中,您不必首先执行value++,如果插槽不可用,请执行value--。最佳解决方案是在写入缓冲区插槽时更新值,如下所示。

    //value++; //Don't update it here. Move it to critical section of the code, inside mutex.

    sem_wait(&Empty);
    sem_wait(&Mutex);

    if (SlotOccupationBoolArray[inputSlot] == 0)
    {
        Buffer[inputSlot] = ++value; //Update and assign value, Read about pre-increment
        SlotOccupationBoolArray[inputSlot] = 1;
    }
    //else //This section is not required.
    //{
    //    value--;
    //}

除此之外,还有一个问题。 sem_postFull应该在Producer写入缓冲区中的插槽时发生。类似地,当消费者从缓冲区中的一个槽读取时,sem_postEmpty应该发生。

但是你在每个循环中都在做sem_post。这会引起一些问题。所以也要照顾好这一点。只有在读/写发生时才对sem_post / Full执行Empty

PS:互斥和信号量是不同的概念。根据用例使用它们对于良好,有效和更易读的编程至关重要。

© www.soinside.com 2019 - 2024. All rights reserved.