我需要在 Python 中的进程之间进行通信,并且在每个进程中使用
asyncio
来实现并发网络 IO。
目前我在进程之间使用
multiprocessing.Pipe
到send
和recv
显着大量的数据,但是我在asyncio
之外这样做,我相信我在IO_WAIT
中花费了大量的CPU时间
因为它。
看起来
asyncio
可以而且应该用于处理进程之间的管道IO,但是除了管道STDIN/STDOUT之外我找不到任何示例。
从我读到的内容看来,我应该用
loop.connect_read_pipe(PROTOCOL_FACTORY, PIPE)
注册管道,同样用于写入。但是我不明白 protocol_factory
的目的,因为它与 multiprocessing.Pipe
有关。甚至不清楚我是否应该创建一个 multiprocessing.Pipe
或者是否可以在 asyncio
内创建一个管道。
multiprocessing.Pipe
使用高级 multiprocessing.Connection
模块来pickle 和unpickle Python 对象并在底层传输额外的字节。如果您想使用 loop.connect_read_pipe()
从这些管道之一读取数据,您必须自己重新实现所有这些。
在不阻塞事件循环的情况下从
multiprocessing.Pipe
读取数据的最简单方法是使用 loop.add_reader()
。考虑以下示例:
import asyncio
import multiprocessing
def main():
read, write = multiprocessing.Pipe(duplex=False)
writer_process = multiprocessing.Process(target=writer, args=(write,))
writer_process.start()
asyncio.get_event_loop().run_until_complete(reader(read))
async def reader(read):
data_available = asyncio.Event()
asyncio.get_event_loop().add_reader(read.fileno(), data_available.set)
while not read.poll():
await data_available.wait()
data_available.clear()
print(read.recv())
def writer(write):
write.send('Hello World')
if __name__ == '__main__':
main()
使用较低级别
os.pipe
创建的管道不会像 multiprocessing.Pipe
中的管道那样添加任何额外内容。因此,我们可以将 os.pipe
与 loop.connect_read_pipe()
一起使用,而无需重新实现任何类型的内部工作。这是一个例子:
import asyncio
import multiprocessing
import os
def main():
read, write = os.pipe()
writer_process = multiprocessing.Process(target=writer, args=(write,))
writer_process.start()
asyncio.get_event_loop().run_until_complete(reader(read))
async def reader(read):
pipe = os.fdopen(read, mode='r')
loop = asyncio.get_event_loop()
stream_reader = asyncio.StreamReader()
def protocol_factory():
return asyncio.StreamReaderProtocol(stream_reader)
transport, _ = await loop.connect_read_pipe(protocol_factory, pipe)
print(await stream_reader.readline())
transport.close()
def writer(write):
os.write(write, b'Hello World\n')
if __name__ == '__main__':
main()
这段代码帮助我弄清楚了如何使用
loop.connect_read_pipe
。