进程从未命名管道读取或更新共享内存中的变量时出现问题 C, linux

问题描述 投票:0回答:1

首先,感谢您的阅读和帮助!

这是一段C语言的多进程多线程代码,涉及创建多个worker进程和一个dispatcher线程,通过无名管道与worker通信。

下面是代码的解释:

结构

程序定义了两种结构类型:

  • worker_data
    有两个字段,一个整数
    id
    和一个状态字符串。
  • dispatcher_args
    带有一个字段
    dispatcher_pipe
    ,它是一个指向二维整数数组的指针。

共享内存和信号量

程序设置共享内存和信号量。共享内存用于存储有关可以被多个进程访问的工作进程的数据(

worker_data
)。信号量用于进程/线程之间的同步。

全局变量

声明了几个全局变量,包括共享内存变量、信号量变量、工作进程的计数以及用于存储工作进程的 PID 的数组。

调度函数

调度器是一个无限循环的线程函数。在每次迭代中,它检查共享内存中每个工作人员的状态。如果工人的状态是“就绪”,调度员将状态更改为“工作”,将消息写入工人的管道,然后关闭管道。

工作者函数

每个worker进程运行Worker函数。此函数使用

select()
等待调度程序在其管道上提供数据。当数据可用时,它读取数据,执行一些工作(未在提供的代码中指定),将其在共享内存中的状态更改为“就绪”,然后无限期地重复该过程。

主要功能

主要功能执行以下操作:

  1. 为工作数据分配共享内存。
  2. 取消链接并打开信号量。
  3. 初始化一个
    dispatcher_args
    结构并为其分配
    dispatcher_pipe
    的地址。
  4. 使用循环和 fork() 创建工作进程。在每次迭代中,它都会建立一个管道,fork 一个新进程,并分配子进程运行 Worker 函数。
  5. 创建一个运行
    dispatcher
    函数的调度程序线程。
  6. 等待调度线程完成。
  7. 通过取消映射共享内存、关闭和取消链接共享内存和信号量以及释放分配的内存来进行清理。

这段代码的目的是让 Dispatcher Threat 与 Worker Process 进行通信,Dispatcher 将任务分配给 Worker 进程并等待它们为新任务做好准备。每个工作进程通过更改其在共享内存中的状态来指示其准备好执行新任务。

可用于执行任务,使用共享内存、管道和信号量。

当我用 gcc updatestatu.c -o updsts -lpthread -lrt 编译程序时,程序会执行“第一个循环”,这意味着它写入和读取工作进程(例如 1)并正确更改状态那个工人(1),但在“第二个循环”中,之前执行任务的工人(1)似乎准备就绪,但在工人进程中它没有收到任何消息,只有当它转到下一个工人(a不同的人,比如 Worker 2) 它会收到消息。不知道是跟Worker Process的读取有关系,还是跟Worker Process状态的变化有关

这是我得到的输出(发送和获取消息的“循环”由 )

Worker 1 Status ready
Worker 1 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 1 Status ready

Worker 1 Status ready
Worker 1 Status working
message away
not read Worker 1 Status working

Worker 2 Status ready
Worker 2 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 2 Status ready
not read Worker 1 Status working

Worker 2 Status ready
Worker 2 Status working
message away
not read Worker 1 Status working
not read Worker 2 Status working

Worker 3 Status ready
Worker 3 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 3 Status ready
not read Worker 1 Status working
not read Worker 2 Status working

Worker 3 Status ready
Worker 3 Status working
message away
not read Worker 1 Status working
not read Worker 2 Status working
not read Worker 3 Status working

Worker 4 Status ready
Worker 4 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 4 Status ready
not read Worker 1 Status working
not read Worker 2 Status working
not read Worker 3 Status working

Worker 4 Status ready
Worker 4 Status working
message away
^C

注意,我按 ^C 是因为正如我在循环 " 中的问题中所说的那样(int i = 0; i < n_workers; i++) " 因为没有 Worker 准备好(所有 5 个 worker 应该都在工作)它会创建一个无限循环。

这是代码

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <semaphore.h>
#include <fcntl.h>
#include <sys/mman.h>

typedef struct worker_data{
    int id;
    char status[20];
} worker_data;

typedef struct dispatcher_args{
    int** dispatcher_pipe;
} dispatcher_args;

// variables for shared memory
worker_data* workers;
size_t workers_size;

// variables for semaphores
sem_t *sem_log;

int n_workers = 5;
pid_t *pid_workers;

void* dispatcher(void *args) {
    dispatcher_args* arg = args;
    int (*dispatcher_pipe)[2] = (int (*)[2])arg->dispatcher_pipe;

    char msg[100];
    strcpy(msg,"Hello!");
    int j=0;

    printf("THREAD DISPATCHER CREATED\n");

    strcpy(msg,"Hello");

    while(1){

        for (int i = 0; i < n_workers; i++) {
            if (strcmp(workers[i].status, "ready") == 0) {
                printf("\nWorker %d Status %s\n",workers[i].id,workers[i].status);

                // update status of the worker in shared memory
                strcpy(workers[i].status, "working");

                printf("Worker %d Status %s\n",workers[i].id,workers[i].status);
                close(dispatcher_pipe[i][0]); // close the read end of the pipe
                printf("message away\n");
                write(dispatcher_pipe[i][1], &msg, sizeof(msg));
                close(dispatcher_pipe[i][1]); // close the write end of the pipe
                break;
            }
            else {
                printf("not read Worker %d Status %s\n",workers[i].id,workers[i].status);
            }
        }
        //j+=1;
        sleep(2);
    }
}

void Worker(int *worker_pipe, int worker_index) {
    char msg[100];
    sprintf(msg, "WORKER %d READY\n", workers[worker_index].id);

    fd_set set;
    FD_ZERO(&set);
    FD_SET(worker_pipe[0], &set);

    while(1){


        int ready = select(worker_pipe[0]+1, &set, NULL, NULL, NULL);
        if (ready < 0) {
            perror("select");
            exit(1);
        }
        printf("message get 1\n");
        if (FD_ISSET(worker_pipe[0], &set)) {
            printf("message get 2\n");
            // data is available, read from the pipe
            ssize_t nbytes = read(worker_pipe[0], &msg, sizeof(msg));
            if (nbytes < 0) {
                perror("read");
                exit(1);
            }
            printf("Worker message received from Dispatcher: %s\n", msg);
        }
        close(worker_pipe[1]);

        // functions of worker
        // ...

        // change status of the worker in shared memory
        strcpy(workers[worker_index].status, "ready");
        printf("JOB FINISHED Worker %d Status %s\n", workers[worker_index].id, workers[worker_index].status);

        // sleep(3);
        // printf("I am Worker and Im alive\n");
    }
}

int main() {
    pthread_t dispatcher_thread;
    int dispatcher_pipe[n_workers][2];
    workers_size = n_workers * sizeof(worker_data);
    int workers_fd = shm_open("workers", O_CREAT | O_RDWR, 0666);
    ftruncate(workers_fd, workers_size);
    workers = mmap(NULL, workers_size, PROT_READ | PROT_WRITE, MAP_SHARED, workers_fd, 0);

    pid_workers = malloc(n_workers * sizeof(pid_t));

// create semaphores
    sem_unlink("sem_log");
    sem_log = sem_open("sem_log", O_CREAT | O_EXCL, 0700, 1);

    dispatcher_args* disp_args = malloc(sizeof(dispatcher_args));
    disp_args->dispatcher_pipe= (int **) dispatcher_pipe;

// creating worker processes
    for (int i = 0; i < n_workers; i++) {
        worker_data worker;
        worker.id = i+1;
        strcpy(worker.status, "ready");
        workers[i] = worker;

        // create the unnamed pipe for the dispatcher and corresponding association
        if (pipe(dispatcher_pipe[i]) == -1) {
            printf("Error creating dispatcher pipe n1\n");
            exit(0);
        }

        pid_workers[i] = fork();

        if (pid_workers[i] == 0) {
            // worker process
            Worker(dispatcher_pipe[i], i);
            // printf("Hey\n");
        } else if (pid_workers[i] > 0) {
            // parent process
        } else {
            // error forking
            perror("fork");
            exit(EXIT_FAILURE);
        }
    }

// create dispatcher thread
    pthread_create(&dispatcher_thread, NULL, dispatcher, disp_args);

// wait for the threads to finish
    pthread_join(dispatcher_thread, NULL);

// clean up
    munmap(workers, workers_size);
    close(workers_fd);
    shm_unlink("workers");
    sem_close(sem_log);
    sem_unlink("sem_log");
    free(pid_workers);
    free(workers);
    free(disp_args);

    return 0;

}
c process pipe pthreads shared-memory
1个回答
0
投票

不要关闭管道!

  1. 在主进程/线程中,发送第一条消息后,关闭管道的发送端。所以,你可以 never 发送另一条消息。
  2. worker进程中,收到第一条消息后,关闭管道的接收端。所以,你可以 never 收到另一条消息。
© www.soinside.com 2019 - 2024. All rights reserved.