Async multiprocessing communication with queues - only one coroutine runs


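I have a manager script that starts some processes and then uses two coroutines (one for monitoring, one for collecting results). For some reason only one of the coroutines seems to run. What am I missing? (I don't use asyncio.)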


import multiprocessing as mp
import time 
import asyncio
import logging 

logging.basicConfig(level=logging.DEBUG)

class Process(mp.Process):
    def __init__(self, task_queue: mp.Queue, result_queue: mp.Queue):
        super().__init__()
        self.task_queue = task_queue
        self.result_queue = result_queue
        logging.info('Process init')

    def run(self):
        while not self.task_queue.empty():
            try:
                task = self.task_queue.get(timeout=1)
            except mp.Queue.Empty:
                logging.info('Task queue is empty')
                break
            
            time.sleep(1)
            logging.info('Processing task %i (pid %i)', task, self.pid)
            self.result_queue.put(task)
            
        logging.info('Process run')

class Manager:
    def __init__(self):
        self.processes = []
        self.task_queue = mp.Queue()
        self.result_queue = mp.Queue()
        self.keep_running = True

    async def monitor(self):
        while self.keep_running:
            await asyncio.sleep(0.1)
            logging.info('Task queue size: %i', self.task_queue.qsize())
            logging.info('Result queue size: %i', self.result_queue.qsize())
            self.keep_running = any([p.is_alive() for p in self.processes])


    async def consume_results(self):
        while self.keep_running:
            try:
                result = self.result_queue.get()
            except mp.Queue.Empty:
                logging.info('Result queue is empty')
                continue

            logging.info('Got result: %s', result)

    def start(self):
        # Populate the task queue
        for i in range(10):
            self.task_queue.put(i)

        # Start the processes
        for i in range(3):
            p = Process(self.task_queue, self.result_queue)
            p.start()
            self.processes.append(p)

        # Wait for the processes to finish
        loop = asyncio.get_event_loop()
        loop.create_task(self.monitor())
        loop.create_task(self.consume_results())

manager = Manager()
manager.start()
  • Expected to see the monitor's queue sizes, but only consume_results() runs.
Answer

Next time you need help, state precisely what you want and what environment you're working in. We mere mortals can't read the context in your head.

So I'll assume you want to monitor the queue sizes, and since you didn't mention your environment, I'll assume the latest 3.12.x.


Problems

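Please include your logs. Your code has a lot of bugs and problems that produce plenty of log output, so I can only conclude that you didn't try to fix it or read the documentation. After 10 years on SO you know this rule.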


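  • You BLOCK the asyncio thread with the .get() call. Read the multiprocessing and asyncio docs instead of falling back on "I don't use asyncio":
    async def consume_results(self):
        while self.keep_running:
            try:
                result = self.result_queue.get()  # blocks the whole event loop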
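To see why only one coroutine appears to run, here is a minimal sketch (not the asker's code) in which a hypothetical `monitor` coroutine counts its ticks while another coroutine waits for a result. A thread feeding a `queue.Queue` stands in for the worker process and `mp.Queue` (an assumption so it runs in one process); `get()` blocks the same way on both. Calling `get()` directly freezes the event loop, while `asyncio.to_thread` moves the wait to a helper thread and the monitor keeps running.

```python
import asyncio
import queue
import threading
import time


def delayed_put(q, item, delay):
    # Stand-in for a worker process: delivers one result after `delay` seconds.
    time.sleep(delay)
    q.put(item)


async def monitor(ticks):
    # Counts how often the event loop gets to run this coroutine.
    while True:
        await asyncio.sleep(0.05)
        ticks.append(1)


async def run(blocking):
    q = queue.Queue()
    ticks = []
    threading.Thread(target=delayed_put, args=(q, "result", 0.5)).start()
    mon = asyncio.create_task(monitor(ticks))
    await asyncio.sleep(0)  # let the monitor reach its first await

    if blocking:
        result = q.get()  # blocks the event-loop thread: nothing else runs
    else:
        result = await asyncio.to_thread(q.get)  # blocks a helper thread instead

    mon.cancel()
    return result, len(ticks)


print(asyncio.run(run(blocking=True)))   # ('result', 0)
print(asyncio.run(run(blocking=False)))  # ('result', N) with N > 0
```

With the blocking call the monitor never ticks, which matches the symptom in the question: the coroutine that blocks is the only one that ever appears to run.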


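  • There is no multiprocessing.Queue.Empty. mp.Queue is a factory function, not a class, so the attribute lookup fails (the exception you want is queue.Empty):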
During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "...\Python312\Lib\multiprocessing\process.py", line 314, in _bootstrap
    self.run()
  File "E:...\StackOverflow\78475657\78475657.py", line 20, in run
    except mp.Queue.Empty:
           ^^^^^^^^^^^^^^
AttributeError: 'function' object has no attribute 'Empty'


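  • There is no if __name__ == '__main__': guard, so with the spawn start method (the default on Windows) each child process re-imports the script, tries to start processes of its own, and fails:
RuntimeError: 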
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

        To fix this issue, refer to the "Safe importing of main module"
        section in https://docs.python.org/3/library/multiprocessing.html     


  • The processes are never joined. Your main process finishes, so all child processes receive a KeyboardInterrupt to stop the program:
Process Process-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "...\Python\Python312\Lib\multiprocessing\process.py", line 314, in _bootstrap
    self.run()
  File "...\StackOverflow\78475657\78475657.py", line 24, in run
    time.sleep(1)
KeyboardInterrupt
  File "...\Python\Python312\Lib\multiprocessing\process.py", line 314, in _bootstrap
    self.run()
  File "...\Python\Python312\Lib\multiprocessing\process.py", line 314, in _bootstrap
    self.run()
  File "...\StackOverflow\78475657\78475657.py", line 24, in run
    time.sleep(1)
  File "...\StackOverflow\78475657\78475657.py", line 24, in run
    time.sleep(1)
KeyboardInterrupt
KeyboardInterrupt
Exception ignored in atexit callback: <function _exit_function at 0x000001C044853CE0>
Traceback (most recent call last):
  File "...\Python\Python312\Lib\multiprocessing\util.py", line 357, in _exit_function
    p.join()
  File "...\Python\Python312\Lib\multiprocessing\process.py", line 149, in join
    res = self._popen.wait(timeout)
          ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "...\Python\Python312\Lib\multiprocessing\popen_spawn_win32.py", line 110, in wait
    res = _winapi.WaitForSingleObject(int(self._handle), msecs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt: 


  • The event loop is never run or awaited, so neither task ever executes; both are destroyed while still pending:
# Wait for the processes to finish
loop = asyncio.get_event_loop()
loop.create_task(self.monitor())
loop.create_task(self.consume_results())
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-1' coro=<Manager.monitor() running at ...\StackOverflow\78475657\78475657.py:39>>
...\Python\Python312\Lib\asyncio\base_events.py:709: RuntimeWarning: coroutine 'Manager.monitor' was never awaited
  self._ready.clear()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<Manager.consume_results() running at ...\StackOverflow\78475657\78475657.py:46>>
...\Python\Python312\Lib\asyncio\base_events.py:709: RuntimeWarning: coroutine 'Manager.consume_results' was never awaited
  self._ready.clear()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
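The same failure can be reproduced in isolation: `create_task` only schedules a coroutine on a loop, and nothing runs until the loop is actually driven. A small sketch; the `job` coroutine and the `started` list are illustrative names, not taken from the question.

```python
import asyncio

started = []  # records which coroutine bodies actually ran


async def job(name):
    started.append(name)
    await asyncio.sleep(0)


loop = asyncio.new_event_loop()
t1 = loop.create_task(job("monitor"))
t2 = loop.create_task(job("consume_results"))

# create_task() only schedules the coroutines; neither body has run yet.
before = list(started)

# Driving the loop is what actually executes them.
loop.run_until_complete(asyncio.gather(t1, t2))
loop.close()
after = list(started)

print(before)  # []
print(after)   # ['monitor', 'consume_results']
```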


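  • asyncio.get_event_loop() emits a deprecation warning.

Deprecated since version 3.12: Deprecation warning is emitted if there is no current event loop. In some future Python release this will become an error.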

...\StackOverflow\78475657\78475657.py:67: DeprecationWarning: There is no current event loop
  loop = asyncio.get_event_loop()
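One way to sidestep the deprecated call, assuming the code can be moved inside a coroutine started by `asyncio.run()`: use `asyncio.get_running_loop()`, which only returns a loop while one is running and raises `RuntimeError` otherwise, rather than implicitly creating one. The function names below are illustrative.

```python
import asyncio


async def main():
    # Inside a coroutine started by asyncio.run() a loop is always running,
    # so get_running_loop() returns it without any warning.
    loop = asyncio.get_running_loop()
    return loop.is_running()


def outside_any_loop():
    # Outside a running loop, get_running_loop() raises instead of
    # implicitly creating a loop the way get_event_loop() used to.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return "no running loop"
    return "running loop found"


print(asyncio.run(main()))  # True
print(outside_any_loop())   # no running loop
```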


  • A pointless loop condition checks whether the queue is empty, and then the get() timeout checks again anyway:
        while not self.task_queue.empty():  # < no use due to race condition
            try:
                task = self.task_queue.get(timeout=1)  # actual important check
            except mp.Queue.Empty:
                logging.info('Task queue is empty')
                break
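A sketch of the worker loop without the `empty()` pre-check, where the `get(timeout=...)` call alone decides when to stop. Threads and a `queue.Queue` stand in for the worker processes and `mp.Queue` here (an assumption made so the example runs in one process), and `drain` is a hypothetical helper name.

```python
import queue
import threading


def drain(task_queue, timeout=0.2):
    """Take tasks until none arrive within `timeout` seconds.

    No empty() pre-check: its answer can already be stale by the time
    get() runs, so get(timeout=...) alone decides when to stop.
    """
    done = []
    while True:
        try:
            task = task_queue.get(timeout=timeout)
        except queue.Empty:
            break
        done.append(task)
    return done


tasks = queue.Queue()
for i in range(10):
    tasks.put(i)

results = []
lock = threading.Lock()


def worker():
    got = drain(tasks)
    with lock:
        results.extend(got)


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(results))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```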


Fixes

  • Use queue.Empty:
import queue

try:
    ...
except queue.Empty:
    ...
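A quick check of where the `AttributeError` comes from: `mp.Queue` is a factory (a bound method of the default context), not a class, so it has no `Empty` attribute, and a timed-out `get()` on a multiprocessing queue raises the standard-library `queue.Empty`. This sketch only creates a queue in the current process.

```python
import multiprocessing as mp
import queue

# mp.Queue is a callable factory, not a class: it has no .Empty attribute.
print(isinstance(mp.Queue, type), hasattr(mp.Queue, "Empty"))  # False False

q = mp.Queue()
try:
    q.get(timeout=0.1)  # nothing was put, so this times out
except queue.Empty:
    # multiprocessing queues raise the standard-library queue.Empty
    outcome = "queue.Empty raised"
else:
    outcome = "got an item"
q.close()

print(outcome)  # queue.Empty raised
```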

  • Add Process.join, or better, use mp.Pool. It's literally on the first page of the docs:
# from https://docs.python.org/3/library/multiprocessing.html
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

  • Use asyncio.new_event_loop(), since no other loop has been created.
  • Await the tasks. This is in the docs too:
import asyncio


async def some_task():
    print("Start")
    await asyncio.sleep(10)
    print("End")


def unrecommended_old_way():
    loop = asyncio.new_event_loop()
    task_1 = loop.create_task(some_task())
    task_2 = loop.create_task(some_task())

    print("Task spawn done")
    loop.run_until_complete(asyncio.gather(task_1, task_2))
    print("Tasks all done")


unrecommended_old_way()

Output:
Task spawn done
Start
Start
End
End
Tasks all done

  • Or just use the better, recommended way to do the above:
import asyncio


async def some_task():
    print("Start")
    await asyncio.sleep(10)
    print("End")


async def recommended_way():
    task_1 = asyncio.create_task(some_task())
    task_2 = asyncio.create_task(some_task())

    print("Task spawn done")
    await asyncio.gather(task_1, task_2)
    print("Tasks all done")


asyncio.run(recommended_way())




The actual fix

All of the above are fixes for individual problems, but most of those problems go away if you stop mixing the frameworks yourself.

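All asyncio does in your code is periodically poll and pull results, as if this were a thread-based application. That is never a good idea: it throws away the whole concept of asyncio. Mixing two completely unrelated frameworks (multiprocessing and asyncio) only to use neither of them correctly isn't worth the headache.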

With that in mind, here is the asyncio and multiprocessing mix you're after:

other_file.py:

import multiprocessing as mp
import logging
import queue
import time


class WorkerProcess(mp.Process):
    def __init__(self, task_queue: mp.JoinableQueue, result_queue: mp.Queue, timeout_sec=2):
        super().__init__()

        self.task_queue = task_queue
        self.result_queue = result_queue
        self.timeout = timeout_sec

    def run(self):
        logging.info(f"[{self.pid:10}] Started")

        while True:
            try:
                task_id, task = self.task_queue.get(timeout=self.timeout)
            except queue.Empty:
                break

            try:
                logging.info(f"[{self.pid:10}] Processing {task_id}")
                time.sleep(1)
                logging.info(f"[{self.pid:10}] Processing {task_id} done")

                self.result_queue.put(task)
            finally:
                # signal JoinableQueue that task we took is done
                self.task_queue.task_done()

        logging.info(f"[{self.pid:10}] Done")


In another file, or in a notebook:

import multiprocessing as mp
import asyncio
import logging

from other_file import WorkerProcess

logging.root.setLevel(logging.NOTSET)


class Manager:
    def __init__(self):
        self.task_queue = mp.JoinableQueue()
        self.result_queue = mp.Queue()

    async def _monitor(self):
        """Periodically reports queue sizes.
        Cancelled by start() once every task has been marked done."""

        while True:
            await asyncio.sleep(0.5)

            logging.info(f"[Manager]   Task queue size: {self.task_queue.qsize()}")
            logging.info(f"[Manager] Result queue size: {self.result_queue.qsize()}")

    async def _process(self):
        """Do whatever you want here for results"""

        while (val := await asyncio.to_thread(self.result_queue.get)) is not None:
            logging.info(f"[Manager] Got result {val}")

    async def start(self):
        """Wait for all tasks to be done"""

        # Populate the task queue
        for task_idx, task in enumerate(range(10)):
            self.task_queue.put((task_idx, task))

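        # create the monitoring task & the result-processing task
        monitor_task = asyncio.create_task(self._monitor())
        result_proc_task = asyncio.create_task(self._process())

        # Start the processes
        processes = [
            WorkerProcess(self.task_queue, self.result_queue)
            for _ in range(3)
        ]
        for process in processes:
            process.start()

        logging.info("[Manager] Started")

        # wait for all tasks to be done. If so, cancel monitor task and wait for it to end
        await asyncio.to_thread(self.task_queue.join)
        monitor_task.cancel()
        await asyncio.gather(monitor_task, return_exceptions=True)

        # Join the workers (each exits once its get() times out). A process that
        # put items on a queue does not terminate until they are flushed to the
        # pipe, so the sentinel below is guaranteed to arrive after every result.
        for process in processes:
            await asyncio.to_thread(process.join)

        # Send sentinel to result queue to signal end of process,
        # then wait for the result-processing task to finish draining it.
        self.result_queue.put(None)
        await result_proc_task

        logging.info("[Manager] Done")


manager = Manager()

# if running as a script, keep the __main__ guard so spawned children
# (the default on Windows) don't re-run it:
# if __name__ == "__main__":
#     asyncio.run(manager.start())

# in notebook (already in async context)
await manager.start()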
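The sentinel shutdown used by `_process` can be tried on its own. In this sketch a thread and a `queue.Queue` stand in for the worker processes and the multiprocessing result queue (an assumption for a single-process demo); the consumer pulls items through `asyncio.to_thread` until it sees `None`. The names `producer`, `consume` and `SENTINEL` are illustrative.

```python
import asyncio
import queue
import threading

SENTINEL = None  # same convention as the answer: None means "no more results"


def producer(q, items):
    # Stand-in for the worker processes plus the manager's final sentinel.
    for item in items:
        q.put(item)
    q.put(SENTINEL)


async def consume(q):
    results = []
    # Each blocking get() runs in a helper thread, so the event loop stays free.
    while (val := await asyncio.to_thread(q.get)) is not SENTINEL:
        results.append(val)
    return results


async def main():
    q = queue.Queue()
    threading.Thread(target=producer, args=(q, range(5))).start()
    return await consume(q)


print(asyncio.run(main()))  # [0, 1, 2, 3, 4]
```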

Log from a normal run:

INFO:root:[   Manager] Started
INFO:root:[     22356] Started
INFO:root:[     22356] Processing 0
INFO:root:[     14468] Started
INFO:root:[     14468] Processing 1
INFO:root:[     21076] Started
INFO:root:[     21076] Processing 2
INFO:root:[   Manager]   Task queue size: 7
INFO:root:[   Manager] Result queue size: 0
INFO:root:[     22356] Processing 0 done
INFO:root:[     14468] Processing 1 done
INFO:root:[     22356] Processing 3
INFO:root:[   Manager] Got result 0
INFO:root:[     14468] Processing 4
INFO:root:[   Manager] Got result 1
INFO:root:[     21076] Processing 2 done
INFO:root:[     21076] Processing 5
INFO:root:[   Manager] Got result 2
INFO:root:[   Manager]   Task queue size: 4
INFO:root:[   Manager] Result queue size: 0
INFO:root:[     22356] Processing 3 done
INFO:root:[     22356] Processing 6
INFO:root:[   Manager] Got result 3
INFO:root:[     14468] Processing 4 done
INFO:root:[     14468] Processing 7
INFO:root:[   Manager] Got result 4
INFO:root:[     21076] Processing 5 done
INFO:root:[     21076] Processing 8
INFO:root:[   Manager] Got result 5
INFO:root:[   Manager]   Task queue size: 1
INFO:root:[   Manager] Result queue size: 0
INFO:root:[     22356] Processing 6 done
INFO:root:[     22356] Processing 9
INFO:root:[   Manager] Got result 6
INFO:root:[     14468] Processing 7 done
INFO:root:[   Manager] Got result 7
INFO:root:[     21076] Processing 8 done
INFO:root:[   Manager] Got result 8
INFO:root:[   Manager]   Task queue size: 0
INFO:root:[   Manager] Result queue size: 0
INFO:root:[     22356] Processing 9 done
INFO:root:[   Manager] Got result 9
INFO:root:[     21076] Done
INFO:root:[     14468] Done
INFO:root:[     22356] Done
INFO:root:[   Manager] Done


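A notebook won't show the worker processes' logs like this: each process writes to its own stdout/stderr, which ends up in the console running the notebook server, not in the notebook.

Terminal: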

INFO:root:[     22228] Started
INFO:root:[     22228] Processing 0
INFO:root:[     21712] Started
INFO:root:[     21712] Processing 1
INFO:root:[     20724] Started
INFO:root:[     20724] Processing 2
INFO:root:[     21712] Processing 1 done
INFO:root:[     22228] Processing 0 done
INFO:root:[     21712] Processing 3
INFO:root:[     22228] Processing 4
INFO:root:[     20724] Processing 2 done
INFO:root:[     20724] Processing 5
INFO:root:[     21712] Processing 3 done
INFO:root:[     22228] Processing 4 done
INFO:root:[     21712] Processing 6
INFO:root:[     22228] Processing 7
INFO:root:[     20724] Processing 5 done
INFO:root:[     20724] Processing 8
INFO:root:[     21712] Processing 6 done
INFO:root:[     22228] Processing 7 done
INFO:root:[     21712] Processing 9
INFO:root:[     20724] Processing 8 done
INFO:root:[     21712] Processing 9 done
INFO:root:[     22228] Done
INFO:root:[     20724] Done
INFO:root:[     21712] Done
[I 2024-05-15 16:43:24.204 ServerApp] Saving file at /Untitled1.ipynb