我在为特定问题创建安全高效的锁定机制时遇到困难。这是情况的一个过于简化的版本。
我有两个表,分别称为 A 和 B,每个表都包含可容纳(比如说)10 个条目的空间。在这 10 个条目中,其中 9 个是指向另一个表的指针,其余条目为 NULL。在程序启动时,非 NULL 条目以双射对应开始。例如,如果我点击从表 A 到表 B 然后再返回的链接,我会到达 A 中的原始位置。类似地,如果我点击从表 B 到表 A 然后再返回的链接,我会到达我在 A 中的原始位置。 B 中的原始位置。有全局变量用于跟踪 NULL 值的位置。
建立此双射对应关系后,程序会生成两个线程。第一个线程(“线程 A”)重复执行以下操作:随机选择 A 的一个条目,将其交换到 NULL 位置,然后继续调整 B 中的相应指针以维持双射对应关系。另一个线程(“线程 B”)执行完全相同的操作,但从另一个表开始。因此它随机选择 B 中的条目,将它们交换到 NULL 位置,并调整 A 中相应的指针。
问题:我们怎样才能在不锁定整个表的情况下使这个系统线程安全,最好只使用每个指针的底部 4 位?
显然,我关心的实际情况涉及更大的表,而且每个条目都会附加(少量)数据。而且,在现实中,运动并不是完全随机的。它们有一个目的。请随意询问您是否想了解有关我实际尝试执行的操作的更多详细信息,尽管我认为这些详细信息与解决实际的多线程问题不太相关。
附录。我刚刚注意到这个问题有多个版本。最简单的版本是,如果我们选择一个条目并意识到它无法移动,那完全没问题 - 我们只需随机选择另一个条目并移动它即可。中等难度的版本规定我们只能对 A 进行此操作。这意味着在表 B 中,我们必须阻塞并等待,直到可以移动,而不能简单地选择另一个条目。这个问题的最难版本表明我们被剥夺了简单放弃并重新随机化两个表的权利。 FWIW,我对这个问题的所有版本都感兴趣;因此,即使您只能解决“简单”版本,我仍然会重视您的答案。
附录 2. 这里有一些针对 x86-64/Linux 的示例代码,尽管没有任何线程安全机制。由于 qword 的打包方式,当前设计仅为每个指针提供 3 位,但如果需要,我们可以使用 128 位条目将其升级为 4 位。
#include <pthread.h>
#include <stdint.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
// Clear the bottom 3 bits of a 64-bit value
#define GET_POINTER(val) (uint64_t*) ((val) & ~0x4)
// Concatenates the top 61 bits of new_ptr with the bottom 3 bits of old_val
// so long as the bottom 3 bits of new_ptr are 0's
#define MODIFY_VALUE(new_ptr, old_val) ((uint64_t) (new_ptr)) ^ ((old_val) & 0x4)
// Declare globals
uint64_t A[10], B[10];
int index_of_NULL_value_A, index_of_NULL_value_B;
// Initialize tables
void init_globals() {
// Initialize A[0] and B[0] to NULL pointers
A[0] = (uint64_t) NULL; // Note that (uint64_t) NULL == 0;
B[0] = (uint64_t) NULL; // Note that (uint64_t) NULL == 0;
// Record the initial indexes of the NULL values
index_of_NULL_value_A = 0;
index_of_NULL_value_B = 0;
// Create pointers from A to B
for (int i = 1; i < 10; ++i) {
A[i] = (uint64_t) &B[i];
}
// Create pointers from B to A
for (int i = 1; i < 10; ++i) {
B[i] = (uint64_t) &A[i];
}
}
void verify_integrity_of_table(uint64_t* A, int index_of_NULL_value_A) {
// Verify the forward direction
for (int i = 0; i < 10; ++i) {
if (i == index_of_NULL_value_A) {
// Check that the NULL value is at this spot
if (A[i] != (uint64_t) NULL) {
fprintf(stderr, "Integrity check A failed! Missing NULL at i: %d\n", i);
exit(1);
}
} else {
// Check link integrity
if (&A[i] != GET_POINTER(*GET_POINTER(A[i]))) {
fprintf(stderr, "Integrity check A failed! Dodgy link at i: %d\n", i);
exit(1);
}
}
}
}
void verify_integrity() {
// Verify the forward direction
verify_integrity_of_table(A, index_of_NULL_value_A);
// Verify the backward direction
verify_integrity_of_table(B, index_of_NULL_value_B);
}
typedef void *(*start_routine_t)(void *);
pthread_t pthread_create_or_exit(start_routine_t start_routine) {
// Declare variables
pthread_t thread_id;
int result;
// Create the thread
result = pthread_create(&thread_id, NULL, start_routine, NULL);
if (result != 0) {
perror("Failed to create thread!\n");
exit(EXIT_FAILURE);
}
// Return the thread id
return thread_id;
}
void do_a_random_swap(uint64_t* A, int* addr_of_index_of_NULL_value) {
// Get the index of the NULL value
int index_of_NULL = *addr_of_index_of_NULL_value;
// Choose a random index
int i = rand() % 10;
while (i == index_of_NULL) {
i = rand() % 10;
}
// Update the backpointer
uint64_t* new_ptr = &A[index_of_NULL];
uint64_t old_val = *(GET_POINTER(A[i]));
*GET_POINTER(A[i]) = MODIFY_VALUE(new_ptr, old_val);
// Copy the item into the NULL spot and clear the old spot
A[index_of_NULL] = A[i];
A[i] = (uint64_t) NULL; // Note that (uint64_t) NULL == 0
// Update the NULL index tracker
*addr_of_index_of_NULL_value = i;
}
void* fst_start_routine(void* arg) {
// Tell the compiler not to panic about the fact that we're not using this argument
(void) arg;
// Loop forever
while (1) {
if (time(NULL) % 2 == 0) {
do_a_random_swap(A, &index_of_NULL_value_A);
} else {
sleep(0.1);
}
}
// We never get here
return NULL;
}
void* snd_start_routine(void* arg) {
// Tell the compiler not to panic about the fact that we're not using this argument
(void) arg;
// Loop forever
while (1) {
if (time(NULL) % 2 == 0) {
do_a_random_swap(B, &index_of_NULL_value_B);
} else {
sleep(0.1);
}
}
// We never get here
return NULL;
}
void* integrity_checker_start_routine(void* arg) {
// Tell the compiler not to panic about the fact that we're not using this argument
(void) arg;
// Loop forever, checking the integrity of the system during odd seconds
for (;; sleep(0.1)) {
if (time(NULL) % 2 == 1) {
verify_integrity();
}
}
// We never get here
return NULL;
}
int main() {
// Initialize random seed
srand(time(NULL));
// Initialize table and NULL-index trackers
init_globals();
// Verify integrity of the initialized values
verify_integrity();
// Spawn some threads
pthread_create_or_exit(fst_start_routine);
pthread_create_or_exit(snd_start_routine);
pthread_create_or_exit(integrity_checker_start_routine);
// Loop forever
while (1) {
sleep(1);
}
// Return successfully (we never get here)
return 0;
}
你不能。高级语言中的锁定原语不起作用。 https://jakob.engbloms.se/archives/65
如果我有以下组装功能我就可以做到:
extern uintptr_t read_ptr(table_entry *entry); /* returns entry->ptr with READ_ACQUIRE */
extern int compare_set_ptr(table_entry **target, table_entry *value, table_entry *expected); /* if *target == expected then *target = value */
perform_swap_a 看起来像这样:
struct table_entry {
table_entry *ptr;
void *attached;
}
void perform_swap_a(size_t index)
{
// Lock our side
uintptr_t selected;
do {
selected = read_ptr(A + index);
// Spin until we locked this one
} while (compare_set_ptr(&A[index].ptr, (selected & ~3) + 1, selected & ~3);
// Lock other side
struct table_entry *bptr = (table_entry *)(read_ptr(A + index) & ~3);
do {
selected = read_ptr(bptr);
// Spin until we locked this one
} while (compare_set_ptr(bptr, (selected & ~3) + 1, selected & ~3);
// Perform swap
struct table_entry newentry;
newentry = A[index];
A[index_of_NULL_value_A] = A[index]
A[index] = newentry;
size_t tmp = index;
index = index_of_NULL_value_A;
index_of_NULL_value_A = tmp;
// Unlock other side
bptr->ptr = (table_entry *)(read_ptr(bptr) & ~3);
// Unlock our side
A[index].ptr = table_entry *)(read_ptr(A + index) & ~3);
}
perform_swap_b 类似:
void perform_swap_b(size_t index)
{
// Lock other side
struct table_entry *aptr;
uintptr_t selected;
do {
// Every time we go around this loop, the pointer in index can change so we must reload.
aptr = (table_entry *)(read_ptr(B + index) & ~3);
selected = aptr == NULL ? NULL : read_ptr(aptr);
// Spin until we locked this one
} while (aptr == NULL ||
compare_set_ptr(aptr, (selected & ~3) + 1, selected & ~3);
// Lock our side
do {
selected = read_ptr(A + index);
// Spin until we locked this one
} while (compare_set_ptr(&A[index].ptr, (selected & ~3) + 1, selected & ~3);
// Perform swap
struct table_entry newentry;
newentry = B[index];
B[index_of_NULL_value_A] = B[index]
B[index] = newentry;
size_t tmp = index;
index = index_of_NULL_value_B;
index_of_NULL_value_B = tmp;
// Unlock our side
B[index]->ptr = (table_entry *)(read_ptr(bptr) & ~3);
// Unlock other side
aptr->ptr = (table_entry *)(read_ptr(aptr) & ~3);
}
整个算法只使用了指针中的 1 位,因此如果需要,可以将常量 3 更改为 1。