适当的条件变量用法

问题描述 投票:1回答:2

我想确定我理解条件变量是如何工作的,所以我将使用我写的程序来问我的问题。

在我的程序中,我有一个“生产者”线程(一个)和“工作线程”(几个让我们假设3)。

生产者线程“处理”一个FIFO链表,这意味着它所做的只是检查列表开头是否有一个项(在我的Req类型的程序请求中调用)(由一个全局指针指向)在我的程序中前面)如果是这样,将它分配到一个全局请求元素(称为globalReq)。

工作线程在一个循环中运行,等待处理请求,通过提取全局请求变量,将它们自己的局部变量(对于它们中的每一个都是“私有”)导致每个线程都有一个独立的堆栈 - 正确我,如果我错了),然后处理请求。

为了做到这一点,我使用互斥量和条件变量。

一个重要的注意事项是,一旦请求存在(暂时让我们假设只存在一个请求),那么工作线程中的哪一个将“关注”它(假设它们都是“自由的” - 睡觉并不重要在条件变量上)。

在提取请求并将其分配到全局请求之后,生产者线程调用pthread_cond_signal - 据我所知,它取消阻塞至少一个“阻塞”线程 - >因此它可以解除阻塞,例如2个线程。

所以我的问题是我现有的代码(下面):

1)我怎样才能保证只有一个线程(来自工作线程)将处理请求。我是否需要在所有通用“生产者消费者”实施中添加“while check loop”?

2)如何通过q​​azxswpoi解锁的线程(或者如果pthread_cond_broadcast解除阻塞多个线程),在互斥锁上的内容,可能我还没有把握它...

(每个)工作线程的代码是:

pthread_cond_signal

生产者线程的代码是:

void *worker(void *arg)
{

       while(1)
       {
         printf("\n BEFORE LOCKING sthread_mutex  with thread: %d \n", syscall(SYS_gettid));
                 pthread_mutex_lock(&sthread_mutex);
         printf("\n AFTER UNLOCKING sthread_mutex  with thread: %d \n", syscall(SYS_gettid));

         printf("\n BEFORE WAITING ON cond_var with thread: %d \n", syscall(SYS_gettid));           
                 pthread_cond_wait(&cond_var,&sthread_mutex); //wait on condition variable
         printf("\n AFTER WAITING ON cond_var with thread: %d \n", syscall(SYS_gettid));            

                 printf("\n got signal for thread: %d \n",syscall(SYS_gettid));

         // extract the current request into g local variable 
             // within the "private stack" of this thread   
                 Req localReq = globalReq;  
                 pthread_mutex_unlock(&sthread_mutex);
         printf("\n AFTER UNLOCKING sthread_mutex with thread: %d \n", syscall(SYS_gettid));

            // perform the desired task
                   task(localReq);
           printf("\n BEFORE calling sem_post with thread: %d \n", syscall(SYS_gettid));
           sem_post(&sem);

       } // end while (1)

} // end of worker thread function

另一个问题:

生产者线程在全局信号量上调用void *producer(void *arg) { while(1) { if(front != NULL) // queue not empty { // go to sleep if all "worker threads" are occuipied // or decrement number of free "worker threads" threads by 1 printf(" schedualer thread BEFORE calling sem_wait on sem \n"); sem_wait(&sem); // lock the sthread mutex in order to "synchronize" with the // "worker threads"... printf(" schedualer thread BEFORE locking sthread_mutex \n"); pthread_mutex_lock(&sthread_mutex); printf(" schedualer thread AFTER locking sthread_mutex \n"); globalReq = extract_element(); // this is the global request variable // notify the worker threads that an "arriving request" needed to // be taking care of printf(" schedualer thread BEFORE calling signal on cond_var \n"); // pthread_cond_signal(&cond_var); pthread_cond_broadcast(&cond_var); printf(" schedualer thread AFTER calling signal on cond_var \n"); // unlock the smutex printf(" schedualer thread BEFORE UNLOCKING sthread_mutex \n"); pthread_mutex_unlock(&sthread_mutex); printf(" schedualer thread AFTER UNLOCKING sthread_mutex \n"); } // queue not empty else continue; } // end while (1) } // end of producer (在工作线程的数量开始时初始化,在本例中为3),以便自己指示当前有多少工作线程处理请求,并完成这个“机制”,工作线程,一旦完成处理他们“赢了”的请求(当争用条件变量时),调用sem_wait来指示“另一个工作线程可用”

3)这是实现这种“发信号通知有多少可用工作线程”的正确(良好/有效)方式吗?

4)通过// *段落中提到的生产者和工人线程共享的全局变量“传递”请求有什么好处和缺点?传递它是一种明智的方式,或者最好“只是”创建一个“新的请求变量”(在使用malloc的堆上),它将为每个工作线程和请求“专用”(并且还在其中释放它)每个工作线程一旦完成为请求服务)?

5)如果您对这段代码有任何其他意见(好的或坏的),请随时向我表明。

编辑:

大家好,

一些额外的问题:

除了生产者和工作者线程之外,还有一个叫做监听器的另一个THREAD,它只能用于插入到达链表(FIFO队列)的请求,所以它实际上并不是之前提到的生产者的任务。 。

所以我的新问题是:

8)关于我的程序的附加信息,我用信号量组成的“信令机制”是否有效?

9)由生产者和监听器线程管理的链表有两个全局指针sem_postfront分别指向链表的头部和尾部(列表的头部是第一个要处理的请求)。

下面是监听器线程执行的插入函数的实现,以及生产者线程执行的“提取”功能。

为了在“队列”(链接列表)上同步这两个线程,我使用了一个名为qmutex的共享互斥锁。

我的问题是,关于下面的两个代码,在每个函数中“放置”互斥锁(锁定和解锁)的“最佳”位置在哪里?

非常感谢,

盖伊。

插入功能:

rear

提取功能:

void insertion(void *toInsert)
{

    struct getInfo *req = (struct getInfo *)toInsert;
        newNode = (N*)malloc(sizeof(N));
        newNode->req = req;
        newNode->next = NULL;

    // WHERE SHULD I LOCK (AND UNLOCK) THE  QUEUE MUTEX ????????????????????????
        if(front == NULL)
    {
        front = newNode;
        printf("empty list - insert as head \n");     
    }

        else
    {
        rear->next = newNode;
        printf(" NOT AN EMPTY list - insert as last node \n");
    }

        rear = newNode;
}  // end of insertion
c multithreading pthreads mutex condition-variable
2个回答
5
投票

而不是直接回答您的问题,首先,这里是对典型方法的描述:

您有一种队列或列表,您可以在其中添加工作数据。每当您添加一组工作数据时,首先锁定互斥锁,添加数据,发出条件变量信号,然后解锁互斥锁。

然后,您的工作线程会锁定互斥锁,并在队列为空时等待循环中的条件。发送信号时,一个或多个工作人员将被唤醒,但只有一个(一次)将获取互斥锁。锁定互斥锁后,“获胜者”会检查队列中是否存在某些内容,将其解压缩,解锁互斥锁,并执行必要的工作。在解锁互斥锁之后,其他线程也可能会被唤醒(并且如果条件被广播将会唤醒),并且将从队列中提取下一个工作,或者如果队列为空则返回等待。

在代码中,它看起来有点像这样:

Req extract_element()
{

        if(front == NULL)
            printf("\n empty queue \n");
        else
        {
                Req ret;
                tmpExtract = front;
            ret.socketNum = tmpExtract->req->socketNum;
            ret.type = tmpExtract->req->type;
                printf("\n extracting node with sockNum: %d \n",ret.socketNum);
                front = front->next;
                free(tmpExtract);
                return(ret);
        }
} // end of extract_element

(我在线程之后省略了清理工作,并且将工作人员号码传递给线程很快且很脏,但在这种情况下可以正常工作)。

在这里,工作人员将被#include <pthread.h> #include <unistd.h> #include <stdio.h> #define WORKER_COUNT 3 pthread_mutex_t mutex; pthread_cond_t cond; pthread_t workers[WORKER_COUNT]; static int queueSize = 0; static void *workerFunc(void *arg) { printf("Starting worker %d\n", (int)arg); while(1) { pthread_mutex_lock(&mutex); while(queueSize < 1) { pthread_cond_wait(&cond, &mutex); } printf("Worker %d woke up, processing queue #%d\n", (int)arg, queueSize); //Extract work from queue --queueSize; pthread_mutex_unlock(&mutex); //Do work sleep(1); } } int main() { int i; pthread_mutex_init(&mutex, 0); pthread_cond_init(&cond, 0); for(i=0; i<WORKER_COUNT; ++i) { pthread_create(&(workers[i]), 0, workerFunc, (void*)(i+1)); } sleep(1); pthread_mutex_lock(&mutex); //Add work to queue queueSize = 5; pthread_cond_broadcast(&cond); pthread_mutex_unlock(&mutex); sleep(10); return 0; } 唤醒,并且只要队列中存在某些东西就会运行(直到pthread_cond_broadcast()回到0 - 想象还有一个实际的队列),然后回到等待。

回到问题:

1:互斥锁和保护变量(这里是queueSize)负责这一点。您还需要保护变量,因为您的线程也可能因其他原因而被唤醒(所谓的虚假唤醒,请参阅queueSize)。

2:如果你调用http://linux.die.net/man/3/pthread_cond_wait,就像任何其他线程一样,唤醒线程争用互斥锁。

3:我不确定为什么你需要向生产者发出可用工作线程数量的信号?

4:队列需要可以从生产者和使用者那里访问 - 但是仍然可以用各种方式用函数(或类,如果你使用的是C ++)封装。

5:我希望以上就足够了?

6:与pthread_mutex_lock()的关系是它可以有虚假的唤醒。也就是说,即使你没有发出信号,它也可能会醒来。因此,你需要一个保护变量(在我的代码示例中围绕pthread_cond_wait()while()循环),以确保一旦pthread_cond_wait()返回,实际上有理由醒来。然后,您使用与条件使用的相同的互斥锁保护保护变量(以及您需要提取的任何工作数据),然后您可以确定只有一个线程将执行每项工作。

7:我没有让生产者进入睡眠状态,而是让它添加它可以提取到工作队列的任何数据。如果队列已满,那么它应该进入休眠状态,否则它应该继续添加内容。

8:使用Listener线程,我真的不明白为什么你甚至需要你的Producer线程。为什么不让工人自己打电话给pthread_cond_wait()

9:您需要保护对列表变量的所有访问。也就是说,在extract_element()中,在你第一次访问insertion()之前锁定互斥锁,并在你最后一次访问front之后将其解锁。在rear中也是如此 - 尽管在队列为空时你需要重写函数以获得有效的返回值。


1
投票

想要确定前一个问题和另一个问题,所以新问题是:

1)如果我仍然想“坚持”我的线程实现,意思是在我写的时候使用互斥锁和条件变量,我怎么能确定当生产者线程只调用pthread_cond_signal时,一个线程将继续运行(来自pthread_cond_wait之后的指令)? 我是否需要添加另一个检查或使用另一个变量,或者(我更喜欢)只使用pthread_cond_wait / signal的通用机制?

注意:我使用pthread_cond_broadcast来“模拟”pthread_cond_signal解除阻塞多个线程的情况。

2)澄清程序的“逻辑”:

生产者线程在分配请求时递减信号量的原因,相反,工作者线程递增它的值,就是如果所有工作线程都忙,则生产者线程在信号量上“进入睡眠状态” - >含义“等待提取和分配过程”,直到(至少)其中一个工作线程可用于处理请求。这是我想要实现的一个很好的实现,还是有更好的方法呢?

再次,非常感谢,

盖伊。

© www.soinside.com 2019 - 2024. All rights reserved.