我尝试使用具有无锁队列的多线程来获得最佳性能来计算素数,而无需编辑朴素的素数检查函数,并且最多仅需要1.8MB的RAM空间。
第一次尝试时,我做了多线程,但内存有问题,所以我改用链表来减少内存,在我读到无锁队列之后。
但我坚持尝试提高性能。
在代码中,我得到了大量的数字,我需要计算来自标准输入的素数。
我的问题是:如何缩短达到最佳性能的时间?
我的尝试:
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <pthread.h>
#include <stdatomic.h>
#define NUM_THREADS 8
// Struct for a node in the linked list
typedef struct Node {
int data;
struct Node* next;
} Node;
// Struct for a lock-free queue
typedef struct {
_Atomic(Node*) head;
_Atomic(Node*) tail;
} LockFreeQueue;
// Struct to hold arguments for each thread
typedef struct {
LockFreeQueue* queue;
int* counter;
} ThreadArgs;
// Function to check if a number is prime
bool isPrime(int n) {
if (n <= 1) {
return false;
}
for (int i = 2; i * i <= n; i++) {
if (n % i == 0) {
return false;
}
}
return true;
}
// Function to create a new node
Node* createNode(int data) {
Node* newNode = (Node*)malloc(sizeof(Node));
if (newNode == NULL) {
fprintf(stderr, "Memory allocation failed\n");
exit(1);
}
newNode->data = data;
newNode->next = NULL;
return newNode;
}
// Function to initialize a lock-free queue
LockFreeQueue* createLockFreeQueue() {
LockFreeQueue* queue = (LockFreeQueue*)malloc(sizeof(LockFreeQueue));
if (queue == NULL) {
fprintf(stderr, "Memory allocation failed\n");
exit(1);
}
Node* sentinel = createNode(0);
atomic_store(&queue->head, sentinel);
atomic_store(&queue->tail, sentinel);
return queue;
}
// Function to enqueue a value into the lock-free queue
void enqueue(LockFreeQueue* queue, int data) {
Node* newNode = createNode(data);
Node* tail;
while (true) {
tail = atomic_load(&queue->tail);
Node* next = atomic_load(&tail->next);
if (tail == atomic_load(&queue->tail)) {
if (next == NULL) {
if (atomic_compare_exchange_weak(&tail->next, &next, newNode)) {
break;
}
} else {
atomic_compare_exchange_weak(&queue->tail, &tail, next);
}
}
}
atomic_compare_exchange_weak(&queue->tail, &tail, newNode);
}
// Function to dequeue a value from the lock-free queue
int dequeue(LockFreeQueue* queue) {
while (true) {
Node* head = atomic_load(&queue->head);
Node* tail = atomic_load(&queue->tail);
Node* next = atomic_load(&head->next);
if (head == atomic_load(&queue->head)) {
if (head == tail) {
if (next == NULL) {
return -1; // Queue is empty
}
atomic_compare_exchange_weak(&queue->tail, &tail, next);
} else {
int data = next->data;
if (atomic_compare_exchange_weak(&queue->head, &head, next)) {
free(head);
return data;
}
}
}
}
}
// Thread function to count prime numbers
void* countPrimes(void* arg) {
ThreadArgs* args = (ThreadArgs*)arg;
int count = 0;
int num;
while ((num = dequeue(args->queue)) != -1) {
if (isPrime(num)) {
count++;
}
}
*(args->counter) += count;
return NULL;
}
int main() {
LockFreeQueue* queue = createLockFreeQueue();
int total_counter = 0;
pthread_t threads[NUM_THREADS];
ThreadArgs threadArgs[NUM_THREADS];
int num;
while (scanf("%d", &num) == 1) {
enqueue(queue, num);
}
// Create threads
for (int i = 0; i < NUM_THREADS; i++) {
threadArgs[i].queue = queue;
threadArgs[i].counter = &total_counter;
pthread_create(&threads[i], NULL, countPrimes, (void*)&threadArgs[i]);
}
// Join threads
for (int i = 0; i < NUM_THREADS; i++) {
pthread_join(threads[i], NULL);
}
printf("%d total primes.\n", total_counter);
// Clean up
free(queue);
return 0;
}
使用多个线程计算数组中素数的特殊问题有一个极简的解决方案,它只需要一个共享原子计数器。
算法:
fetch_sub(shared_counter, 1) - 1
来获取下一个数组元素的索引,以检查它是否是素数。void*
)。这样,您的工作线程仅在共享计数器的
fetch_sub
上进行竞争。争夺一个共享原子计数器是迄今为止最简单、最有效的解决方案。
为了将争用降至最低限度,每个工作线程可能不想获取要处理的下一个 1 个索引,而是要处理的下一个 1,000,000 个索引。