我正在实现一个 Python 模块,该模块接收一个包含三个列表(`x`、`y`、`val`)的元组,并按照给定的比例对它们进行二次采样。我的做法对吗?
import bisect
import numpy as np
import gzip
import asyncio
class SignalPDF:
    """Weight table built from three parallel sequences ``(x, y, vals)``.

    ``inputSignal`` is indexed positionally: element 0 holds the x
    coordinates, element 1 the y coordinates and element 2 the weights.
    Each sequence is taken through a ``[:]`` slice.

    Raises:
        ValueError: if the three sequences do not all have the same length.
    """

    def __init__(self, inputSignal):
        self.x = inputSignal[0][:]
        self.y = inputSignal[1][:]
        self.vals = inputSignal[2][:]
        # Number of cells in the signal.
        self.N = len(self.vals)

        # The sequences are indexed in lockstep by the sampler; a length
        # mismatch would otherwise only show up as an IndexError on some
        # random draw (or go unnoticed), so reject it up front.
        if len(self.x) != self.N or len(self.y) != self.N:
            raise ValueError(
                'x, y and val must have the same length, got '
                '{0}, {1} and {2}'.format(len(self.x), len(self.y), self.N)
            )

        # Running sum of the weights; valCumsum[k] is the total weight of
        # cells 0..k inclusive.
        self.valCumsum = np.cumsum(self.vals)
        # Sum of all weights.
        self.totalSum = np.sum(self.vals)
class SignalSampler:
    """Draw weighted random samples from a signal and write them to a file.

    The sampler builds a ``SignalPDF`` from ``inputSignal`` and produces
    ``int(cells / ratio)`` samples. Samples flow through ``self.Q`` (an
    ``asyncio.Queue`` created without a size limit): ``populateQueue``
    is the producer, ``hanldeGzip`` / ``hanldeFile`` are the consumers.
    A ``None`` item on the queue marks the end of the stream.

    Args:
        inputSignal: indexable ``(x, y, vals)`` passed to ``SignalPDF``.
        ratio: sub-sampling ratio; converted with ``float()``.

    Raises:
        ZeroDivisionError: if ``ratio`` is 0.
    """

    def __init__(self, inputSignal, ratio=1.0):
        self.signalPDF = SignalPDF(inputSignal)
        self.Q = asyncio.Queue()
        self.ratio = float(ratio)
        # Number of samples to emit in total.
        self.N = int(self.signalPDF.N / self.ratio)
        # Number of samples emitted so far (shared by all iterations).
        self.sampledN = 0

    async def randRead(self):
        """Yield ``(x, y, val)`` tuples picked with probability ~ ``val``.

        A uniform integer is drawn in ``[0, totalSum)`` and located in the
        cumulative-sum table, so cells with weight 0 are never selected.
        Stops once ``self.N`` samples have been produced.
        """
        pdf = self.signalPDF
        while self.sampledN < self.N:
            # Cast to a Python int so the comparisons inside bisect are
            # exact integer comparisons rather than uint64-vs-int64 ones.
            draw = int(
                np.random.randint(pdf.totalSum, size=1, dtype=np.uint64)[0]
            )
            self.sampledN += 1
            # bisect (= bisect_right) returns the first cell whose
            # cumulative weight is strictly greater than the draw.
            cell = bisect.bisect(pdf.valCumsum, draw)
            yield (pdf.x[cell], pdf.y[cell], int(pdf.vals[cell]))

    async def readShortFormattedLine(self):
        """Yield each sample as the text ``'<x> <y> <val>'``."""
        async for x, y, val in self.randRead():
            yield '{0} {1} {2}'.format(x, y, val)

    async def populateQueue(self):
        """Put every formatted sample on the queue, then a ``None`` sentinel."""
        async for line in self.readShortFormattedLine():
            await self.Q.put(line)
        await self.Q.put(None)

    async def _drainQueue(self, f):
        """Write queue items to ``f``, one per line, until the ``None`` sentinel."""
        while True:
            item = await self.Q.get()
            if item is None:
                break
            # No per-line flush: the enclosing ``with`` flushes on close,
            # and flushing a gzip stream after every record forces a sync
            # flush that inflates the compressed output.
            f.write('{0}\n'.format(item))

    async def hanldeGzip(self, filePath):
        """Consume the queue into a gzip-compressed text file at ``filePath``."""
        with gzip.open(filePath, 'wt') as f:
            await self._drainQueue(f)

    async def hanldeFile(self, filePath):
        """Consume the queue into a plain text file at ``filePath``."""
        with open(filePath, 'w+') as f:
            await self._drainQueue(f)

    # Correctly spelled aliases; the original (misspelled) names are kept
    # so existing callers continue to work.
    handleGzip = hanldeGzip
    handleFile = hanldeFile
def main(gzip, outputFile):
    """Sample a random 100x100 test signal and write the samples to a file.

    Builds a grid of ``(i, j)`` coordinates with a random integer weight
    from ``np.random.randint(0, 250)`` per cell, sub-samples it with
    ``ratio=2.0`` and runs the producer and the file writer concurrently
    on a fresh event loop.

    Args:
        gzip: truthy to write gzip-compressed output, falsy for plain
            text. (The name shadows the ``gzip`` module inside this
            function only; the module is not used here.)
        outputFile: path of the file to write.
    """
    side = 100
    x = [i for i in range(side) for _ in range(side)]
    y = [j for _ in range(side) for j in range(side)]
    val = [np.random.randint(0, 250) for _ in range(side * side)]

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Created after set_event_loop so the sampler's queue is associated
        # with this loop on Python versions that bind it at construction.
        mixer = SignalSampler(inputSignal=[x, y, val], ratio=2.0)
        writer = mixer.hanldeGzip if gzip else mixer.hanldeFile

        async def runBoth():
            # asyncio.wait(..., loop=loop) with bare coroutines no longer
            # works on current Python; gather inside a coroutine does, and
            # it also propagates exceptions instead of leaving them unread.
            await asyncio.gather(writer(outputFile), mixer.populateQueue())

        loop.run_until_complete(runBoth())
    finally:
        # Close the loop even if sampling or writing failed.
        loop.close()
# Run the demo twice: first to a plain-text file, then to a gzip-compressed one.
for useGzip, targetPath in ((False, '/tmp/a.txt'), (True, '/tmp/a.txt.gz')):
    main(gzip=useGzip, outputFile=targetPath)
`asyncio` 的工作方式

同步版本:
发送请求 1
`asyncio` 允许您编写实际上以第二种(异步)方式运行的程序,而您的代码看起来仍与(直观的)第一种版本非常相似。

`asyncio` 在单个主线程中工作。那么磁盘 I/O 呢?
您的硬件可以并行读写两个文件吗?`asyncio` 能否并行处理多个磁盘操作?(很可能不能。)假设您希望硬件和操作系统支持并行的磁盘 I/O,那么只有在使用多个线程或进程来执行这些操作时,它才有可能奏效:
- 模块 `aiofiles` 使用线程来处理文件——您可以尝试一下。
- 要将 `ProcessPoolExecutor` 与 `asyncio` 配合使用、以多进程方式执行操作,可以使用 `run_in_executor`(如此处所示)。