Real-time tailing of a CSV with Pandas

Question

I have a process that continuously writes to and appends rows to a CSV file.

I would like a Python script that tails that CSV and uses Pandas to aggregate the data. For example, every 100 rows I would take the aggregated data and send it somewhere.

Does Pandas have functionality for this? Is there a way to track the line number the script has processed, so that if I stop it, or it crashes, I can start it again and it will pick up where it left off?

python pandas
1 Answer

As mentioned, there is no simple built-in way to do this. However, you can combine a simple follow function (see How can I tail a log file in Python?) with Pandas to aggregate the data into DataFrames.

We use the follow function to tail the file, appending each new line to a list; once the list reaches the specified number of lines, it is converted to a pandas DataFrame. The list is then reset and we keep following the file. As another commenter mentioned, you can write the current line number to disk and read it back from the same file to resume where you left off. Below is an example.

import pandas as pd
from io import StringIO
import time
import os

def follow(thefile):
    """Yield complete lines as they are appended to thefile."""
    with open(thefile, 'r') as mfile:
        while True:
            where = mfile.tell()
            line = mfile.readline()
            if not line or not line.endswith('\n'):
                # no new data, or a partially written line: rewind and wait
                mfile.seek(where)
                time.sleep(0.1)
                continue
            yield line

if __name__ == "__main__":
    # set the file we want to log the current line to
    log_file = "./current_line"

    # check if a resume point has been saved from a previous run
    if os.path.exists(log_file):
        with open(log_file, 'r') as ifile:
            # the line number to resume processing from
            start_line = int(ifile.read())
    else:
        # start from the first data row (skip the header); use 0 if there is no header
        start_line = 1

    # set the file we are reading
    myfile = 'test.csv'

    # remove this line if you don't need the header
    header = pd.read_csv(myfile, nrows=0)

    # initialize the list to store the lines in
    lines = []

    # loop through each line in the file
    for nline, line in enumerate(follow(myfile)):
        # skip lines we have already processed
        if nline < start_line:
            continue
        # append to the lines list
        lines.append(line)

        # check if we have hit the number of lines we want to handle
        if len(lines) == 100:
            # read the csv from the lines we have processed
            df = pd.read_csv(StringIO(''.join(lines)), header=None)
            # apply the header.  Delete this line if there is no header
            df.columns = header.columns

            # do something with df
            print(df)

            # reset the lines list
            lines = []  

            # note the next line to be processed, so a restart resumes correctly
            with open(log_file, 'w') as lfile:
                # only checkpoint once the batch has actually been handled
                lfile.write(str(nline + 1))
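To try this out you need a second process appending rows to the CSV. Here is a minimal producer sketch; the test.csv filename, the two column names a and b, and the row values are assumptions for this demo:

```python
import csv
import os
import time

path = 'test.csv'

# Write a header row first if the file is new, since the tailing
# script above reads one with pd.read_csv(myfile, nrows=0).
new_file = not os.path.exists(path) or os.path.getsize(path) == 0

with open(path, 'a', newline='') as f:
    writer = csv.writer(f)
    if new_file:
        writer.writerow(['a', 'b'])
    for i in range(5):
        writer.writerow([i, i * 2])
        f.flush()  # push each row through Python's buffer so the tailer sees it
        time.sleep(0.05)
```

Run the producer in one terminal and the tailing script in another; once enough rows accumulate, the aggregated DataFrame is printed and the checkpoint file is updated.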