我正在编写一个多线程程序,其中辅助线程在满足条件后运行,即数据结构中存在一定数量的元素。
void*
gc_thread_handler(void *arg) {
while (!done) {
pthread_mutex_lock(&(gc.mutex));
while (!gc.collect && !done) {
pthread_cond_wait(&(gc.cond), &(gc.mutex));
}
pthread_mutex_unlock(&(gc.mutex));
rcgc_activate();
}
return NULL;
}
我阻塞等待线程,直到它收到以下函数发送的信号。
void *
gc_alloc(size_t sz) {
pthread_mutex_lock(&(gc.mutex));
// ...
gc.num_chunks++;
if (gc.num_chunks > MAX_CHUNKS) {
gc.collect = true;
pthread_cond_signal(&(gc.cond));
}
pthread_mutex_unlock(&(gc.mutex));
return ptr;
}
然而,出于某种原因,信号似乎并没有立即唤醒睡眠线程;信号重复发出(因为
gc_alloc
被重复调用)。最终,等待线程确实醒来并调用gc_activate
,但我真的不明白为什么它没有立即醒来。
MRE:
gc.h:
#ifndef RCGC_H
#define RCGC_H
#define RCGC_OWALLOC(ptr, sz) (rcgc_owalloc((void **) (ptr), (sz)));
#define RCGC_ASSIGN(dest, src) (rcgc_assign((void **) (dest), (void **) (src)));
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
struct rcgc_allocation {
void *data;
struct rcgc_allocation *next;
size_t counter;
bool is_free;
};
// Reference counted gc.
struct rcgc {
struct rcgc_allocation *head;
struct rcgc_allocation *tail;
int num_chunks;
bool collect;
pthread_mutex_t mutex;
pthread_cond_t cond;
};
void rcgc_init(void);
void *rcgc_alloc(size_t sz);
void *rcgc_owalloc(void **dest, size_t sz);
void rcgc_assign(void **dest, void **src);
void rcgc_activate(void);
#endif // RCGC_H
gc.c
#include "rcgc.h"
#define MAX_CHUNKS 1000
struct rcgc gc;
static struct rcgc_allocation *rcgc_search(void *ptr);
void
rcgc_init(void) {
pthread_mutex_init(&(gc.mutex), NULL);
pthread_cond_init(&(gc.cond), NULL);
gc.head = gc.tail = NULL;
gc.num_chunks = 0;
gc.collect = false;
}
void *
rcgc_alloc(size_t sz) {
pthread_mutex_lock(&(gc.mutex));
struct rcgc_allocation *node = calloc(1, sizeof(struct rcgc_allocation));
// Add to linked list.
if (gc.head == NULL) {
gc.head = gc.tail = node;
gc.head->next = gc.tail->next = NULL;
} else {
gc.tail->next = node;
gc.tail = node;
}
// Now assign the ptr data.
void *ptr = calloc(1, sz);
node->data = ptr;
node->is_free = false;
node->counter = 1;
// If we allocate more than enough chunks, send the signal.
gc.num_chunks++;
if (gc.num_chunks > MAX_CHUNKS && !gc.collect) {
gc.collect = true;
pthread_cond_signal(&(gc.cond));
}
pthread_mutex_unlock(&(gc.mutex));
return ptr;
}
void *
rcgc_owalloc(void **dest, size_t sz) {
pthread_mutex_lock(&(gc.mutex));
// Determine where it exists in the tree.
if (dest == NULL) {
exit(EXIT_FAILURE);
} else {
struct rcgc_allocation *old_node = rcgc_search(*dest);
if (old_node != NULL) {
old_node->counter--;
}
pthread_mutex_unlock(&(gc.mutex));
return rcgc_alloc(sz);
}
}
void
rcgc_assign(void **dest, void **src) {
pthread_mutex_lock(&(gc.mutex));
// If "dest" is already in the tree, find it and decrement its ref ctr.
if (dest != NULL) {
struct rcgc_allocation *dest_alloc = rcgc_search(*dest);
if (dest_alloc != NULL) {
dest_alloc->counter--;
}
}
// If we cannot find the source, then it does not exist.
struct rcgc_allocation *src_alloc = rcgc_search(*src);
src_alloc->counter++;
*dest = *src;
pthread_mutex_unlock(&(gc.mutex));
}
void
rcgc_activate(void) {
pthread_mutex_lock(&(gc.mutex));
for (struct rcgc_allocation *curr = gc.head; curr != NULL; curr = curr->next) {
if (curr->counter == 0 && !curr->is_free) {
free(curr->data);
curr->data = NULL;
curr->is_free = true;
gc.num_chunks--;
}
}
gc.collect = false;
pthread_mutex_unlock(&(gc.mutex));
}
static struct rcgc_allocation *
rcgc_search(void *ptr) {
for (struct rcgc_allocation *curr = gc.head; curr != NULL; curr = curr->next) {
if (ptr == curr->data) {
return curr;
}
}
return NULL;
}
main.c
#include <stdio.h>
#include <stdlib.h>
#include "rcgc.h"
extern struct rcgc gc;
static bool done = false;
void*
rcgc_thread_handler(void *arg) {
while (!done) {
pthread_mutex_lock(&(gc.mutex));
while (!gc.collect && !done) {
pthread_cond_wait(&(gc.cond), &(gc.mutex));
}
pthread_mutex_unlock(&(gc.mutex));
rcgc_activate();
}
return NULL;
}
int
main(void) {
rcgc_init();
pthread_t pid;
pthread_create(&pid, NULL, rcgc_thread_handler, NULL);
// Create a few arbitrary allocations and assignments.
int *arr1 = rcgc_alloc(sizeof(int) * 10);
int *arr2 = NULL;
// Do something for a while...
double val = 0;
while (val < 1) {
val += 0.0001;
arr1 = RCGC_OWALLOC(&arr1, sizeof(int));
RCGC_ASSIGN(&arr2, &arr1);
}
done = true;
pthread_cond_broadcast(&(gc.cond));
pthread_join(pid, NULL);
return 0;
}
我不太明白的是为什么线程 A 在线程 B 有机会调用 activate() 之前一遍又一遍地重新获取锁
这是大多数工作站/服务器/移动设备操作系统默认调度策略下的正常行为。这些系统的设计目标是在多个不同的应用程序竞争资源时最大限度地利用平台的 CPU。在这种假设下,调度器最小化上下文切换的次数是有意义的。多余的上下文切换会使用 CPU 时间,否则应用程序可能会使用这些时间。
当存在对互斥体的竞争时,调度程序将倾向于将其授予时间片尚未到期的线程——已经在 CPU 上运行的线程——而不是过早地踢掉该线程(或任何其他线程) CPU,以便它可以唤醒其他一直在等待锁定的线程。
您看到的结果有一个名称。这就是所谓的“饥饿”。避免饥饿的最好方法是尽量减少对锁的争用。重新设计程序,这样任何线程在想要获取互斥量时都很少需要实际等待。确保没有线程拥有互斥量的所有权的时间超过更新一些变量所需的时间。
另一方面:如果您将平台专用于单个应用程序,那么选择替代调度策略或替代“实时”操作系统可能是有意义的,这将使您能够更好地控制线程的调度方式.