如何加快C互斥锁?

问题描述 投票:-1回答:3

我有这个错误代码。

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define MAX 1000

struct TContext {
    const char* Name;
    int* Counter;
    int Mod;
};

void* ThreadFunc(void* arg) {
    struct TContext* ctxt = arg;
    int* counter = ctxt->Counter;
    fprintf(stderr, "This is %s thread\n", ctxt->Name);
    while (*counter < MAX) {
        if (*counter % 2 == ctxt->Mod) {
            printf("%d ", (*counter)++);
        }
    }
    pthread_exit(0);
}

int main()
{
    pthread_t t1;
    pthread_t t2;

    int counter = 0;
    struct TContext ctxt1 = {"even", &counter, 0};
    struct TContext ctxt2 = {"odd", &counter, 1};
    pthread_create(&t1, 0, ThreadFunc, &ctxt1);
    pthread_create(&t2, 0, ThreadFunc, &ctxt2);

    pthread_join(t1, 0);
    pthread_join(t2, 0);
    printf("\n");
    return 0;
}

我的目标是使其同步并获得顺序0、1、2、3、4、5...。

我试图以此方式锁定和解锁互斥锁

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void* ThreadFunc(void* arg) {
  struct TContext* ctxt = arg;
  int* counter = ctxt->Counter;
  fprintf(stderr, "This is %s thread\n", ctxt->Name);
  while (*counter < MAX) {
    if (*counter % 2 == ctxt->Mod) {
      pthread_mutex_lock(&mutex);
      printf("%d ", (*counter)++);
      pthread_mutex_unlock(&mutex);
    }
  }
  pthread_exit(0);
}

但是它工作非常慢(我在一秒钟内有tl)。

如何以更有效的方式同步此代码?或者也许我可以优化C-mutex?

c pthreads mutex
3个回答
1
投票

Chris Halls更多的传统方法是:

pthread_cond_t  cv;
pthread_mutex_t lock;

void* ThreadFunc(void* arg) {
    struct TContext* ctxt = arg;
    int* counter = ctxt->Counter;
    fprintf(stderr, "This is %s thread\n", ctxt->Name);
    pthread_mutex_lock(&lock);
    while (*counter < MAX) {
        if (*counter % 2 == ctxt->Mod) {
            printf("%d ", (*counter)++);
            pthread_cond_broadcast(&cv);
        } else {
            pthread_cond_wait(&cv, &lock);
        }
    }
    pthread_mutex_unlock(&lock);
    pthread_exit(0);
}

和主要内容:

pthread_mutex_init(&lock, 0);
pthread_cond_init(&cv, 0);

创建线程之前的某个位置。这也使您可以添加任意数量的偶​​数+奇数线程而不会产生干扰(尽管没有加速,只是出于好奇)。


1
投票

我建议:

void* ThreadFunc(void* arg) {
    struct TContext* ctxt = arg;
    volatile int* counter = ctxt->Counter;
    fprintf(stderr, "This is %s thread\n", ctxt->Name);

    while (1)
    {
        int count ;

        count = *counter ;     // NB: volatile*

        if (count >= MAX)
          break ;

         if ((count % 2) == ctxt->Mod)
         {
             printf("%d ", count) ;
             *counter = count + 1 ;
         } ;
    } ;
    pthread_exit(0);
}

至少对于x86 / x86_64,这会产生我认为您正在寻找的效果,即两个线程轮流递增计数器。

真正有趣的问题是为什么它起作用:-)


后记

上面的代码严重取决于四件事:

  1. 线程之间仅共享一个值-计数器

  2. 计数器同时是数据控制-计数器的ls位指示应该继续哪个线程。

  3. 读取和写入计数器必须是原子的-因此,每次读取计数器都会读取最后写入的值(而不是先前和当前写入的某种组合)。

  4. 编译器必须发出代码才能实际从/向内存在内存中读取/写入计数器。现在(1)和(2)特定于此特定问题。 (3)对于int通常是正确的(尽管可能需要正确对齐)。 (4)通过将计数器定义为volatile来实现。

    所以,我最初说这至少在x86 / x86_64上有效,因为我知道(3)对于那些设备是正确的,但我也相信对于许多(大多数?)通用设备也是如此。

  5. 更清洁的实现将如下定义计数器_Atomic

#include <stdatomic.h> void* ThreadFunc(void* arg) { struct TContext* ctxt = arg; atomic_int* counter = ctxt->Counter; fprintf(stderr, "This is %s thread\n", ctxt->Name); while (1) { int count ; count = atomic_load_explicit(counter, memory_order_relaxed) ; if (count > MAX) // printing up to and including MAX break ; if ((count % 2) == ctxt->Mod) { printf("%d ", count) ; atomic_store_explicit(counter, count + 1, memory_order_relaxed) ; } ; } ; pthread_exit(0); }

哪个使(3)和(4)明确。但是请注意,(1)和(2)仍然意味着我们不需要任何内存排序。每次每个线程读取计数器时,bit0都会告诉它是否“拥有”该计数器。如果它不拥有该计数器,则线程循环以再次读取它。如果它确实拥有计数器,它将使用该值,然后写入一个新值-并且由于传递了“所有权”,它返回到读取循环(在再次“拥有”它之前,它无法对计数器做任何进一步的操作)。将MAX + 1写入计数器后,任何线程都不会使用或更改它,因此也是安全的。

兄弟Employed Russian是正确的,这里有一个“数据争用”,但这可以通过数据依赖关系来解决,特别是这种情况。


一般而言

上面的代码不是非常有用,除非您具有其他具有单个共享值的应用程序。但这可以使用memory_order_acquirememory_order_acquire原子运算来概括。

假设我们有一些struct shared,其中包含一个线程将产生而另一个线程将消耗的一些(非常重要的)数据。假设我们再次使用atomic_uint counter(最初为零)来管理对给定struct shared parcel的访问。现在,我们有了一个生产者线程:

void* ThreadProducerFunc(void* arg) { atomic_uint counter = &count ; // somehow .... while (1) { uint count ; do count = atomic_load_explicit(counter, memory_order_acquire) ; while ((count & 1) == 1) ; ... fill the struct shared parcel, somehow ... atomic_store_explicit(counter, count + 1, memory_order_release) ; } ; .... }

以及一个消费线程:

void* ThreadConsumerFunc(void* arg) { atomic_uint counter = &count ; // somehow .... while (1) { uint count ; do count = atomic_load_explicit(counter, memory_order_acquire) ; while ((count & 1) == 0) ; ... empty the struct shared parcel, somehow ... atomic_store_explicit(counter, count + 1, memory_order_release) ; } ; .... }

加载获取操作与存储释放操作同步,因此:

  • 在生产者中:直到生产者拥有“所有权”(如上所述),包裹才会开始填充,然后将“完成”(写入对其他线程可见)

    之前

    计数更新(新值对其他线程可见)。
  • 在使用者中:直到使用者拥有“所有权”(如上所述)之后,才开始清空包裹,然后将“完成”(所有读取都已从内存中读取)

    之前

  • 计数更新(新值对其他线程可见)。显然,两个线程正忙于彼此等待。但是,如果有两个或多个包裹和柜台,线程可以以较慢的速度前进。


    最后-x86 / x86_64并获取/发布

    对于x86 / x86_64,

    all

内存读取和写入是隐式的获取读取和释放写入。这意味着atomic_load_explicit(..., memory_order_acquire)atomic_store_explicit(..., memory_order_release)中的开销为零。 相反,所有读-修改-写操作(和memory_order_seq_cst操作)在几十个时钟中产生开销-30?,50 ?,如果竞争该操作则更多(取决于设备)。

因此,在性能至关重要的地方,可能值得理解什么是可能的(什么不是)。

如何以更有效的方式同步此代码?

您不能:代码

从根本上

低效。问题是,与同步开销相比,您所做的工作量(递增整数)微不足道,因此后者占主导地位。

要解决此问题,您需要为每个锁定/解锁对做更多的工作。

在真实程序中,您将使每个线程为每个锁定/解锁迭代执行1000或10000个“工作项”。类似于:

lock; const int start = *ctx->Counter; *ctx->Counter += N; unlock; for (int j = start; j < start + N; j++) /* do work on j-th iteration here */;

但是您的玩具程序不适合此。

或者也许我可以优化C-mutex?

我建议先尝试实现

正确

互斥锁。您会很快发现这绝非易事。

0
投票
如何以更有效的方式同步此代码?
© www.soinside.com 2019 - 2024. All rights reserved.