Producer-consumer in C using PThreads

Question (votes: 0, answers: 1)

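I'm working on a problem in which I'm implementing a program that mimics the producer-consumer paradigm. The code works when I have only one producer and one consumer, but it fails as soon as I add a second producer and a second consumer.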

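I've spent a while on this and can't figure out why I get the error "Synchronization Error: Producer x Just overwrote x from Slot x". I traced it through various tests, and the problem is that a producer is not blocked when another producer is already inside its critical section.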

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>

void *producer (void *) ;
void *consumer(void *) ;
sem_t empty, full, mutex ;

int buffer[10] /*note extra long space!*/ ;
int ID[10] ;
int in = 0 ; int out = 0 ;
int BUFFER_SIZE = 10 ;
int nextProduced = 0 ;

main() {
    int i ;
    pthread_t TID[10] ;

    sem_init(&empty, 0, 10) ;
    sem_init(&full, 0, 0) ;
    sem_init(&mutex, 0, 1) ;

    for(i = 0; i < 10; i++) {
        ID[i] = i ;
        buffer[i] = -1 ;
    }

    //for(i = 0; i < 5000; i += 2) {
        pthread_create(&TID[0], NULL, producer, (void *) &ID[0]) ;
        printf("Producer ID = %d created!\n", 0) ;
        pthread_create(&TID[1], NULL, consumer, (void *) &ID[1]) ;
        printf("Consumer ID = %d created!\n", 1) ;

        pthread_create(&TID[2], NULL, producer, (void *) &ID[2]) ;
        printf("Producer ID = %d created!\n", 2) ;
        pthread_create(&TID[3], NULL, consumer, (void *) &ID[3]) ;
        printf("Consumer ID = %d created!\n", 3) ;
    //}

    for(i = 0; i < 10 ; i++) {
        pthread_join(TID[i], NULL) ;
    }
}

void *producer(void *Boo) {
    int *ptr;
    int ID;
    ptr = (int *) Boo;
    ID = *ptr;
    while (1) {
        nextProduced++; //Producing Integers
        /* Check to see if Overwriting unread slot */
        sem_wait(&empty);
        sem_wait(&mutex);

        if (buffer[in] != -1) {
            printf("Synchronization Error: Producer %d Just overwrote %d from Slot %d\n", ID, buffer[in], in);
            exit(0);
        }

        /* Looks like we are OK */
        buffer[in] = nextProduced;
        printf("Producer %d. Put %d in slot %d\n", ID, nextProduced, in);
        in = (in + 1) % BUFFER_SIZE;
        printf("incremented in!\n");

        sem_post(&mutex);
        sem_post(&full);
    }
 }

void *consumer (void *Boo) {
    static int nextConsumed = 0 ;
    int *ptr ;
    int ID ;
    ptr = (int *) Boo ;
    ID = *ptr ;
    while (1) {
        sem_wait(&full);
        sem_wait(&mutex);

        nextConsumed = buffer[out];
        /*Check to make sure we did not read from an empty slot*/
        if (nextConsumed == -1) {
            printf("Synch Error: Consumer %d Just Read from empty slot %d\n", ID, out) ;
            exit(0) ;
        }
        /* We must be OK */
        printf("Consumer %d Just consumed item %d from slot %d\n", ID, nextConsumed, out) ;
        buffer[out] = -1 ;
        out = (out + 1) % BUFFER_SIZE;

        sem_post(&mutex);
        sem_post(&empty);
    }
}

Output:

Producer ID = 0 created!
Producer 0. Put 1 in slot 0
Consumer ID = 1 created!
incremented in!
Consumer 1 Just consumed item 1 from slot 0
Producer ID = 2 created!
Producer 0. Put 2 in slot 1
Synchronization Error: Producer 2 Just overwrote 2 from Slot 1
Consumer 1 Just consumed item 2 from slot 1
Consumer ID = 3 created!
incremented in!
Consumer 3 Just consumed item 2 from slot 1
Synch Error: Consumer 1 Just Read from empty slot 2
Producer 0. Put 4 in slot 2

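As you can see, producer 0 manages to put 2 into slot 1. But before producer 0 can increment the index in, producer 2 tries to write to slot 1 as well, because in has not been incremented yet.

For some reason my sem_wait() calls don't seem to be working. Can anyone help?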
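One way to test whether the semaphores are doing anything at all is to check the return values that the code above ignores. The post does not say which platform was used, so this is only a guess at the cause: on some systems (macOS is the well-known case) sem_init() is not implemented and returns -1, after which sem_wait() on that semaphore is likely to fail rather than block. The sketch below uses a hypothetical helper, init_sem_checked, which is not part of the original code.

```c
#include <assert.h>
#include <errno.h>
#include <semaphore.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not in the original post): initialise an unnamed,
   process-private semaphore and report a failure instead of silently
   continuing. Returns 0 on success, -1 on failure. */
int init_sem_checked(sem_t *sem, unsigned int value, const char *name) {
    assert(sem != NULL);
    if (sem_init(sem, 0, value) == -1) {
        /* On platforms without unnamed semaphores this is where the
           problem would show up (errno is typically ENOSYS there). */
        fprintf(stderr, "sem_init(%s) failed: %s\n", name, strerror(errno));
        return -1;
    }
    return 0;
}
```

In main, each sem_init(&empty, 0, 10) style call could be replaced by init_sem_checked(&empty, 10, "empty"), exiting if it returns -1. If all three calls succeed, the cause lies elsewhere.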

c multithreading pthreads semaphore producer-consumer
1 answer (0 votes)

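I reworked your code to run on my system, and it appears to work correctly.

Specific changes: since I'm on OSX, I had to switch from sem_init() to sem_open() and sem_unlink(); to accommodate that change, and the code in general, I added an interrupt handler so that typing ^C lets the consumers and producers finish and allows pthread_join() and any subsequent cleanup code to run; you tied the number of threads to the number of buffer slots, which look independent (see your ID and buffer initialization code), so I separated the two; I moved nextProduced++ inside the mutex; and I made various small style tweaks: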

#include <fcntl.h>      /* O_CREAT, needed by sem_open() */
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>
#include <signal.h>

#define MAX_THREADS 5
#define BUFFER_SIZE 10

sem_t *empty, *full, *mutex;

int buffer[BUFFER_SIZE];
int in = 0, out = 0;

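/* Written from the signal handler, so use the type guaranteed safe for that. */
static volatile sig_atomic_t keepRunning = 1;

void intHandler(int dummy) {
    (void) dummy;  /* signal number is unused */
    keepRunning = 0;
}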

void *producer(void * id_ptr) {
    int ID = *((int *) id_ptr);
    static int nextProduced = 0;

    while (keepRunning) {

        (void) sem_wait(empty);
        (void) sem_wait(mutex);

       /* Check to see if Overwriting unread slot */
        if (buffer[in] != -1) {
            fprintf(stderr, "Synchronization Error: Producer %d Just overwrote %d from Slot %d\n", ID, buffer[in], in);
            exit(1);
        }

        nextProduced++; // Producing Integers

        /* Looks like we are OK */
        buffer[in] = nextProduced;
        printf("Producer %d. Put %d in slot %d\n", ID, nextProduced, in);
        in = (in + 1) % BUFFER_SIZE;
        printf("incremented in!\n");

        (void) sem_post(mutex);
        (void) sem_post(full);
    }

    return NULL;
}

void *consumer (void *id_ptr) {
    int ID = *((int *) id_ptr);
    static int nextConsumed = 0;

    while (keepRunning) {

        (void) sem_wait(full);
        (void) sem_wait(mutex);

        nextConsumed = buffer[out];

        /* Check to make sure we did not read from an empty slot */
        if (nextConsumed == -1) {
            fprintf(stderr, "Synch Error: Consumer %d Just Read from empty slot %d\n", ID, out);
            exit(1);
        }

        /* We must be OK */
        printf("Consumer %d Just consumed item %d from slot %d\n", ID, nextConsumed, out);
        buffer[out] = -1;
        out = (out + 1) % BUFFER_SIZE;
        printf("incremented out!\n");

        (void) sem_post(mutex);
        (void) sem_post(empty);
    }

    return NULL;
}

int main() {
    int ID[MAX_THREADS];
    pthread_t TID[MAX_THREADS];

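    /* The initial count of empty matches the number of buffer slots. */
    empty = sem_open("/empty", O_CREAT, 0644, BUFFER_SIZE);
    full = sem_open("/full", O_CREAT, 0644, 0);
    mutex = sem_open("/mutex", O_CREAT, 0644, 1);

    if (empty == SEM_FAILED || full == SEM_FAILED || mutex == SEM_FAILED) {
        perror("sem_open");
        return 1;
    }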

    signal(SIGINT, intHandler);

    for (int i = 0; i < MAX_THREADS; i++) {
        ID[i] = i;
    }

    for (int i = 0; i < BUFFER_SIZE; i++) {
        buffer[i] = -1;
    }

    pthread_create(&TID[0], NULL, producer, (void *) &ID[0]);
    printf("Producer ID = %d created!\n", 0);
    pthread_create(&TID[1], NULL, consumer, (void *) &ID[1]);
    printf("Consumer ID = %d created!\n", 1);

    pthread_create(&TID[2], NULL, producer, (void *) &ID[2]);
    printf("Producer ID = %d created!\n", 2);
    pthread_create(&TID[3], NULL, consumer, (void *) &ID[3]);
    printf("Consumer ID = %d created!\n", 3);

    for (int i = 0; i < 4; i++) {
        pthread_join(TID[i], NULL);
    }

    (void) sem_unlink("/empty");
    (void) sem_unlink("/full");
    (void) sem_unlink("/mutex");

    return 0;
}

Output:

> ./a.out
Producer ID = 0 created!
Producer 0. Put 1 in slot 0
Consumer ID = 1 created!
incremented in!
Producer ID = 2 created!
Producer 0. Put 2 in slot 1
incremented in!
Producer 2. Put 3 in slot 2
incremented in!
Consumer ID = 3 created!
Producer 0. Put 4 in slot 3
incremented in!
Consumer 1 Just consumed item 1 from slot 0
incremented out!
Consumer 3 Just consumed item 2 from slot 1
incremented out!
Producer 2. Put 5 in slot 4
incremented in!
Producer 0. Put 6 in slot 5
incremented in!
Consumer 1 Just consumed item 3 from slot 2
incremented out!
Consumer 3 Just consumed item 4 from slot 3
incremented out!
Producer 2. Put 7 in slot 6
incremented in!
Producer 0. Put 8 in slot 7
incremented in!
Consumer 1 Just consumed item 5 from slot 4
incremented out!
Consumer 3 Just consumed item 6 from slot 5
...
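As a portable alternative, and not what the answer above uses, the same bounded buffer can be guarded with a pthread mutex and two condition variables, which avoids the sem_init()/sem_open() platform difference entirely. This is a minimal sketch under stated assumptions: production is capped at a fixed number of items so the threads can be joined, and names such as run_demo, ITEMS_PER_PRODUCER and TOTAL_ITEMS are illustrative only.

```c
#include <assert.h>
#include <pthread.h>

#define BUFFER_SIZE 10
#define NUM_PRODUCERS 2
#define NUM_CONSUMERS 2
#define ITEMS_PER_PRODUCER 1000   /* assumption: finite run so threads can exit */
#define TOTAL_ITEMS (NUM_PRODUCERS * ITEMS_PER_PRODUCER)

static int buffer[BUFFER_SIZE];
static int in = 0, out = 0, count = 0;   /* count = filled slots */
static int next_produced = 0;            /* only touched while holding lock */
static int consumed_total = 0;
static long consumed_sum = 0;

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
static pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;

static void *producer(void *arg) {
    (void) arg;
    for (int i = 0; i < ITEMS_PER_PRODUCER; i++) {
        pthread_mutex_lock(&lock);
        while (count == BUFFER_SIZE)              /* wait for a free slot */
            pthread_cond_wait(&not_full, &lock);
        buffer[in] = ++next_produced;             /* produce inside the lock */
        in = (in + 1) % BUFFER_SIZE;
        count++;
        pthread_cond_signal(&not_empty);
        pthread_mutex_unlock(&lock);
    }
    return NULL;
}

static void *consumer(void *arg) {
    (void) arg;
    for (;;) {
        pthread_mutex_lock(&lock);
        while (count == 0 && consumed_total < TOTAL_ITEMS)
            pthread_cond_wait(&not_empty, &lock);
        if (count == 0) {                         /* all items consumed */
            pthread_mutex_unlock(&lock);
            return NULL;
        }
        consumed_sum += buffer[out];
        out = (out + 1) % BUFFER_SIZE;
        count--;
        consumed_total++;
        if (consumed_total == TOTAL_ITEMS)        /* let idle consumers exit */
            pthread_cond_broadcast(&not_empty);
        pthread_cond_signal(&not_full);
        pthread_mutex_unlock(&lock);
    }
}

/* Runs the producers and consumers once and returns the sum of all consumed
   items. Uses static state, so it is meant to be called a single time. */
long run_demo(void) {
    pthread_t tid[NUM_PRODUCERS + NUM_CONSUMERS];
    for (int i = 0; i < NUM_PRODUCERS + NUM_CONSUMERS; i++) {
        int rc = pthread_create(&tid[i], NULL,
                                i < NUM_PRODUCERS ? producer : consumer, NULL);
        assert(rc == 0);
        (void) rc;
    }
    for (int i = 0; i < NUM_PRODUCERS + NUM_CONSUMERS; i++)
        pthread_join(tid[i], NULL);
    return consumed_sum;
}
```

Every value from 1 to TOTAL_ITEMS is produced exactly once, so the returned sum should equal TOTAL_ITEMS * (TOTAL_ITEMS + 1) / 2 if no slot was overwritten or read twice. Compile with -pthread.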