Multiprocessing Manager can't apply_async on a shared Pool from another process

Problem description · Votes: 0 · Answers: 1

I have two daemon processes running. One feeds data into a shared multiprocessing.Queue, and the other takes data from that queue and submits it to multiprocessing.pool.apply_async. The async results are sent to another multiprocessing.Queue.

Some example code:

import multiprocessing
from multiprocessing import Process
import queue
import random
import time


def my_method(i):
    return i*i


class DataFeeder:
    input_queue = None

    @staticmethod
    def stream_to_queue(iq):
        if DataFeeder.input_queue is None:
            DataFeeder.input_queue = iq

        while True:
            time.sleep(1)
            dat = random.choice(range(0, 50))
            print(f"feeding {dat}")
            DataFeeder.input_queue.put(dat)


class DataEater:
    input_queue = None
    results_queue = None
    pool = None

    @staticmethod
    def eat(iq, rq, p):
        if DataEater.input_queue is None:
            DataEater.input_queue = iq
        if DataEater.results_queue is None:
            DataEater.results_queue = rq
        if DataEater.pool is None:
            DataEater.pool = p

        while True:
            time.sleep(0.1)
            try:
                dat = DataEater.input_queue.get(timeout=0.1)  # 100 ms timeout
            except queue.Empty:
                continue
            print(f"eating {dat}")
            async_result = DataEater.pool.apply_async(my_method, (dat,))
            print(f"async_result {async_result}")
            DataEater.results_queue.put_nowait(async_result)


if __name__ == '__main__':
    with multiprocessing.Manager() as m:
        input_q = m.Queue()
        output_q = m.Queue()
        pool = m.Pool(8)

        dfp = Process(target=DataFeeder.stream_to_queue, args=(input_q,), daemon=True)
        dfp.start()

        dep = Process(target=DataEater.eat, args=(input_q, output_q, pool), daemon=True)
        dep.start()

        dep.join()
        dfp.join()

The resulting stack trace:

Process Process-3:
Traceback (most recent call last):
  File "/opt/anaconda3/envs/integration_tests/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/anaconda3/envs/integration_tests/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/chhunb/PycharmProjects/micromanager_server/umanager_server/multiproctest.py", line 54, in eat
    async_result = DataEater.pool.apply_async(my_method, (dat,))
  File "<string>", line 2, in apply_async
  File "/opt/anaconda3/envs/integration_tests/lib/python3.9/multiprocessing/managers.py", line 816, in _callmethod
    proxytype = self._manager._registry[token.typeid][-1]
AttributeError: 'NoneType' object has no attribute '_registry'
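
As far as I can tell from stepping through multiprocessing.managers (this is my reading of CPython internals, not documented behaviour), the pool proxy is rebuilt in the child process without a reference to its manager, and _callmethod needs that reference to construct the AsyncResult proxy. A minimal check that seems to confirm it:

import multiprocessing
from multiprocessing import Process


def inspect_proxy(p):
    # In the child the rebuilt pool proxy has no manager reference, so any
    # call that must construct a result proxy (like apply_async) blows up.
    print(p._manager)  # prints None here


if __name__ == '__main__':
    with multiprocessing.Manager() as m:
        pool = m.Pool(2)
        print(pool._manager)  # the SyncManager instance, in the parent
        child = Process(target=inspect_proxy, args=(pool,))
        child.start()
        child.join()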

I tried creating the pool locally inside the eat() method so that it would be spawned from the new process. That doesn't work, because the resulting async_result has to be shared with other processes via the results_queue.

I tried taking the DataFeeder out of the picture entirely by hard-coding some strings into the apply_async input. Same error.

This open Python issue is very similar, even if not exactly what I'm doing: https://github.com/python/cpython/issues/80100 The issue is still open; does that mean it was never resolved in later versions of Python?

Do I have to consider a completely different design pattern to get this working?
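
One alternative I have been considering (a sketch of my own, assuming downstream consumers only need the computed values and not the AsyncResult handles themselves): give the eater process its own ordinary multiprocessing.Pool and let the callback parameter of apply_async push each finished value onto the shared results queue, so that no proxy ever crosses a process boundary.

import multiprocessing


def my_method(i):
    return i*i


def eat(iq, rq):
    # The pool lives entirely inside this process; the callback runs in the
    # pool's result-handler thread and receives the plain value, which is
    # all that gets put on the shared queue.
    pool = multiprocessing.Pool(4)
    while True:
        dat = iq.get()       # block until data arrives
        if dat is None:      # sentinel: drain outstanding tasks and stop
            break
        pool.apply_async(my_method, (dat,), callback=rq.put)
    pool.close()
    pool.join()


if __name__ == '__main__':
    with multiprocessing.Manager() as m:
        input_q = m.Queue()
        output_q = m.Queue()
        eater = multiprocessing.Process(target=eat, args=(input_q, output_q))
        eater.start()
        for i in (3, 7):
            input_q.put(i)
        input_q.put(None)    # ask the eater to finish
        eater.join()
        while not output_q.empty():
            print(output_q.get())  # 9 and 49, in completion order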

python python-multiprocessing pool apply-async
1 Answer
0 votes

Here is a hackish approach, tested only with Python 3.11 on Windows. It may break on anything beyond that: Python implementations other than CPython, other Python versions, other operating systems.

It also currently transfers the manager server's authentication key between processes, but that could be changed to use a fixed key if needed.
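
For instance (my sketch, not part of the answer's code below), you could start the SyncManager yourself with a key of your choosing instead of letting multiprocessing.Manager() generate a random one:

from multiprocessing.managers import SyncManager

FIXED_AUTHKEY = b'replace-me'  # hypothetical fixed key, pick your own

if __name__ == '__main__':
    m = SyncManager(authkey=FIXED_AUTHKEY)
    m.start()  # spawns the manager server, secured with the fixed key
    try:
        # Pass only m.address to the worker; it can then connect with
        # SyncManager(address=adr, authkey=FIXED_AUTHKEY) without the key
        # ever being shipped between processes.
        ...
    finally:
        m.shutdown()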

import multiprocessing
from multiprocessing import Process
import queue

import random
import time


def my_method(i):
    return i*i


class DataFeeder:
    input_queue = None

    @staticmethod
    def stream_to_queue(iq):
        if DataFeeder.input_queue is None:
            DataFeeder.input_queue = iq

        while True:
            time.sleep(1)
            dat = random.choice(range(0, 50))
            print(f"feeding {dat}")
            DataFeeder.input_queue.put(dat)


class DataEater:
    input_queue = None
    results_queue = None
    pool = None

    @staticmethod
    def eat(iq, rq, p, adr, auth, registry):
        
        if DataEater.input_queue is None:
            DataEater.input_queue = iq
        if DataEater.results_queue is None:
            DataEater.results_queue = rq
        if DataEater.pool is None:
            DataEater.pool = p

        # Here begins the magic (aka dirty hack):
        # Constructing a new manager connected to the same server process
        # as the manager in the main process
        from multiprocessing.managers import SyncManager
        
        manager = SyncManager(address=adr, authkey=auth)

        # Updating the registry:

        registry.update(SyncManager._registry)
        SyncManager._registry = registry
        
        manager.connect()
        
        # Setting our manager as manager of the pool proxy
        DataEater.pool._manager = manager
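        # (_callmethod builds result proxies such as AsyncResult via
        # self._manager._registry; the rebuilt pool proxy arrived in this
        # process with _manager = None, which is exactly what raised the
        # AttributeError in the question.)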

        # Continue normally

        while True:
            time.sleep(0.1)
            try:
                dat = DataEater.input_queue.get(timeout=0.1)  # 100 ms timeout
            except queue.Empty:
                continue
            print(f"eating {dat}")
            async_result = DataEater.pool.apply_async(my_method, (dat,))
            print(f"async_result {async_result}")
            DataEater.results_queue.put_nowait(async_result)


# Additional function/process to wait for the asynchronous result and print it:
def data_printer(output_q):
    while True:
        item = output_q.get()
        result = item.get()
        print(f"awaited async result {result}")


if __name__ == '__main__':
    with multiprocessing.Manager() as m:
        input_q = m.Queue()
        output_q = m.Queue()
        pool = m.Pool(8)
        
        dpp = Process(target=data_printer, args=(output_q,), daemon=True)
        dpp.start()

        dfp = Process(target=DataFeeder.stream_to_queue, args=(input_q,), daemon=True)
        dfp.start()

        # Three additional arguments added to create a remote manager in the process
        dep = Process(target=DataEater.eat, args=(input_q, output_q, pool, m.address, m._authkey, m._registry), daemon=True)
        dep.start()

        dep.join()
        dfp.join()
        dpp.join()
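
With the pool proxy reattached to a live manager, apply_async can build its AsyncResult proxy inside the eat process, and the extra data_printer process resolves each proxy it takes off the output queue with item.get().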