首先,感谢您的阅读和帮助!
这是一段C语言的多进程多线程代码,涉及创建多个worker进程和一个dispatcher线程,通过无名管道与worker通信。
下面是代码的解释:
程序定义了两种结构类型:
worker_data
有两个字段,一个整数 id
和一个状态字符串。dispatcher_args
带有一个字段 dispatcher_pipe
,它是一个指向二维整数数组的指针。程序设置共享内存和信号量。共享内存用于存储有关可以被多个进程访问的工作进程的数据(
worker_data
)。信号量用于进程/线程之间的同步。
声明了几个全局变量,包括共享内存变量、信号量变量、工作进程的计数以及用于存储工作进程的 PID 的数组。
调度器是一个无限循环的线程函数。在每次迭代中,它检查共享内存中每个工作人员的状态。如果工人的状态是“就绪”,调度员将状态更改为“工作”,将消息写入工人的管道,然后关闭管道。
每个worker进程运行Worker函数。此函数使用
select()
等待调度程序在其管道上提供数据。当数据可用时,它读取数据,执行一些工作(未在提供的代码中指定),将其在共享内存中的状态更改为“就绪”,然后无限期地重复该过程。
主要功能执行以下操作:
dispatcher_args
结构并为其分配dispatcher_pipe
的地址。dispatcher
函数的调度程序线程。这段代码的目的是让 Dispatcher Threat 与 Worker Process 进行通信,Dispatcher 将任务分配给 Worker 进程并等待它们为新任务做好准备。每个工作进程通过更改其在共享内存中的状态来指示其准备好执行新任务。
可用于执行任务,使用共享内存、管道和信号量。
当我用 gcc updatestatu.c -o updsts -lpthread -lrt 编译程序时,程序会执行“第一个循环”,这意味着它写入和读取工作进程(例如 1)并正确更改状态那个工人(1),但在“第二个循环”中,之前执行任务的工人(1)似乎准备就绪,但在工人进程中它没有收到任何消息,只有当它转到下一个工人(a不同的人,比如 Worker 2) 它会收到消息。不知道是跟Worker Process的读取有关系,还是跟Worker Process状态的变化有关
这是我得到的输出(发送和获取消息的“循环”由 )
Worker 1 Status ready
Worker 1 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 1 Status ready
Worker 1 Status ready
Worker 1 Status working
message away
not read Worker 1 Status working
Worker 2 Status ready
Worker 2 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 2 Status ready
not read Worker 1 Status working
Worker 2 Status ready
Worker 2 Status working
message away
not read Worker 1 Status working
not read Worker 2 Status working
Worker 3 Status ready
Worker 3 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 3 Status ready
not read Worker 1 Status working
not read Worker 2 Status working
Worker 3 Status ready
Worker 3 Status working
message away
not read Worker 1 Status working
not read Worker 2 Status working
not read Worker 3 Status working
Worker 4 Status ready
Worker 4 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 4 Status ready
not read Worker 1 Status working
not read Worker 2 Status working
not read Worker 3 Status working
Worker 4 Status ready
Worker 4 Status working
message away
^C
注意,我按 ^C 是因为正如我在循环 " 中的问题中所说的那样(int i = 0; i < n_workers; i++) " 因为没有 Worker 准备好(所有 5 个 worker 应该都在工作)它会创建一个无限循环。
这是代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <semaphore.h>
#include <fcntl.h>
#include <sys/mman.h>
typedef struct worker_data{
int id;
char status[20];
} worker_data;
typedef struct dispatcher_args{
int** dispatcher_pipe;
} dispatcher_args;
// variables for shared memory
worker_data* workers;
size_t workers_size;
// variables for semaphores
sem_t *sem_log;
int n_workers = 5;
pid_t *pid_workers;
void* dispatcher(void *args) {
dispatcher_args* arg = args;
int (*dispatcher_pipe)[2] = (int (*)[2])arg->dispatcher_pipe;
char msg[100];
strcpy(msg,"Hello!");
int j=0;
printf("THREAD DISPATCHER CREATED\n");
strcpy(msg,"Hello");
while(1){
for (int i = 0; i < n_workers; i++) {
if (strcmp(workers[i].status, "ready") == 0) {
printf("\nWorker %d Status %s\n",workers[i].id,workers[i].status);
// update status of the worker in shared memory
strcpy(workers[i].status, "working");
printf("Worker %d Status %s\n",workers[i].id,workers[i].status);
close(dispatcher_pipe[i][0]); // close the read end of the pipe
printf("message away\n");
write(dispatcher_pipe[i][1], &msg, sizeof(msg));
close(dispatcher_pipe[i][1]); // close the write end of the pipe
break;
}
else {
printf("not read Worker %d Status %s\n",workers[i].id,workers[i].status);
}
}
//j+=1;
sleep(2);
}
}
void Worker(int *worker_pipe, int worker_index) {
char msg[100];
sprintf(msg, "WORKER %d READY\n", workers[worker_index].id);
fd_set set;
FD_ZERO(&set);
FD_SET(worker_pipe[0], &set);
while(1){
int ready = select(worker_pipe[0]+1, &set, NULL, NULL, NULL);
if (ready < 0) {
perror("select");
exit(1);
}
printf("message get 1\n");
if (FD_ISSET(worker_pipe[0], &set)) {
printf("message get 2\n");
// data is available, read from the pipe
ssize_t nbytes = read(worker_pipe[0], &msg, sizeof(msg));
if (nbytes < 0) {
perror("read");
exit(1);
}
printf("Worker message received from Dispatcher: %s\n", msg);
}
close(worker_pipe[1]);
// functions of worker
// ...
// change status of the worker in shared memory
strcpy(workers[worker_index].status, "ready");
printf("JOB FINISHED Worker %d Status %s\n", workers[worker_index].id, workers[worker_index].status);
// sleep(3);
// printf("I am Worker and Im alive\n");
}
}
int main() {
pthread_t dispatcher_thread;
int dispatcher_pipe[n_workers][2];
workers_size = n_workers * sizeof(worker_data);
int workers_fd = shm_open("workers", O_CREAT | O_RDWR, 0666);
ftruncate(workers_fd, workers_size);
workers = mmap(NULL, workers_size, PROT_READ | PROT_WRITE, MAP_SHARED, workers_fd, 0);
pid_workers = malloc(n_workers * sizeof(pid_t));
// create semaphores
sem_unlink("sem_log");
sem_log = sem_open("sem_log", O_CREAT | O_EXCL, 0700, 1);
dispatcher_args* disp_args = malloc(sizeof(dispatcher_args));
disp_args->dispatcher_pipe= (int **) dispatcher_pipe;
// creating worker processes
for (int i = 0; i < n_workers; i++) {
worker_data worker;
worker.id = i+1;
strcpy(worker.status, "ready");
workers[i] = worker;
// create the unnamed pipe for the dispatcher and corresponding association
if (pipe(dispatcher_pipe[i]) == -1) {
printf("Error creating dispatcher pipe n1\n");
exit(0);
}
pid_workers[i] = fork();
if (pid_workers[i] == 0) {
// worker process
Worker(dispatcher_pipe[i], i);
// printf("Hey\n");
} else if (pid_workers[i] > 0) {
// parent process
} else {
// error forking
perror("fork");
exit(EXIT_FAILURE);
}
}
// create dispatcher thread
pthread_create(&dispatcher_thread, NULL, dispatcher, disp_args);
// wait for the threads to finish
pthread_join(dispatcher_thread, NULL);
// clean up
munmap(workers, workers_size);
close(workers_fd);
shm_unlink("workers");
sem_close(sem_log);
sem_unlink("sem_log");
free(pid_workers);
free(workers);
free(disp_args);
return 0;
}
不要关闭管道!