我有两个守护进程正在运行。一个将数据输入到共享的 multiprocessing.Queue 中,另一个从该队列中获取数据并将其提交到 multiprocessing.pool.apply_async 中。异步结果被发送到另一个 multiprocessing.Queue。
一些示例代码:
import multiprocessing
from multiprocessing import Process
import random
import time
def my_method(i):
    """Return the square of *i* (toy work item for the pool)."""
    return i ** 2
class DataFeeder:
    """Producer daemon: pushes one random int in [0, 50) onto the queue each second."""

    # Shared queue proxy, bound on the first stream_to_queue() call.
    input_queue = None

    @staticmethod
    def stream_to_queue(iq):
        """Run forever, feeding random values into the shared input queue."""
        if DataFeeder.input_queue is None:
            DataFeeder.input_queue = iq
        while True:
            time.sleep(1)
            value = random.choice(range(0, 50))
            print(f"feeding {value}")
            DataFeeder.input_queue.put(value)
class DataEater:
    """Consumer daemon: drains the input queue, submits items to the shared pool,
    and forwards each AsyncResult onto the results queue.

    NOTE(review): ``pool`` is a manager *proxy*; calling ``apply_async`` on it
    from a child process raises AttributeError ('NoneType' object has no
    attribute '_registry') because the proxy's ``_manager`` reference does not
    survive pickling into the new process — this is the bug the question is about.
    """

    input_queue = None    # shared input queue proxy, bound on first eat() call
    results_queue = None  # shared results queue proxy
    pool = None           # manager pool proxy

    @staticmethod
    def eat(iq, rq, p):
        """Loop forever: pull an item (100 ms timeout), submit it, forward the result."""
        from queue import Empty  # stdlib Empty is what manager queue proxies raise

        if DataEater.input_queue is None:
            DataEater.input_queue = iq
        if DataEater.results_queue is None:
            DataEater.results_queue = rq
        if DataEater.pool is None:
            DataEater.pool = p
        while True:
            time.sleep(0.1)
            # Bug fix: the original get(0.1) passed 0.1 as the positional *block*
            # flag (truthy), so the call blocked forever and the intended 100 ms
            # timeout never applied. Use the timeout keyword and skip the
            # iteration when the queue is empty.
            try:
                dat = DataEater.input_queue.get(timeout=0.1)  # 100ms timeout
            except Empty:
                continue
            print(f"eating {dat}")
            async_result = DataEater.pool.apply_async(my_method, (dat,))
            print(f"async_result {async_result}")
            DataEater.results_queue.put_nowait(async_result)
if __name__ == '__main__':
    # Demo wiring: a Manager process hosts the queues and the pool so that
    # proxies can be handed to child processes.
    with multiprocessing.Manager() as m:
        input_q = m.Queue()
        output_q = m.Queue()
        # NOTE(review): this is a manager Pool *proxy*, not a local Pool — the
        # proxy loses its _manager reference when pickled into a child process,
        # which produces the AttributeError in the traceback below.
        pool = m.Pool(8)
        # Producer: feeds random ints into input_q once per second.
        dfp = Process(target=DataFeeder.stream_to_queue, args=(input_q,), daemon=True)
        dfp.start()
        # Consumer: calls apply_async on the pool proxy from a child process —
        # this is where the error is raised.
        dep = Process(target=DataEater.eat, args=(input_q, output_q, pool), daemon=True)
        dep.start()
        dep.join()  # blocks forever: the daemon loops never exit on their own
        dfp.join()
生成的堆栈跟踪如下:
Process Process-3:
Traceback (most recent call last):
File "/opt/anaconda3/envs/integration_tests/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/opt/anaconda3/envs/integration_tests/lib/python3.9/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/Users/chhunb/PycharmProjects/micromanager_server/umanager_server/multiproctest.py", line 54, in eat
async_result = DataEater.pool.apply_async(my_method, (dat,))
File "<string>", line 2, in apply_async
File "/opt/anaconda3/envs/integration_tests/lib/python3.9/multiprocessing/managers.py", line 816, in _callmethod
proxytype = self._manager._registry[token.typeid][-1]
AttributeError: 'NoneType' object has no attribute '_registry'
我尝试在
eat()
方法中在本地生成池,以便它可以从新进程中生成。这不起作用,因为结果是 async_result
并且必须使用 results_queue
与其他进程共享。
我尝试通过将一些字符串硬编码到 apply_async 输入中来完全忽略 DataFeeder。同样的错误。
这个开放的 python 问题非常相似,即使与我正在做的不完全相同: https://github.com/python/cpython/issues/80100 该问题仍然悬而未决,这是否意味着它在更高版本的 python 中从未得到解决?
我是否必须考虑完全不同的设计模式才能获得此功能?
以下是一个取巧(hackish)的方法,仅在 Windows 上的 Python 3.11(CPython)上测试过。在其他 Python 实现、其他 Python 版本或其他操作系统上,它可能会失效。
此外,它目前在进程之间传输管理器服务器的身份验证密钥,但如果需要,可以使用固定密钥进行更改。
import multiprocessing
from multiprocessing import Process
import threading
import random
import time
def my_method(i):
    """Return the square of *i* (toy work item for the pool)."""
    return i ** 2
class DataFeeder:
    """Producer daemon: pushes one random int in [0, 50) onto the queue each second."""

    # Shared queue proxy, bound on the first stream_to_queue() call.
    input_queue = None

    @staticmethod
    def stream_to_queue(iq):
        """Run forever, feeding random values into the shared input queue."""
        if DataFeeder.input_queue is None:
            DataFeeder.input_queue = iq
        while True:
            time.sleep(1)
            value = random.choice(range(0, 50))
            print(f"feeding {value}")
            DataFeeder.input_queue.put(value)
class DataEater:
    """Consumer daemon that repairs the pool proxy's broken manager reference.

    A manager Pool proxy pickled into a child process arrives with
    ``_manager`` set to None, so ``apply_async`` fails. The workaround below
    reconnects a fresh SyncManager to the *same* server process and installs
    it on the proxy before use.
    """

    input_queue = None    # shared input queue proxy, bound on first eat() call
    results_queue = None  # shared results queue proxy
    pool = None           # manager pool proxy (repaired below)

    @staticmethod
    def eat(iq, rq, p, adr, auth, registry):
        """Reattach a manager to the pool proxy, then consume items forever."""
        from queue import Empty  # stdlib Empty is what manager queue proxies raise

        if DataEater.input_queue is None:
            DataEater.input_queue = iq
        if DataEater.results_queue is None:
            DataEater.results_queue = rq
        if DataEater.pool is None:
            DataEater.pool = p
        # Here begins the magic (aka dirty hack):
        # Construct a new manager connected to the same server process as the
        # manager in the main process (same address + authkey).
        from multiprocessing.managers import SyncManager
        manager = SyncManager(address=adr, authkey=auth)
        # Merge the parent's registry so proxy typeids (e.g. Pool) resolve here.
        registry.update(SyncManager._registry)
        SyncManager._registry = registry
        manager.connect()
        # Install our manager as the manager of the pool proxy — this is what
        # fixes the AttributeError ('NoneType' object has no attribute '_registry').
        DataEater.pool._manager = manager
        # Continue normally
        while True:
            time.sleep(0.1)
            # Bug fix: the original get(0.1) passed 0.1 as the positional *block*
            # flag (truthy), so the call blocked forever and the intended 100 ms
            # timeout never applied. Use the timeout keyword and skip the
            # iteration when the queue is empty.
            try:
                dat = DataEater.input_queue.get(timeout=0.1)  # 100ms timeout
            except Empty:
                continue
            print(f"eating {dat}")
            async_result = DataEater.pool.apply_async(my_method, (dat,))
            print(f"async_result {async_result}")
            DataEater.results_queue.put_nowait(async_result)
# Additional consumer process: awaits each asynchronous result and prints it.
def data_printer(output_q):
    """Forever take AsyncResult objects from output_q and print their values."""
    while True:
        result = output_q.get().get()
        print(f"awaited async result {result}")
if __name__ == '__main__':
    # A Manager process hosts the queues and the pool; proxies are handed to
    # the child processes.
    with multiprocessing.Manager() as m:
        input_q = m.Queue()
        output_q = m.Queue()
        pool = m.Pool(8)
        # Printer: awaits and prints each AsyncResult taken from output_q.
        dpp = Process(target=data_printer, args=(output_q,), daemon=True)
        dpp.start()
        # Producer: feeds random ints into input_q once per second.
        dfp = Process(target=DataFeeder.stream_to_queue, args=(input_q,), daemon=True)
        dfp.start()
        # Three additional arguments (server address, authkey, registry) let the
        # child construct a SyncManager connected to this same manager server.
        # NOTE(review): this ships the manager's authkey to the child — replace
        # with a fixed key if that is a concern.
        dep = Process(target=DataEater.eat, args=(input_q, output_q, pool, m.address, m._authkey, m._registry), daemon=True)
        dep.start()
        dep.join()  # blocks forever: the daemon loops never exit on their own
        dfp.join()
        dpp.join()