如何在涉及写入文件或.gzip文件的复杂生产者-消费者流中正确使用asyncio

问题描述 投票:1回答:1

我正在实现一个python模块,该模块接收包含三个列表(xyval)的元组,并根据特定比例对它们进行二次采样。我做对了吗?

  1. 我是否异步写入磁盘?
  2. 我是否可以有[[many个生产者和消费者,以便他们所有人都可以生成数据并将数据写入同一输出文件?
  3. 当我将此代码与单个线程的幼稚实现进行比较时,它们在运行时的性能相似。

  • import bisect import numpy as np import gzip import asyncio class SignalPDF: def __init__(self, inputSignal): self.x = inputSignal[0][:] self.y = inputSignal[1][:] self.vals = inputSignal[2][:] self.valCumsum = np.cumsum(self.vals) self.totalSum = np.sum(self.vals) self.N = len(self.vals) class SignalSampler: def __init__(self, inputSignal, ratio=1.0): self.signalPDF = SignalPDF(inputSignal) self.Q = asyncio.Queue() self.ratio = float(ratio) self.N = int(self.signalPDF.N/self.ratio) self.sampledN = 0 async def randRead(self): while self.sampledN < self.N: i = np.random.randint(self.signalPDF.totalSum, size=1, dtype=np.uint64)[0] self.sampledN += 1 cell = bisect.bisect(self.signalPDF.valCumsum, i) yield (self.signalPDF.x[cell], self.signalPDF.y[cell], int(self.signalPDF.vals[cell])) async def readShortFormattedLine(self): async for read in self.randRead(): x = read[0]; y = read[1]; val = read[2]; yield '{0} {1} {2}'.format(x,y,val) async def populateQueue(self): async for i in self.readShortFormattedLine(): await self.Q.put(i) await self.Q.put(None) async def hanldeGzip(self, filePath): with gzip.open(filePath, 'wt') as f: while True: item = await self.Q.get() if item is None: break f.write('{0}\n'.format(item)) f.flush() async def hanldeFile(self, filePath): with open(filePath, 'w+') as f: while True: item = await self.Q.get() if item is None: break f.write('{0}\n'.format(item)) f.flush() def main(gzip, outputFile): x=[]; y=[];val=[] for i in range(100): for j in range(100): x.append(i) y.append(j) val.append(np.random.randint(0,250)) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) mixer = SignalSampler(inputSignal=[x,y,val], ratio=2.0) futures = [] if gzip: futures = [mixer.hanldeGzip(outputFile), mixer.populateQueue()] else: futures = [mixer.hanldeFile(outputFile), mixer.populateQueue()] tasks = asyncio.wait(futures, loop=loop) results = loop.run_until_complete(tasks) loop.close() main(gzip=False, outputFile='/tmp/a.txt') main(gzip=True, outputFile='/tmp/a.txt.gz')
  • python file io python-asyncio python-3.8
    1个回答
    1
    投票
    asyncio的工作方式

    让我们考虑发出两个Web请求的任务。

    同步版本:

      发送请求1
    1. 等待1秒钟的答案。
    2. 发送请求2
    3. 等待1秒钟的答案。
    4. 两个请求都在[[2秒]]中完成
    5. 异步版本:

    发送请求1

      而不是等待,立即发送请求2
    1. 等待答案超过1秒钟。
    2. 两个请求都在[[1秒]]中完成
  • asyncio允许您编写实际上在第二个异步版本中工作的程序,而您的代码看起来与(直观的)第一个版本非常相似。
  • [注意此处的重要内容:异步版本更快的唯一原因是它立即启动了另一个并发操作,而不是等待第一个完全完成。它与线程无关,asyncio在单个主线程中工作。

    磁盘I / O呢?

    您的硬件可以并行读写两个文件吗?
    如果您只有一个物理硬盘,则可能没有:它只有一个物理"needle",它可以同时读取/写入单个数据。异步方法对您没有帮助。

    如果您有multiple disks,情况可能会有所不同。尽管我知道OS / asyncio是否可以并行处理多个磁盘(probably not)。

    假定您希望硬件和操作系统支持多个磁盘I / O。仅当您使用多个线程或进程进行操作时,它才可能起作用:

    模块aiofiles使用线程来处理文件-您可以尝试一下

    要使用ProcessPoolExecutorasyncio处理流程,可以使用run_in_executor as shown here

    • 也有可能纯粹由于并行化与CPU绑定的相关操作而使用进程或什至线程会增加磁盘I / O,但是我不知道是否是这种情况以及它的好处(与磁盘相比可能不多I / O)。
    © www.soinside.com 2019 - 2024. All rights reserved.