我想创建一个文件生成器,它可以让我逐行读取一个文件(CSV),并在文件中添加新行时继续运行(就像一个连续的日志),但当日志中没有发现新行时,它也会继续在后台等待运行。
我试过用 aiofiles但我不知道如何从我的同步函数main函数中运行async函数。然后我尝试了 三重奏,使用下面的代码,我能够读取文件中的行。
async def open_file(filepath):
with open(filepath, newline='') as f:
first_line = True
_reader = csv.reader(f, lineterminator='\n')
for row in _reader:
if row:
# skip header row
if first_line:
first_line = False
else:
print(tuple(row)) # yield tuple(row) here gives the error stated below
else:
await trio.sleep(1)
trio.run(open_file, 'products.csv')
但是脚本在读完行之后就停止了,而且没有在后台等待更多的行。. 而当我换成 print(row)
与 yield tuple(row)
(实际上会返回一个生成器),我得到了错误信息 TypeError: start_soon expected an async function but got an async generator <async_generator object open_file at 0x1050944d0>
.
所以... print
行的工作是正常的,但 yield
没有
如何解决这个问题?另外,这样能不能帮助并行读行?更新: 请不要让我使用 csv.reader
读取行数,因为有些行数包含了以下内容 \n
而这是正确读取记录的唯一方法。
迭代器对你的情况没有帮助,因为当迭代器到达文件的末端时就会立即停止。
你也许可以在以下模块中查找类似的功能----。https:/github.comkasunpython-tailblobmastertail.py。
def follow(filename):
with open(filename) as file_:
file_.seek(0,2) # Remove it if you need to scan file from the beginning
while True:
curr_position = file_.tell()
line = file_.readline()
if not line:
file_.seek(curr_position)
yield None
else:
yield line
然后你可以创建一个生成器,如果行还没有准备好,则生成None,如果文件中有下一行可用,则生成字符串。使用 next()
函数,你可以以非阻塞的方式逐行获取。
下面是如何使用它。
non_blocking_reader = follow("my_file.txt")
# do something
line = next(non_blocking_reader)
if line is not None: # You need to distinguish None from empty string, so use `is not` not just `if line:`
# do something else
# do something else
next_line = next(non_blocking_reader)
# ...