如何使用Python C API实现多线程程序?

问题描述 投票:0回答:2

我有以下使用 Python C API 的程序。它创建许多线程(

NUM_THREADS
常量)。在每个线程中都有一个无限循环,它执行一个非常简单的操作:创建一个 Python 字典,其中键
id
设置为线程 id,然后将该字典转储到字符串中并打印它(使用
dumps 中的 
json
 函数) 
Python 模块)。之后线程等待
WAIT_TIME
秒并再次执行相同操作。

// g++ -g -o multithread multithread.cpp -I/usr/include/python3.11/ -lpython3.11 -lpthread

#include <Python.h>
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

// WAIT_TIME is in seconds
#define NUM_THREADS 20
#define WAIT_TIME   1

// Global semaphore declaration
sem_t semaphore;

// Global JSON module object, to be accesses in every thread
PyObject* jsonModule;

// Function to be executed by each thread
void* thread_function(void* arg) {
    long thread_id = (long)arg;
    while(true) {
        sem_wait(&semaphore); // mark 1
        PyObject* myDict = Py_BuildValue("{s:i}", "id", thread_id);
        PyObject* result = PyObject_CallMethod(jsonModule, "dumps", "O", myDict);
        PyObject* repr = PyObject_Repr(result);
        const char* result_str = PyUnicode_AsUTF8(repr);
        printf("Thread %ld result: %s\n", thread_id, result_str);
        Py_XDECREF(result);
        Py_XDECREF(myDict);
        Py_XDECREF(repr);
        sem_post(&semaphore); // mark 2
        sleep(WAIT_TIME);
    }        
    pthread_exit(NULL);
}

int main() {
    pthread_t threads[NUM_THREADS];
    int i;

    // Initialize the Python interpreter
    Py_Initialize();

    // Import json module
    jsonModule = PyImport_ImportModule("json");

    // Initialize the semaphore
    sem_init(&semaphore, 0, 1);

    // Create threads
    for (i = 0; i < NUM_THREADS; ++i) {
        if (pthread_create(&threads[i], NULL, thread_function, (void*)(long)i) != 0) {
            fprintf(stderr, "Error creating thread\n");
            return 1;
        }
    }

    // Join threads
    for (i = 0; i < NUM_THREADS; ++i) {
        if (pthread_join(threads[i], NULL) != 0) {
            fprintf(stderr, "Error joining thread\n");
            return 1;
        }
    }

    // Free resources (never reach this point, but added for simmetry)
    Py_XDECREF(jsonModule);

    // Finalize the Python interpreter
    Py_Finalize();

    // Destroy the semaphore
    sem_destroy(&semaphore);

    printf("All threads have completed\n");
    return 0;
}

据我凭经验检查,只要在开始调用 Py* 函数之前获取信号量,该程序就可以工作。也就是说,只要使用

mark 1
mark 2
点中的线即可。

如果我删除

mark 1
mark 2
语句(因此删除信号量基础排除),那么程序最终很快就会崩溃。查看生成的
core
文件的回溯,问题似乎出在
PyObject_CallMethod()
函数的调用中。

(gdb) bt
#0  0x00007fb315289c19 in ?? () from /lib/x86_64-linux-gnu/libpython3.11.so.1.0
#1  0x00007fb31526aac6 in ?? () from /lib/x86_64-linux-gnu/libpython3.11.so.1.0
#2  0x00007fb31517d80b in ?? () from /lib/x86_64-linux-gnu/libpython3.11.so.1.0
#3  0x00007fb31517ddd9 in PyObject_CallMethod () from /lib/x86_64-linux-gnu/libpython3.11.so.1.0
#4  0x000055e1a763f2ef in thread_function (arg=0x11) at multithread.cpp:24
#5  0x00007fb314ea8134 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#6  0x00007fb314f287dc in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81

这有点令人惊讶,因为所有

PyObject*
变量都是线程函数的本地变量(
myDict
result
repr
)。唯一一个非线程本地的
PyObject*
变量是模块本身的变量 (
jsonModule
)。是不是这个引起了问题?

这是否意味着 Python C 库不是线程安全的,因此不能同时运行多个 Py* 函数?除了我使用的(即在我自己的代码中实现的信号量)之外,还有其他选择吗?此类程序有什么好的实现模式(即使用 Python C API 的多线程)吗?

提前致谢!

c multithreading python-c-api
2个回答
0
投票

Python 不是线程安全

Python 解释器不是完全线程安全的。为了支持多线程 Python 程序,有一个全局锁,称为“全局解释器锁”或“GIL”,当前线程必须持有该锁才能安全地访问 Python 对象。如果没有锁,即使是最简单的操作也可能会在多线程程序中导致问题:例如,当两个线程同时递增同一对象的引用计数时,引用计数最终可能只递增一次而不是两次。 因此,存在这样的规则:只有获得了GIL的线程才可以操作Python对象或调用Python/C API函数。为了模拟执行的并发性,解释器会定期尝试切换线程(请参阅

sys.setswitchinterval()

)。锁也会在可能阻塞的 I/O 操作(例如读取或写入文件)周围释放,以便其他 Python 线程可以同时运行。 Python 解释器将一些特定于线程的簿记信息保存在名为 PyThreadState 的数据结构中。还有一个指向当前

PyThreadState

的全局变量:可以使用 PyThreadState_Get() 检索它。 ...

您对信号量的使用有效地复制了 Python 的“全局解释器锁”的功能。

CPython 解释器数据结构是

不是线程安全的

0
投票
全局解释器锁(GIL)

,以及为什么多线程 CPython 程序注定会很慢(因为只有未锁定的部分才能真正是多线程的,并且大多数库无法释放 GIL,因为它不安全)对纯 Python 对象执行此操作)。引用文档: 在 CPython 中,全局解释器锁(GIL)是一个互斥体,用于保护对 Python 对象的访问

,防止多个线程同时执行 Python 字节码。 GIL 可防止竞争条件并确保线程安全。关于
Python GIL 如何在这些领域提供帮助的精彩解释可以在这里找到

。简而言之,这个互斥锁是必要的,主要是因为 CPython 的内存管理不是线程安全的 当您不处理 CPython 对象时(例如,用于纯数值计算或 IO 操作),您可以释放 GIL。通常使用多处理来规避这种强大的限制。这意味着使用多个解释器进程,并且通常使用进程间通信(IPC)进行通信,这通常非常昂贵(共享内存是可能的,但不能在解释器 Python 对象上)。

© www.soinside.com 2019 - 2024. All rights reserved.