逐行读取csv文件并保存满足一定条件的行

问题描述 投票:0回答:1

我有一个已经在多个主题中讨论过的问题,但我想更深入一点,也许找到更好的解决方案。

因此,我们的想法是使用 python 遍历“巨大”(50 到 60GB)的 .csv 文件,找到满足某些条件的行,提取它们,最后将它们存储在第二个变量中以供进一步分析。

最初的问题是 R 脚本,我使用 Sparklyr 连接进行管理,或者最终使用 bash 中的一些 gawk 代码(参见 awk 或 gawk)来提取我需要的数据,然后使用 R/python 进行分析。

我想专门用 python 来解决这个问题,这个想法是避免混合像 bash/python 或 bash/R (unix) 这样的语言。到目前为止,我使用 open 作为 x,并逐行浏览文件,它有点工作,但速度非常慢。例如,浏览文件非常快(每秒约 500.000 行,即使 58M 行也可以),但是当我尝试存储数据时,速度下降到每秒约 10 行。对于大约 300.000 行的提取,这是不可接受的。

我尝试了几种解决方案,我猜这不是最佳的(糟糕的Python代码?:(),最终存在更好的解决方案。

解决方案1:遍历文件,将行拆分为列表,检查条件,如果可以,则将行放入numpy矩阵和vstack中,用于满足条件的每次迭代(非常慢)

import csv
import numpy
import pandas
from tqdm import tqdm

date_first = '2008-11-01'
date_last = '2008-11-10'

a = numpy.array(['colnames']*35) #data is 35 columns
index = list()

with open("data.csv", "r") as f:
    for line in tqdm(f, unit = " lines per"):
        line = line.split(sep = ";") # csv with ";" ...
        date_file = line[1][0:10] # date stored in the 2nd column 

        if  date_file >= date_first and date_file <= date_last : #data extraction concern a time period (one month for example)
            line=numpy.array(line) #go to numpy
            a=numpy.vstack((a, line)) #stack it

解决方案2:相同,但如果条件正常(非常慢),则将行存储在带有行索引的pandas data.frame中

import csv
import numpy
import pandas
from tqdm import tqdm

date_first = '2008-11-01'
date_last = '2008-11-10'
row = 0 #row index
a = pandas.DataFrame(numpy.zeros((0,35)))#data is 35 columns

with open("data.csv", "r") as f:
    for line in tqdm(f, unit = " lines per"):
        line = line.split(sep = ";")
        date_file = line[1][0:10]

        if  date_file>=date_first and date_file<=date_last :
            a.loc[row] = line #store the line in the pd.data.frame at the position row
            row = row + 1 #go to next row

解决方案3:相同,但不是将行存储在某处(这对我来说是主要问题),而是保留满足行的索引,然后使用我需要的行打开csv(甚至更慢,实际上经过文件查找索引的速度足够快,打开索引的行非常慢)

import csv
import numpy
import pandas
from tqdm import tqdm

date_first = '2008-11-01'
date_last = '2008-11-10'
row = 0
index = list()

with open("data.csv", "r") as f:
    f = csv.reader(f, delimiter = ";")
    for line in tqdm(f, unit = " lines per"):
        line = line.split(sep = ";")
        date_file = line[1][0:10]
        row = row + 1
        if  date_file>=date_first and date_file<=date_last :
            index.append(row)

with open("data.csv") as f:
    reader=csv.reader(f)
    interestingrows=[row for idx, row in enumerate(reader) if idx in index]

这个想法是只保留满足条件的数据,这里是特定月份的提取。我不明白问题出在哪里,将数据保存在某个地方(vstack,或在 pd.DF 中写入)绝对是一个问题。我很确定我做错了什么,但我不确定在哪里/什么。

数据是具有 35 列和超过 57M 行的 csv。 感谢您的阅读

哦。

python file-read
1个回答
7
投票

追加到数据帧和 numpy 数组非常昂贵,因为每个追加都必须将整个数据复制到新的内存位置。相反,您可以尝试分块读取文件,处理数据,然后追加回来。在这里,我选择了 100,000 的块大小,但您显然可以更改它。

我不知道您的 CSV 的列名称,所以我猜测是

'date_file'
。这应该会让你接近:

import pandas as pd

date_first = '2008-11-01'
date_last = '2008-11-10'

df = pd.read_csv("data.csv", chunksize=100000)

for chunk in df:
    chunk = chunk[(chunk['date_file'].str[:10] >= date_first)
                  & (chunk['date_file'].str[:10] <= date_last)]
    chunk.to_csv('output.csv', mode='a')

2024 年更新:自从我回答这个问题以来,事情发生了很大变化。目前的方法是使用

polars
来延迟加载数据。您可能希望使用
scan_csv
延迟加载符合您条件的数据,然后使用
sink_csv
进行输出。

所以,类似:

import polars as pl

df = (
    pl.scan_csv("data.csv")
      .filter(
        pl.col("date_col").str.slice(0, length=10) >= date_first, 
        pl.col("date_col").str.slice(0, length=10) <= date_last
    )
    .sink_csv("output.csv")
)

这将自动为您批量加载数据并将其流回新文件。请注意,现在,

parquet
文件格式对于处理数据来说更加紧凑和高效,因此可能值得将其流回该格式 - 不过,它不是人类可读的。

© www.soinside.com 2019 - 2024. All rights reserved.