I have a process that continually appends rows to a CSV file.
I would like a Python script that tails that CSV and uses Pandas to aggregate the data. For example, I might aggregate every 100 rows: each time 100 new rows arrive, take the aggregated data and send it somewhere.
Does Pandas have functionality for this? And is there a way to track which row number the script has processed, so that if I stop it, or it crashes, I can start it again and it will pick up where it left off?
As mentioned, there is no simple built-in way to do this. However, you can combine a simple follow function (see How can I tail a log file in Python?) with Pandas to aggregate the dataframes.
We use the follow function to tail the file, appending each new line to a list; once the list reaches the specified number of lines, it is converted to a pandas dataframe. The list is then reset and we keep following the file. As another commenter mentioned, you can write the current line number to disk and read that same file on startup to resume where you left off. An example follows.
import pandas as pd
from io import StringIO
import time
import os
def follow(thefile):
    with open(thefile, 'r') as mfile:
        while True:
            # remember where the line started so that a partially
            # written line can be re-read in full on the next pass
            pos = mfile.tell()
            line = mfile.readline()
            if not line or not line.endswith('\n'):
                mfile.seek(pos)
                time.sleep(0.1)
                continue
            yield line
if __name__ == "__main__":
    # the file we log the current line number to
    log_file = "./current_line"
    # check if the last line processed has been saved
    if os.path.exists(log_file):
        with open(log_file, 'r') as ifile:
            # get the first line that still needs processing
            start_line = int(ifile.read())
    else:
        # start at the first data row (not the header);
        # use 0 if there is no header
        start_line = 1
    # the file we are reading
    myfile = 'test.csv'
    # remove this line if you don't need the header
    header = pd.read_csv(myfile, nrows=0)
    # initialize the list to store the lines in
    lines = []
    # loop through each line in the file
    for nline, line in enumerate(follow(myfile)):
        # skip lines we have already processed
        if nline < start_line:
            continue
        # append to the lines list
        lines.append(line)
        # check if we have hit the number of lines we want to handle
        if len(lines) == 100:
            # read the csv from the lines we have collected
            df = pd.read_csv(StringIO(''.join(lines)), header=None)
            # update the header. Delete this line if there is no header
            df.columns = header.columns
            # do something with df
            print(df)
            # reset the lines list
            lines = []
            # record the next line to be processed; only write the
            # checkpoint once the batch has actually been handled
            with open(log_file, 'w') as lfile:
                lfile.write(str(nline + 1))
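One hardening worth considering: if the script crashes in the middle of writing the checkpoint file, the file can be left empty or truncated, and the next start will fail on `int(ifile.read())`. A common remedy is to write the checkpoint atomically: write to a temporary file in the same directory, then rename it over the real file, so readers only ever see a complete value. Here is a minimal sketch; `save_checkpoint` is a hypothetical helper name, not part of the answer above.

```python
import os
import tempfile

def save_checkpoint(path, line_number):
    """Atomically persist the next line number to process.

    Writes to a temp file in the same directory, then renames it
    over the target. A crash mid-write leaves the old checkpoint
    intact rather than a truncated file.
    """
    dirname = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=dirname)
    try:
        with os.fdopen(fd, 'w') as tmp:
            tmp.write(str(line_number))
        os.replace(tmp_path, path)  # atomic rename on POSIX and Windows
    except BaseException:
        os.remove(tmp_path)
        raise

save_checkpoint("./current_line", 200)
with open("./current_line") as f:
    print(f.read())
```

You could drop this in place of the plain `open(log_file, 'w')` block; the temp file must live in the same directory as the target because `os.replace` is only guaranteed atomic within one filesystem.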