我可以使用 asyncio 读取和写入 multiprocessing.Pipe 吗?

问题描述 投票:0回答:2

我需要在 Python 中的进程之间进行通信,并且在每个进程中使用

asyncio
来实现并发网络 IO。

目前我在进程之间使用

multiprocessing.Pipe
send
recv
显着大量的数据,但是我在
asyncio
之外这样做,我相信我在
IO_WAIT中花费了大量的CPU时间
因为它。

看起来

asyncio
可以而且应该用于处理进程之间的管道IO,但是除了管道STDIN/STDOUT之外我找不到任何示例。

从我读到的内容看来,我应该用

loop.connect_read_pipe(PROTOCOL_FACTORY, PIPE)
注册管道,同样用于写入。但是我不明白
protocol_factory
的目的,因为它与
multiprocessing.Pipe
有关。甚至不清楚我是否应该创建一个
multiprocessing.Pipe
或者是否可以在
asyncio
内创建一个管道。

python python-3.x multiprocessing pipe python-asyncio
2个回答
11
投票

multiprocessing.Pipe
使用高级
multiprocessing.Connection
模块来pickle 和unpickle Python 对象并在底层传输额外的字节。如果您想使用
loop.connect_read_pipe()
从这些管道之一读取数据,您必须自己重新实现所有这些。

在不阻塞事件循环的情况下从

multiprocessing.Pipe
读取数据的最简单方法是使用
loop.add_reader()
。考虑以下示例:

import asyncio
import multiprocessing


def main():
    read, write = multiprocessing.Pipe(duplex=False)
    writer_process = multiprocessing.Process(target=writer, args=(write,))
    writer_process.start()
    asyncio.get_event_loop().run_until_complete(reader(read))


async def reader(read):
    data_available = asyncio.Event()
    asyncio.get_event_loop().add_reader(read.fileno(), data_available.set)

    while not read.poll():
        await data_available.wait()
        data_available.clear()

    print(read.recv())


def writer(write):
    write.send('Hello World')


if __name__ == '__main__':
    main()

使用较低级别

os.pipe
创建的管道不会像
multiprocessing.Pipe
中的管道那样添加任何额外内容。因此,我们可以将
os.pipe
loop.connect_read_pipe()
一起使用,而无需重新实现任何类型的内部工作。这是一个例子:

import asyncio
import multiprocessing
import os


def main():
    read, write = os.pipe()
    writer_process = multiprocessing.Process(target=writer, args=(write,))
    writer_process.start()
    asyncio.get_event_loop().run_until_complete(reader(read))


async def reader(read):
    pipe = os.fdopen(read, mode='r')

    loop = asyncio.get_event_loop()
    stream_reader = asyncio.StreamReader()
    def protocol_factory():
        return asyncio.StreamReaderProtocol(stream_reader)

    transport, _ = await loop.connect_read_pipe(protocol_factory, pipe)
    print(await stream_reader.readline())
    transport.close()


def writer(write):
    os.write(write, b'Hello World\n')


if __name__ == '__main__':
    main()

这段代码帮助我弄清楚了如何使用

loop.connect_read_pipe


6
投票

aiopipe
似乎做你想做的事!它可以与内置的
multiprocessing
模块一起使用,并提供与常规阻塞管道类似的API。

© www.soinside.com 2019 - 2024. All rights reserved.