C Pthreads - 线程安全队列实现的问题

问题描述 投票:0回答:2

我是多线程新手,我正在尝试实现一个简单的线程安全任务队列,其中每个线程都可以从中提取工作,直到没有更多任务为止。任何线程都不会进行任务排队。

出于测试目的,每个任务仅包含一个数字。

    static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER;

    typedef struct Task{
       int number;
    }Task;


    typedef struct Cell{
        Task t;
        struct Cell* next;
    }Cell;


    typedef struct TQueue{
        struct Cell* head;
        struct Cell* tail;
    }TQueue;



   int empty(TQueue *Queue) 
      return queue->head == queue->tail;


   void startQueue(TQueue *queue){

        queue->head = malloc(sizeof(Cell));
        queue->tail = queue->head;
   }

   void enqueue(TQueue *queue, Task C){

       queue->tail->next = malloc(sizeof(Cell));
       queue->tail = queue->tail->next;
       queue->tail->t = C;
       queue->tail->next = NULL; 
   }


    Task * dequeue(TQueue* queue){

       pthread_mutex_lock( &task_mutex);
       Task * t;

       if(empty(queue)) t = NULL;

       else{

           struct Cell* p = queue->head;
           queue->head = queue->head->next;
           t = &queue->head->t;
           free(p);
       }

       pthread_mutex_unlock( &task_mutex);
       return t;
    }

    void * work( void* arg){

       TQueue* queue = (TQueue *)arg;
       Task* t = malloc(sizeof(Task));

       for(t = dequeue(queue); t != NULL; t = dequeue(queue))
           printf("%d ", t->number);

       free(t);
       pthread_exit(NULL);
       return 0;
    }

为了一个简单的测试,我在 main 上运行了这个:

int main(){

    TQueue* queue = malloc(sizeof(TQueue));
    startQueue(queue);

    pthread_t threads[3];
    Task t[3];


    for(int i = 0; i < 3; i++){
        t[i].number = i + 1;
        enqueue(queue, t[i]);
    }

    for(int i = 0; i < 3; i++) pthread_create(&threads[i], NULL, work, (void*)queue);

    for(int i = 0; i < 3; i++) pthread_join(threads[i], NULL);

    return 0;
}

预期的输出是任意顺序的

1 2 3
,但有时它会打印一个包含奇怪数字的序列,例如
1823219 2 3
。我无法检测到任何竞争条件或相关问题,所以我感谢任何帮助。

c multithreading thread-safety pthreads threadpool
2个回答
1
投票

我发现了更多错误。

我已经注释了你的代码。我从你的第一篇文章和第二篇文章中汲取了一些内容。我已经修复了代码,显示了之前和之后的情况[请原谅无偿的风格清理]:

#include <stdio.h>
#include <pthread.h>
#include <malloc.h>

static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER;

typedef struct Task {
    int number;
} Task;

typedef struct Cell {
// NOTE/BUG: this should be a pointer to the task. otherwise, dequeue gets
// messy
#if 0
    Task t;
#else
    Task *t;
#endif
    struct Cell *next;
} Cell;

typedef struct TQueue {
    struct Cell *head;
    struct Cell *tail;
} TQueue;

void
startQueue(TQueue *queue)
{

#if 0
    queue->head = malloc(sizeof(Cell));
#else
    queue->head = NULL;
#endif
    queue->tail = NULL;
}

int
empty(TQueue *queue)
{

    // NOTE/BUG: dequeue never touches tail, so this test is incorrect
#if 0
    return (queue->head == queue->tail);
#else
    return (queue->head == NULL);
#endif
}

void
enqueue(TQueue *queue, Task *t)
{
    Cell *p;

    pthread_mutex_lock(&task_mutex);

    p = malloc(sizeof(Cell));
    p->next = NULL;
    p->t = t;

    if (queue->tail == NULL) {
        queue->tail = p;
        queue->head = p;
    }
    else {
        queue->tail->next = p;
        queue->tail = p;
    }

    pthread_mutex_unlock(&task_mutex);
}

Task *
dequeue(TQueue *queue)
{
    Task *t;

    pthread_mutex_lock(&task_mutex);

    if (empty(queue))
        t = NULL;

    else {
        Cell *p = queue->head;

        if (p == queue->tail)
            queue->tail = NULL;

        queue->head = p->next;

        // NOTE/BUG: this is setting t to the second element in the list,
        // not the first
        // NOTE/BUG: this is also undefined behavior, in original code (with
        // original struct definition), because what t points to _does_ get
        // freed before return
#if 0
        t = &queue->head->t;
#else
        t = p->t;
#endif

        free(p);
    }

    pthread_mutex_unlock(&task_mutex);

    return t;
}

void *
work(void *arg)
{

    TQueue *queue = (TQueue *) arg;

    // NOTE/BUG: this gets orphaned on the first call to dequeue
#if 0
    Task *t = malloc(sizeof(Task));
#else
    Task *t;
#endif

    for (t = dequeue(queue); t != NULL; t = dequeue(queue))
        printf("%d ", t->number);

    // NOTE/BUG: this frees some cell allocated in main -- not what we want
#if 0
    free(t);
#endif

    pthread_exit(NULL);
    return 0;
}

// For a simple test i runned this on main:

int
main()
{

    TQueue *queue = malloc(sizeof(TQueue));

    startQueue(queue);

    pthread_t threads[3];
    Task t[3];

    for (int i = 0; i < 3; i++) {
        t[i].number = i + 1;
#if 0
        enqueue(queue, t);
#else
        enqueue(queue, &t[i]);
#endif
    }

    for (int i = 0; i < 3; i++)
        pthread_create(&threads[i], NULL, work, (void *) queue);

    for (int i = 0; i < 3; i++)
        pthread_join(threads[i], NULL);

    return 0;
}

更新:

线程是否同时执行任务?我一直在使用 htop 测试 CPU 使用率,但我只能最大限度地提高四个核心中单个核心的使用率。

需要记住的一些事情。

htop
在运行时间如此短的程序上可能不会显示太多。即使有 10,000 个队列条目,该程序也会在 20 毫秒内执行。

最好让程序自己打印信息[见下文]。请注意,

printf
stdin
执行线程锁定,因此它可能有助于程序的“串行”性质。它还对程序的执行时间贡献了显着(即
printf
dequeue
慢得多)

此外,一个线程(即第一个线程)可以独占队列并在其他线程有机会运行之前耗尽所有条目。 操作系统可以[自由]在单个内核上调度所有线程。然后它可能会稍后“迁移”它们(例如在一秒钟左右)。

我增强了程序,在输出打印中包含一些计时信息,这可能有助于显示更多您想看到的内容。此外,我还添加了命令行选项来控制线程数和排队项目数。这与我为自己的一些程序所做的类似。将程序输出转移到日志文件并检查它。在多次运行中尝试不同的选项

#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <malloc.h> #include <time.h> int opt_n; // suppress thread output int opt_T; // number of threads int opt_Q; // number of queue items static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER; double tvzero; typedef struct Task { int number; } Task; typedef struct Cell { Task *t; struct Cell *next; } Cell; typedef struct TQueue { struct Cell *head; struct Cell *tail; } TQueue; typedef struct Thread { pthread_t tid; int xid; TQueue *queue; } Thread; double tvgetf(void) { struct timespec ts; double sec; clock_gettime(CLOCK_REALTIME,&ts); sec = ts.tv_nsec; sec /= 1e9; sec += ts.tv_sec; sec -= tvzero; return sec; } void startQueue(TQueue *queue) { queue->head = NULL; queue->tail = NULL; } int empty(TQueue *queue) { return (queue->head == NULL); } void enqueue(TQueue *queue, Task *t) { Cell *p; pthread_mutex_lock(&task_mutex); p = malloc(sizeof(Cell)); p->next = NULL; p->t = t; if (queue->tail == NULL) { queue->tail = p; queue->head = p; } else { queue->tail->next = p; queue->tail = p; } pthread_mutex_unlock(&task_mutex); } Task * dequeue(TQueue *queue) { Task *t; pthread_mutex_lock(&task_mutex); if (empty(queue)) t = NULL; else { Cell *p = queue->head; if (p == queue->tail) queue->tail = NULL; queue->head = p->next; t = p->t; free(p); } pthread_mutex_unlock(&task_mutex); return t; } void * work(void *arg) { Thread *tskcur = arg; TQueue *queue = tskcur->queue; Task *t; double tvbef; double tvaft; while (1) { tvbef = tvgetf(); t = dequeue(queue); tvaft = tvgetf(); if (t == NULL) break; if (! opt_n) printf("[%.9f/%.9f %5.5d] %d\n", tvbef,tvaft - tvbef,tskcur->xid,t->number); } return (void *) 0; } // For a simple test i runned this on main: int main(int argc,char **argv) { char *cp; TQueue *queue; Task *t; Thread *tsk; --argc; ++argv; for (; argc > 0; --argc, ++argv) { cp = *argv; if (*cp != '-') break; switch (cp[1]) { case 'n': // suppress thread output opt_n = 1; break; case 'Q': // number of queue items opt_Q = atoi(cp + 2); break; case 'T': // number of threads opt_T = atoi(cp + 2); break; default: break; } } tvzero = tvgetf(); queue = malloc(sizeof(TQueue)); startQueue(queue); if (opt_T == 0) opt_T = 16; Thread threads[opt_T]; if (opt_Q == 0) opt_Q = 10000; t = malloc(sizeof(Task) * opt_Q); for (int i = 0; i < opt_Q; i++) { t[i].number = i + 1; enqueue(queue, &t[i]); } for (int i = 0; i < opt_T; i++) { tsk = &threads[i]; tsk->xid = i + 1; tsk->queue = queue; pthread_create(&tsk->tid, NULL, work, tsk); } for (int i = 0; i < opt_T; i++) { tsk = &threads[i]; pthread_join(tsk->tid, NULL); } printf("TOTAL: %.9f\n",tvgetf()); free(t); return 0; }



更新#2:

此外,一个线程(即第一个线程)可以独占队列并在其他线程有机会运行之前耗尽所有条目。”在这种情况下可以做什么?

一些事情。

pthread_create

需要一点时间,允许线程 1 运行,而其他线程仍在创建中。改善这种情况的一种方法是创建所有线程,每个线程设置一个“我正在运行”标志(在其线程控制块中)。主线程等待所有线程设置此标志。然后,主线程设置一个全局易失性“you_may_now_all_run”标志,每个线程在进入其主线程循环之前旋转该标志。根据我的经验,它们都在彼此的微秒内开始运行[或更好]。


我没有在下面更新的代码中实现这一点,所以你可以自己尝试一下[以及

nanosleep

]。


互斥体总体上相当公平(至少在 Linux 下),因为阻塞的线程将排队等待互斥体。正如我在评论中提到的,也可以使用

nanosleep

,但这[在某种程度上]违背了目的,因为线程会减慢速度。


线程饥饿的解药是“公平”。正如我所提到的,有一个复杂的算法可以实现无需等待的公平性。这是Kogan/Petrank算法:

http://www.cs.technion.ac.il/~erez/Papers/wf-methodology-ppopp12.pdf

这确实有点复杂/高级,所以买者自负.. . 但是,妥协可能是票证锁:

https://en.wikipedia.org/wiki/Ticket_lock

我又重新修改了程序。它具有池分配、票证与互斥锁以及日志条目延迟打印等选项。它还交叉检查线程之间的结果,以确保它们没有重复的条目。

当然,这一切的关键是准确、高精度的记录(也就是说,如果你不能测量它,你就无法调整它)。

例如,人们会认为在

free

中执行

dequeue
会比简单地将 Cell 释放到可重用池(类似于平板分配器)慢,但是,性能提升并没有预期的那么好。这可能是 glibc 的
malloc/free
速度非常快 [这就是他们
声称
]。 这些不同的版本应该可以让您了解如何构建自己的性能测量套件。

无论如何,这是代码:

#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <stdatomic.h> #include <malloc.h> #include <errno.h> #include <string.h> #include <time.h> int opt_p; // print thread output immediately int opt_T; // number of threads int opt_Q; // number of queue items int opt_L; // use ticket lock int opt_M; // use fast cell alloc/free typedef unsigned char byte; typedef unsigned int u32; #define sysfault(_fmt...) \ do { \ fprintf(stderr,_fmt); \ exit(1); \ } while (0) // lock control typedef struct AnyLock { pthread_mutex_t mutex; // standard mutex volatile u32 seqreq; // ticket lock request volatile u32 seqacq; // ticket lock grant } AnyLock; // work value typedef struct Task { union { struct Task *next; int number; }; } Task; // queue item typedef struct Cell { struct Cell *next; Task *t; } Cell; // queue control typedef struct TQueue { struct Cell *head; struct Cell *tail; } TQueue; // thread log entry typedef struct Log { double tvbef; double tvaft; int number; } Log; #define BTVOFF(_off) \ ((_off) >> 3) #define BTVMSK(_off) \ (1u << ((_off) & 0x07)) #define BTVLEN(_len) \ ((_len) + 7) >> 3 // thread control typedef struct Thread { pthread_t tid; int xid; TQueue *queue; Log *log; byte *bitv; } Thread; static inline byte btvset(byte *bitv,long off) { u32 msk; byte oval; bitv += BTVOFF(off); msk = BTVMSK(off); oval = *bitv & msk; *bitv |= msk; return oval; } AnyLock task_mutex; AnyLock print_mutex; double tvzero; Cell *cellpool; // free pool of cells long bitvlen; #define BARRIER \ __asm__ __volatile__("" ::: "memory") // virtual function pointers Cell *(*cellnew)(void); void (*cellfree)(Cell *); void (*lock_acquire)(AnyLock *lock); void (*lock_release)(AnyLock *lock); double tvgetf(void) { struct timespec ts; double sec; clock_gettime(CLOCK_REALTIME,&ts); sec = ts.tv_nsec; sec /= 1e9; sec += ts.tv_sec; sec -= tvzero; return sec; } void * xalloc(size_t cnt,size_t siz) { void *ptr; ptr = calloc(cnt,siz); if (ptr == NULL) sysfault("xalloc: calloc failure -- %s\n",strerror(errno)); return ptr; } void lock_wait_ticket(AnyLock *lock,u32 newval) { u32 oldval; // wait for our ticket to come up // NOTE: atomic_load is [probably] overkill here while (1) { #if 0 oldval = atomic_load(&lock->seqacq); #else oldval = lock->seqacq; #endif if (oldval == newval) break; } } void lock_acquire_ticket(AnyLock *lock) { u32 oldval; u32 newval; int ok; // acquire our ticket value // NOTE: just use a garbage value for oldval -- the exchange will // update it with the correct/latest value -- this saves a separate // refetch within the loop oldval = 0; while (1) { #if 0 BARRIER; oldval = lock->seqreq; #endif newval = oldval + 1; ok = atomic_compare_exchange_strong(&lock->seqreq,&oldval,newval); if (ok) break; } lock_wait_ticket(lock,newval); } void lock_release_ticket(AnyLock *lock) { // NOTE: atomic_fetch_add is [probably] overkill, but leave it for now #if 1 atomic_fetch_add(&lock->seqacq,1); #else lock->seqacq += 1; #endif } void lock_acquire_mutex(AnyLock *lock) { pthread_mutex_lock(&lock->mutex); } void lock_release_mutex(AnyLock *lock) { pthread_mutex_unlock(&lock->mutex); } void lock_init(AnyLock *lock) { switch (opt_L) { case 1: lock->seqreq = 0; lock->seqacq = 1; lock_acquire = lock_acquire_ticket; lock_release = lock_release_ticket; break; default: pthread_mutex_init(&lock->mutex,NULL); lock_acquire = lock_acquire_mutex; lock_release = lock_release_mutex; break; } } void startQueue(TQueue *queue) { queue->head = NULL; queue->tail = NULL; } int empty(TQueue *queue) { return (queue->head == NULL); } // cellnew_pool -- allocate a queue entry Cell * cellnew_pool(void) { int cnt; Cell *p; Cell *pool; while (1) { // try for quick allocation p = cellpool; // bug out if we got it if (p != NULL) { cellpool = p->next; break; } // go to the heap to replenish the pool cnt = 1000; p = xalloc(cnt,sizeof(Cell)); // link up the entries pool = NULL; for (; cnt > 0; --cnt, ++p) { p->next = pool; pool = p; } // put this "online" cellpool = pool; } return p; } // cellfree_pool -- release a queue entry void cellfree_pool(Cell *p) { p->next = cellpool; cellpool = p; } // cellnew_std -- allocate a queue entry Cell * cellnew_std(void) { Cell *p; p = xalloc(1,sizeof(Cell)); return p; } // cellfree_std -- release a queue entry void cellfree_std(Cell *p) { free(p); } void enqueue(TQueue *queue, Task *t) { Cell *p; lock_acquire(&task_mutex); p = cellnew(); p->next = NULL; p->t = t; if (queue->tail == NULL) { queue->tail = p; queue->head = p; } else { queue->tail->next = p; queue->tail = p; } lock_release(&task_mutex); } Task * dequeue(TQueue *queue) { Task *t; lock_acquire(&task_mutex); if (empty(queue)) t = NULL; else { Cell *p = queue->head; if (p == queue->tail) queue->tail = NULL; queue->head = p->next; t = p->t; cellfree(p); } lock_release(&task_mutex); return t; } void * work(void *arg) { Thread *tskcur = arg; TQueue *queue = tskcur->queue; Task *t; Log *log; long cnt; int tprev; byte *bitv; double tvbeg; double tvbef; double tvaft; log = tskcur->log; bitv = tskcur->bitv; tvbeg = tvgetf(); tprev = 0; while (1) { tvbef = tvgetf(); t = dequeue(queue); tvaft = tvgetf(); if (t == NULL) break; // abort if we get a double entry if (btvset(bitv,t->number)) sysfault("work: duplicate\n"); if (opt_p) { printf("[%.9f/%.9f %5.5d] %d [%d]\n", tvbef,tvaft - tvbef,tskcur->xid,t->number,t->number - tprev); tprev = t->number; continue; } log->tvbef = tvbef; log->tvaft = tvaft; log->number = t->number; ++log; } if (! opt_p) { tvaft = tvgetf(); cnt = log - tskcur->log; log = tskcur->log; lock_acquire(&print_mutex); printf("\n"); printf("THREAD=%5.5d START=%.9f STOP=%.9f ELAP=%.9f TOTAL=%ld\n", tskcur->xid,tvbeg,tvaft,tvaft - tvbeg,cnt); tprev = 0; for (; cnt > 0; --cnt, ++log) { printf("[%.9f/%.9f %5.5d] %d [%d]\n", log->tvbef,log->tvaft - log->tvbef,tskcur->xid, log->number,log->number - tprev); tprev = log->number; } lock_release(&print_mutex); } return (void *) 0; } void btvchk(Thread *tska,Thread *tskb) { byte *btva; byte *btvb; byte aval; byte bval; int idx; printf("btvchk: %d ??? %d\n",tska->xid,tskb->xid); btva = tska->bitv; btvb = tskb->bitv; // abort if we get overlapping entries between two threads for (idx = 0; idx < bitvlen; ++idx) { aval = btva[idx]; bval = btvb[idx]; if (aval & bval) sysfault("btvchk: duplicate\n"); } } // For a simple test i runned this on main: int main(int argc,char **argv) { char *cp; TQueue *queue; Task *t; Thread *tsk; --argc; ++argv; for (; argc > 0; --argc, ++argv) { cp = *argv; if (*cp != '-') break; switch (cp[1]) { case 'p': // print immediately opt_p = 1; break; case 'Q': // number of queue items opt_Q = atoi(cp + 2); break; case 'T': // number of threads opt_T = atoi(cp + 2); break; case 'L': opt_L = 1; break; case 'M': opt_M = 1; break; default: break; } } printf("p=%d -- thread log is %s\n",opt_p,opt_p ? "immediate" : "deferred"); if (opt_T == 0) opt_T = 16; printf("T=%d (number of threads)\n",opt_T); if (opt_Q == 0) opt_Q = 1000000; printf("Q=%d (number of items to enqueue)\n",opt_Q); printf("L=%d -- lock is %s\n",opt_L,opt_L ? "ticket" : "mutex"); printf("M=%d -- queue item allocation is %s\n", opt_M,opt_M ? "pooled" : "malloc/free"); tvzero = tvgetf(); lock_init(&task_mutex); lock_init(&print_mutex); // select queue item allocation strategy switch (opt_M) { case 1: cellnew = cellnew_pool; cellfree = cellfree_pool; break; default: cellnew = cellnew_std; cellfree = cellfree_std; break; } queue = xalloc(1,sizeof(TQueue)); startQueue(queue); Thread threads[opt_T]; // get byte length of bit vectors bitvlen = BTVLEN(opt_Q + 1); // allocate per-thread log buffers for (int i = 0; i < opt_T; i++) { tsk = &threads[i]; if (! opt_p) tsk->log = xalloc(opt_Q,sizeof(Log)); tsk->bitv = xalloc(bitvlen,sizeof(byte)); } // allocate "work to do" t = xalloc(opt_Q,sizeof(Task)); // add to master queue for (int i = 0; i < opt_Q; i++) { t[i].number = i + 1; enqueue(queue, &t[i]); } // fire up the threads for (int i = 0; i < opt_T; i++) { tsk = &threads[i]; tsk->xid = i + 1; tsk->queue = queue; pthread_create(&tsk->tid, NULL, work, tsk); } // wait for threads to complete for (int i = 0; i < opt_T; i++) { tsk = &threads[i]; pthread_join(tsk->tid, NULL); } // wait for threads to complete for (int i = 0; i < opt_T; i++) { for (int j = i + 1; j < opt_T; j++) btvchk(&threads[i],&threads[j]); } printf("TOTAL: %.9f\n",tvgetf()); free(t); return 0; }



0
投票

© www.soinside.com 2019 - 2024. All rights reserved.