我有一个大小为 10GB 的巨大 csv 文件
data/history_{date_to_be_searched}.csv
该文件的内容比 27000
邮政编码还要多。根据邮政编码,我必须过滤 csv 文件,然后我必须将每个过滤后的文件上传到 azure blob 存储。
因此,我正在像这样
1000000
一样以 for chunk in pd.read_csv(f'data/history_{date_to_be_searched}.csv', chunksize=1000000):
大小的块获取数据,然后从块 uniquezipcode = df_final["zipcode"].unique()
中获取唯一的邮政编码,然后将其上传到 Blob 存储。
这样,chuck 循环了 25 次,数据过滤器循环了 27000
次,这花费了太多时间 70+ hours
所以我想使用 map()
而不是这样的循环。
def filtered_data():
try:
for chunk in pd.read_csv(f'data/history_{date_to_be_searched}.csv', chunksize=1000000):
df_final=chunk
uniquezipcode = df_final["zipcode"].unique()
grouped_data = df_final.groupby('zipcode')
del df_final
data_for_upload = [(zipcode, group_df) for zipcode, group_df in grouped_data]
map(upload_filtered_data,*zip(*data_for_upload))
del data_for_upload
except Exception as e:
print("filtered_data Error : ",e)
def upload_filtered_data(each_zone,df_filter):
global date_to_be_searched
try:
if df_filter.shape[0] != 0:
file_path = f"{path}/{container_name}/{date_to_be_searched.year}/{date_to_be_searched.month}/{each_zone}/{each_zone}{date_to_be_searched}.parquet"
outdir = f"{path}/{container_name}/{date_to_be_searched.year}/{date_to_be_searched.month}/{each_zone}/"
outdir_blob = f"{date_to_be_searched.year}/{date_to_be_searched.month}/{each_zone}/{each_zone}{date_to_be_searched}.parquet"
fullname = os.path.join(outdir, f"{each_zone}{date_to_be_searched}.parquet")
if not os.path.exists(file_path):
os.makedirs(file_path)
else:
df_filter = merge_df(df_filter, file_path)
df_filter.to_parquet(fullname, allow_truncated_timestamps=True)
upload_to_blob(fullname, outdir_blob)
del each_battery
del df_filter
except Exception as e:
print("upload_filtered_data Error : ",e)
我认为这种方式会快得多,但它需要 RAM,因此每次运行此脚本都会被终止/杀死。请建议任何其他有效的方法或者我应该减少块大小?有没有办法在单个循环中过滤整个 csv 文件而不加载到 RAM 中?
我曾经遇到过类似的问题。就我而言,这是由于清单很大。
迭代
grouped_data
可能会产生许多 pd.DataFrames
这也可能非常昂贵。
您是否尝试过使用
dict
或 OrderedDict
代替列表或 DataFrame?
字典对于许多项目来说要快得多,并且创建速度比 DataFrame 快得多。
还有像tracemalloc这样的工具来定位内存泄漏