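I have a manager script that starts some processes and then uses two coroutines (one for monitoring, one for collecting results). For some reason only one coroutine seems to run. What am I missing? (I don't use asyncio)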
import multiprocessing as mp
import time
import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)

class Process(mp.Process):
    def __init__(self, task_queue: mp.Queue, result_queue: mp.Queue):
        super().__init__()
        self.task_queue = task_queue
        self.result_queue = result_queue
        logging.info('Process init')

    def run(self):
        while not self.task_queue.empty():
            try:
                task = self.task_queue.get(timeout=1)
            except mp.Queue.Empty:
                logging.info('Task queue is empty')
                break
            time.sleep(1)
            logging.info('Processing task %i (pid %i)', task, self.pid)
            self.result_queue.put(task)
        logging.info('Process run')

class Manager:
    def __init__(self):
        self.processes = []
        self.task_queue = mp.Queue()
        self.result_queue = mp.Queue()
        self.keep_running = True

    async def monitor(self):
        while self.keep_running:
            await asyncio.sleep(0.1)
            logging.info('Task queue size: %i', self.task_queue.qsize())
            logging.info('Result queue size: %i', self.result_queue.qsize())
            self.keep_running = any([p.is_alive() for p in self.processes])

    async def consume_results(self):
        while self.keep_running:
            try:
                result = self.result_queue.get()
            except mp.Queue.Empty:
                logging.info('Result queue is empty')
                continue
            logging.info('Got result: %s', result)

    def start(self):
        # Populate the task queue
        for i in range(10):
            self.task_queue.put(i)
        # Start the processes
        for i in range(3):
            p = Process(self.task_queue, self.result_queue)
            p.start()
            self.processes.append(p)
        # Wait for the processes to finish
        loop = asyncio.get_event_loop()
        loop.create_task(self.monitor())
        loop.create_task(self.consume_results())

manager = Manager()
manager.start()
consume_results()
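Next time you need help, please state precisely what you are asking and what environment you are working in. We mortals cannot browse the context in your head.
So I assume you want to monitor the queue sizes, and since you did not mention your environment, I assume it is the latest 3.12.x.
Please include logs. Your code has a large number of errors and problems that produce plenty of log output. I can only conclude that you tried nothing to fix your code and made no effort to read the documentation. After 10 years on SO you know this rule.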
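The blocking `.get()` call BLOCKS the thread that runs the event loop, so no other coroutine can run while it waits. Read the `multiprocessing` and `asyncio` documentation instead of saying "I don't use asyncio"!
async def consume_results(self):
    while self.keep_running:
        try:
            result = self.result_queue.get()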
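To make the blocking problem concrete, here is a minimal sketch of one way around it: hand the blocking `get()` to a worker thread with `asyncio.to_thread` so the event loop stays free. The names `ticker`, `consumer` and `demo` are made up for illustration, and a plain `queue.Queue` fed by a `threading.Timer` stands in for the result queue, so treat it as an assumption-laden demo rather than a drop-in fix.

```python
import asyncio
import queue
import threading


async def ticker(ticks: list, stop: asyncio.Event) -> None:
    # Stand-in for the monitor coroutine: it can only tick while the loop is free.
    while not stop.is_set():
        await asyncio.sleep(0.05)
        ticks.append(1)


async def consumer(q: queue.Queue, stop: asyncio.Event):
    # q.get() blocks, so it runs in a worker thread and is awaited from the loop.
    value = await asyncio.to_thread(q.get)
    stop.set()
    return value


async def demo():
    q = queue.Queue()
    ticks = []
    stop = asyncio.Event()
    # Hypothetical producer: a plain thread delivers one item after 0.3 s.
    threading.Timer(0.3, q.put, args=("result",)).start()
    tick_task = asyncio.create_task(ticker(ticks, stop))
    value = await consumer(q, stop)
    await tick_task
    return value, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If `consumer` called `q.get()` directly, `ticker` would not get a chance to run until the item arrived, which is the symptom described in the question.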
There is no `multiprocessing.Queue.Empty`.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "...\Python312\Lib\multiprocessing\process.py", line 314, in _bootstrap
self.run()
File "E:...\StackOverflow\78475657\78475657.py", line 20, in run
except mp.Queue.Empty:
^^^^^^^^^^^^^^
AttributeError: 'function' object has no attribute 'Empty'
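Multiprocessing will not work without the `if __name__ == '__main__':` guard, unless it runs in a notebook. (The question does not mention this.)
It also cannot run properly in Jupyter Notebooks as written: the target function has to be saved in a separate file and imported.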
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
To fix this issue, refer to the "Safe importing of main module"
section in https://docs.python.org/3/library/multiprocessing.html
I used `KeyboardInterrupt` to stop the program.
Process Process-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
File "...\Python\Python312\Lib\multiprocessing\process.py", line 314, in _bootstrap
self.run()
File "...\StackOverflow\78475657\78475657.py", line 24, in run
time.sleep(1)
KeyboardInterrupt
File "...\Python\Python312\Lib\multiprocessing\process.py", line 314, in _bootstrap
self.run()
File "...\Python\Python312\Lib\multiprocessing\process.py", line 314, in _bootstrap
self.run()
File "...\StackOverflow\78475657\78475657.py", line 24, in run
time.sleep(1)
File "...\StackOverflow\78475657\78475657.py", line 24, in run
time.sleep(1)
KeyboardInterrupt
KeyboardInterrupt
Exception ignored in atexit callback: <function _exit_function at 0x000001C044853CE0>
Traceback (most recent call last):
File "...\Python\Python312\Lib\multiprocessing\util.py", line 357, in _exit_function
p.join()
File "...\Python\Python312\Lib\multiprocessing\process.py", line 149, in join
res = self._popen.wait(timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "...\Python\Python312\Lib\multiprocessing\popen_spawn_win32.py", line 110, in wait
res = _winapi.WaitForSingleObject(int(self._handle), msecs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt:
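The tasks are created, but the event loop is never run, so they are still pending when the program is cancelled with `KeyboardInterrupt`.
# Wait for the processes to finish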
loop = asyncio.get_event_loop()
loop.create_task(self.monitor())
loop.create_task(self.consume_results())
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-1' coro=<Manager.monitor() running at ...\StackOverflow\78475657\78475657.py:39>>
...\Python\Python312\Lib\asyncio\base_events.py:709: RuntimeWarning: coroutine 'Manager.monitor' was never awaited
self._ready.clear()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<Manager.consume_results() running at ...\StackOverflow\78475657\78475657.py:46>>
...\Python\Python312\Lib\asyncio\base_events.py:709: RuntimeWarning: coroutine 'Manager.consume_results' was never awaited
self._ready.clear()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
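`asyncio.get_event_loop()` emits a deprecation warning. Deprecated since version 3.12: a deprecation warning is emitted if there is no current event loop. In some future Python release this will become an error.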
...\StackOverflow\78475657\78475657.py:67: DeprecationWarning: There is no current event loop
loop = asyncio.get_event_loop()
while not self.task_queue.empty():  # < no use due to race condition
    try:
        task = self.task_queue.get(timeout=1)  # actual important check
    except mp.Queue.Empty:
        logging.info('Task queue is empty')
        break
The exception to catch is `queue.Empty`:
import queue

try:
    ...
except queue.Empty:
    ...
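As a rough illustration of the two points above, the sketch below drains a `multiprocessing.Queue` by relying only on `get(timeout=...)` and `queue.Empty`, with no `empty()` pre-check. The helper name `drain` and the 0.5 s timeout are assumptions made for the example.

```python
import multiprocessing as mp
import queue


def drain(q, timeout: float = 0.5) -> list:
    """Collect items until get() times out; a timeout means nothing is left."""
    items = []
    while True:
        try:
            # The timeout is the only emptiness check that matters.
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            # Raised by multiprocessing queues too; mp.Queue has no .Empty attribute.
            return items


if __name__ == "__main__":
    q = mp.Queue()
    for i in range(3):
        q.put(i)
    print(drain(q))
```

Dropping the `empty()` check avoids the window where another consumer takes the last item between the check and the `get()`.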
Wait for the processes with `Process.join`, or better, use `mp.Pool`. It really is on the first page of the documentation.
# from https://docs.python.org/3/library/multiprocessing.html
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
Use `asyncio.new_event_loop()` instead, because we have not created any other loop.
import asyncio

async def some_task():
    print("Start")
    await asyncio.sleep(10)
    print("End")

def unrecommended_old_way():
    loop = asyncio.new_event_loop()
    task_1 = loop.create_task(some_task())
    task_2 = loop.create_task(some_task())
    print("Task spawn done")
    loop.run_until_complete(asyncio.gather(task_1, task_2))
    print("Tasks all done")

unrecommended_old_way()
Task spawn done
Start
Start
End
End
Tasks all done
import asyncio

async def some_task():
    print("Start")
    await asyncio.sleep(10)
    print("End")

async def recommended_way():
    task_1 = asyncio.create_task(some_task())
    task_2 = asyncio.create_task(some_task())
    print("Task spawn done")
    await asyncio.gather(task_1, task_2)
    print("Tasks all done")

asyncio.run(recommended_way())
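Everything above is a fix for one individual issue, but most of these issues go away if you do not mix frameworks yourself.
All `asyncio` does in this code is periodically check and pull results, as if this were a thread-based application. That is never a good idea: you are throwing away the whole concept and idea of `asyncio`. Mixing two completely unrelated frameworks (`multiprocessing`, `asyncio`) only to use neither of them properly is not worth the headache.
With that in mind, here is what you asked for, a mix of `asyncio` and `multiprocessing`.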
other_file.py:
import multiprocessing as mp
import logging
import queue
import time

class WorkerProcess(mp.Process):
    def __init__(self, task_queue: mp.JoinableQueue, result_queue: mp.Queue, timeout_sec=2):
        super().__init__()
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.timeout = timeout_sec

    def run(self):
        logging.info(f"[{self.pid:10}] Started")
        while True:
            try:
                task_id, task = self.task_queue.get(timeout=self.timeout)
            except queue.Empty:
                break
            try:
                logging.info(f"[{self.pid:10}] Processing {task_id}")
                time.sleep(1)
                logging.info(f"[{self.pid:10}] Processing {task_id} done")
                self.result_queue.put(task)
            finally:
                # signal JoinableQueue that task we took is done
                self.task_queue.task_done()
        logging.info(f"[{self.pid:10}] Done")
In another file or in a notebook:
import multiprocessing as mp
import asyncio
import logging
from other_file import WorkerProcess

logging.root.setLevel(logging.NOTSET)

class Manager:
    def __init__(self):
        self.task_queue = mp.JoinableQueue()
        self.result_queue = mp.Queue()

    async def _monitor(self):
        """Periodically reports queue sizes.
        Automatically cancelled after all processes are killed."""
        while True:
            await asyncio.sleep(0.5)
            logging.info(f"[Manager] Task queue size: {self.task_queue.qsize()}")
            logging.info(f"[Manager] Result queue size: {self.result_queue.qsize()}")

    async def _process(self):
        """Do whatever you want here for results"""
        while (val := await asyncio.to_thread(self.result_queue.get)) is not None:
            logging.info(f"[Manager] Got result {val}")

    async def start(self):
        """Wait for all tasks to be done"""
        # Populate the task queue
        for task_idx, task in enumerate(range(10)):
            self.task_queue.put((task_idx, task))
        # create loop, monitoring task & result process task
        monitor_task = asyncio.create_task(self._monitor())
        result_proc_task = asyncio.create_task(self._process())
        # Start the processes
        processes = [
            WorkerProcess(self.task_queue, self.result_queue)
            for _ in range(3)
        ]
        for process in processes:
            process.start()
        logging.info("[Manager] Started")
        # wait for all tasks to be done. If so, cancel monitor task and wait for it to end
        await asyncio.to_thread(self.task_queue.join)
        monitor_task.cancel()
        # since .task_done() emit is AFTER putting to result queue,
        # there's no race condition here. Send sentinel to result queue
        # to signal end of process.
        self.result_queue.put(None)
        logging.info("[Manager] Done")

manager = Manager()
# if running normally:
# asyncio.run(manager.start())
# in notebook (already in async context)
await manager.start()
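The shutdown order used above (publish the result, call `task_done()`, join the task queue, then send a `None` sentinel) can be tried in isolation. The following is only a sketch under loud assumptions: threads and `queue.Queue` stand in for `WorkerProcess` and the multiprocessing queues, and `worker`, `collect` and `main` are hypothetical names. With `queue.Queue` a `put()` is visible immediately; a multiprocessing queue hands data to a background feeder thread, so the ordering there may be less strict.

```python
import asyncio
import queue
import threading


def worker(tasks: queue.Queue, results: queue.Queue) -> None:
    """Hypothetical thread worker standing in for WorkerProcess."""
    while True:
        try:
            task = tasks.get(timeout=0.2)
        except queue.Empty:
            return
        try:
            results.put(task * 2)  # publish the result first...
        finally:
            tasks.task_done()      # ...then mark the task as done


async def collect(results: queue.Queue) -> list:
    collected = []
    # The blocking get() runs in a worker thread; None is the sentinel.
    while (val := await asyncio.to_thread(results.get)) is not None:
        collected.append(val)
    return collected


async def main() -> list:
    tasks, results = queue.Queue(), queue.Queue()
    for i in range(6):
        tasks.put(i)
    collector = asyncio.create_task(collect(results))
    threads = [threading.Thread(target=worker, args=(tasks, results)) for _ in range(3)]
    for t in threads:
        t.start()
    # join() returns once task_done() was called for every queued task.
    await asyncio.to_thread(tasks.join)
    # Every result is already queued, so the sentinel arrives last.
    results.put(None)
    return await collector


if __name__ == "__main__":
    print(sorted(asyncio.run(main())))
```

Because each worker puts its result before calling `task_done()`, the sentinel cannot overtake a result in this thread-based version.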
Log from a normal run:
INFO:root:[ Manager] Started
INFO:root:[ 22356] Started
INFO:root:[ 22356] Processing 0
INFO:root:[ 14468] Started
INFO:root:[ 14468] Processing 1
INFO:root:[ 21076] Started
INFO:root:[ 21076] Processing 2
INFO:root:[ Manager] Task queue size: 7
INFO:root:[ Manager] Result queue size: 0
INFO:root:[ 22356] Processing 0 done
INFO:root:[ 14468] Processing 1 done
INFO:root:[ 22356] Processing 3
INFO:root:[ Manager] Got result 0
INFO:root:[ 14468] Processing 4
INFO:root:[ Manager] Got result 1
INFO:root:[ 21076] Processing 2 done
INFO:root:[ 21076] Processing 5
INFO:root:[ Manager] Got result 2
INFO:root:[ Manager] Task queue size: 4
INFO:root:[ Manager] Result queue size: 0
INFO:root:[ 22356] Processing 3 done
INFO:root:[ 22356] Processing 6
INFO:root:[ Manager] Got result 3
INFO:root:[ 14468] Processing 4 done
INFO:root:[ 14468] Processing 7
INFO:root:[ Manager] Got result 4
INFO:root:[ 21076] Processing 5 done
INFO:root:[ 21076] Processing 8
INFO:root:[ Manager] Got result 5
INFO:root:[ Manager] Task queue size: 1
INFO:root:[ Manager] Result queue size: 0
INFO:root:[ 22356] Processing 6 done
INFO:root:[ 22356] Processing 9
INFO:root:[ Manager] Got result 6
INFO:root:[ 14468] Processing 7 done
INFO:root:[ Manager] Got result 7
INFO:root:[ 21076] Processing 8 done
INFO:root:[ Manager] Got result 8
INFO:root:[ Manager] Task queue size: 0
INFO:root:[ Manager] Result queue size: 0
INFO:root:[ 22356] Processing 9 done
INFO:root:[ Manager] Got result 9
INFO:root:[ 21076] Done
INFO:root:[ 14468] Done
INFO:root:[ 22356] Done
INFO:root:[ Manager] Done
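A notebook will not show the worker process logs like this, because each process writes to its own stdout/stderr, which is the console rather than the notebook.
Terminal: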
INFO:root:[ 22228] Started
INFO:root:[ 22228] Processing 0
INFO:root:[ 21712] Started
INFO:root:[ 21712] Processing 1
INFO:root:[ 20724] Started
INFO:root:[ 20724] Processing 2
INFO:root:[ 21712] Processing 1 done
INFO:root:[ 22228] Processing 0 done
INFO:root:[ 21712] Processing 3
INFO:root:[ 22228] Processing 4
INFO:root:[ 20724] Processing 2 done
INFO:root:[ 20724] Processing 5
INFO:root:[ 21712] Processing 3 done
INFO:root:[ 22228] Processing 4 done
INFO:root:[ 21712] Processing 6
INFO:root:[ 22228] Processing 7
INFO:root:[ 20724] Processing 5 done
INFO:root:[ 20724] Processing 8
INFO:root:[ 21712] Processing 6 done
INFO:root:[ 22228] Processing 7 done
INFO:root:[ 21712] Processing 9
INFO:root:[ 20724] Processing 8 done
INFO:root:[ 21712] Processing 9 done
INFO:root:[ 22228] Done
INFO:root:[ 20724] Done
INFO:root:[ 21712] Done
[I 2024-05-15 16:43:24.204 ServerApp] Saving file at /Untitled1.ipynb