如何以更有效的方式同步此代码?
我有这个错误代码。
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#define MAX 1000
struct TContext {
const char* Name;
int* Counter;
int Mod;
};
void* ThreadFunc(void* arg) {
struct TContext* ctxt = arg;
int* counter = ctxt->Counter;
fprintf(stderr, "This is %s thread\n", ctxt->Name);
while (*counter < MAX) {
if (*counter % 2 == ctxt->Mod) {
printf("%d ", (*counter)++);
}
}
pthread_exit(0);
}
int main()
{
pthread_t t1;
pthread_t t2;
int counter = 0;
struct TContext ctxt1 = {"even", &counter, 0};
struct TContext ctxt2 = {"odd", &counter, 1};
pthread_create(&t1, 0, ThreadFunc, &ctxt1);
pthread_create(&t2, 0, ThreadFunc, &ctxt2);
pthread_join(t1, 0);
pthread_join(t2, 0);
printf("\n");
return 0;
}
我的目标是使其同步并获得顺序0、1、2、3、4、5...。
我试图以此方式锁定和解锁互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void* ThreadFunc(void* arg) {
struct TContext* ctxt = arg;
int* counter = ctxt->Counter;
fprintf(stderr, "This is %s thread\n", ctxt->Name);
while (*counter < MAX) {
if (*counter % 2 == ctxt->Mod) {
pthread_mutex_lock(&mutex);
printf("%d ", (*counter)++);
pthread_mutex_unlock(&mutex);
}
}
pthread_exit(0);
}
但是它工作非常慢(我在一秒钟内有tl)。
如何以更有效的方式同步此代码?或者也许我可以优化C-mutex?
比Chris Halls更多的传统方法是:
pthread_cond_t cv;
pthread_mutex_t lock;
void* ThreadFunc(void* arg) {
struct TContext* ctxt = arg;
int* counter = ctxt->Counter;
fprintf(stderr, "This is %s thread\n", ctxt->Name);
pthread_mutex_lock(&lock);
while (*counter < MAX) {
if (*counter % 2 == ctxt->Mod) {
printf("%d ", (*counter)++);
pthread_cond_broadcast(&cv);
} else {
pthread_cond_wait(&cv, &lock);
}
}
pthread_mutex_unlock(&lock);
pthread_exit(0);
}
和主要内容:
pthread_mutex_init(&lock, 0);
pthread_cond_init(&cv, 0);
创建线程之前的某个位置。这也使您可以添加任意数量的偶数+奇数线程而不会产生干扰(尽管没有加速,只是出于好奇)。
我建议:
void* ThreadFunc(void* arg) {
struct TContext* ctxt = arg;
volatile int* counter = ctxt->Counter;
fprintf(stderr, "This is %s thread\n", ctxt->Name);
while (1)
{
int count ;
count = *counter ; // NB: volatile*
if (count >= MAX)
break ;
if ((count % 2) == ctxt->Mod)
{
printf("%d ", count) ;
*counter = count + 1 ;
} ;
} ;
pthread_exit(0);
}
至少对于x86 / x86_64,这会产生我认为您正在寻找的效果,即两个线程轮流递增计数器。
真正有趣的问题是为什么它起作用:-)
上面的代码严重取决于四件事:
线程之间仅共享一个值-计数器
计数器同时是数据和控制-计数器的ls位指示应该继续哪个线程。
读取和写入计数器必须是原子的-因此,每次读取计数器都会读取最后写入的值(而不是先前和当前写入的某种组合)。
编译器必须发出代码才能实际从/向内存在内存中读取/写入计数器。现在(1)和(2)特定于此特定问题。 (3)对于int
通常是正确的(尽管可能需要正确对齐)。 (4)通过将计数器定义为volatile
来实现。
所以,我最初说这至少在x86 / x86_64上有效,因为我知道(3)对于那些设备是正确的,但我也相信对于许多(大多数?)通用设备也是如此。
更清洁的实现将如下定义计数器_Atomic
:
#include <stdatomic.h>
void* ThreadFunc(void* arg) {
struct TContext* ctxt = arg;
atomic_int* counter = ctxt->Counter;
fprintf(stderr, "This is %s thread\n", ctxt->Name);
while (1)
{
int count ;
count = atomic_load_explicit(counter, memory_order_relaxed) ;
if (count > MAX) // printing up to and including MAX
break ;
if ((count % 2) == ctxt->Mod)
{
printf("%d ", count) ;
atomic_store_explicit(counter, count + 1, memory_order_relaxed) ;
} ;
} ;
pthread_exit(0);
}
哪个使(3)和(4)明确。但是请注意,(1)和(2)仍然意味着我们不需要任何内存排序。每次每个线程读取计数器时,bit0都会告诉它是否“拥有”该计数器。如果它不拥有该计数器,则线程循环以再次读取它。如果它确实拥有计数器,它将使用该值,然后写入一个新值-并且由于传递了“所有权”,它返回到读取循环(在再次“拥有”它之前,它无法对计数器做任何进一步的操作)。将MAX + 1写入计数器后,任何线程都不会使用或更改它,因此也是安全的。兄弟Employed Russian是正确的,这里有一个“数据争用”,但这可以通过数据依赖关系来解决,特别是这种情况。
一般而言
memory_order_acquire
和memory_order_acquire
原子运算来概括。假设我们有一些struct shared
,其中包含一个线程将产生而另一个线程将消耗的一些(非常重要的)数据。假设我们再次使用atomic_uint counter
(最初为零)来管理对给定struct shared parcel
的访问。现在,我们有了一个生产者线程:
void* ThreadProducerFunc(void* arg)
{
atomic_uint counter = &count ; // somehow
....
while (1)
{
uint count ;
do
count = atomic_load_explicit(counter, memory_order_acquire) ;
while ((count & 1) == 1) ;
... fill the struct shared parcel, somehow ...
atomic_store_explicit(counter, count + 1, memory_order_release) ;
} ;
....
}
以及一个消费线程:
void* ThreadConsumerFunc(void* arg) { atomic_uint counter = &count ; // somehow .... while (1) { uint count ; do count = atomic_load_explicit(counter, memory_order_acquire) ; while ((count & 1) == 0) ; ... empty the struct shared parcel, somehow ... atomic_store_explicit(counter, count + 1, memory_order_release) ; } ; .... }
加载获取操作与存储释放操作同步,因此:
之前
计数更新(新值对其他线程可见)。之前
all
atomic_load_explicit(..., memory_order_acquire)
和atomic_store_explicit(..., memory_order_release)
中的开销为零。 相反,所有读-修改-写操作(和memory_order_seq_cst操作)在几十个时钟中产生开销-30?,50 ?,如果竞争该操作则更多(取决于设备)。因此,在性能至关重要的地方,可能值得理解什么是可能的(什么不是)。
如何以更有效的方式同步此代码?
您不能:代码
从根本上
要解决此问题,您需要为每个锁定/解锁对做更多的工作。
在真实程序中,您将使每个线程为每个锁定/解锁迭代执行1000或10000个“工作项”。类似于:
lock;
const int start = *ctx->Counter;
*ctx->Counter += N;
unlock;
for (int j = start; j < start + N; j++) /* do work on j-th iteration here */;
但是您的玩具程序不适合此。或者也许我可以优化C-mutex?
我建议先尝试实现
正确
互斥锁。您会很快发现这绝非易事。如何以更有效的方式同步此代码?