我有一个有大约1000个文件的parquet目录,而且模式是不同的。我想把所有这些文件合并到一个最佳的文件数量与文件重新分区。我使用 pandas 和 pyarrow 从目录中读取每个分区文件,并将所有的数据帧进行连接,然后将其写成一个文件。
用这种方法,当数据量增大时,我就会出现内存问题,被干掉。所以我选择了另一种方法来完成这个过程。
我先读取一堆文件,用concat合并,然后写到新的parquet目录。同样第二次,我读取第二堆文件,合并为一个数据框,并从第二个合并的数据框中取出一条记录。现在我有一个记录从第二个合并的数据框架,我再次从文件中读取第一个合并的数据框架,并将其与第二个合并的数据框架中的记录合并,然后我使用dask to_parquet,append功能将新文件添加到该parquet文件夹。
当我们从这个parquet中读取数据时,我将得到所有的列,就像parquet模式的演变? 它将类似于火花合并模式?
更新。
sample.parquet - contains 1000 part files
def read_files_from_path(inputPath):
return {"inputPath": ["part-001","part-002",...,"part-100"]}
def mergeParquet(list_of_files,output_path)
dfs_list = []
for i in range:
df = pd.read_parquet(i, engine='pyarrow')
dfs_list.append(df)
df = pd.concat(dfs_list,axis=0,sort=True)
df_sample_record_df = df[2:3]
if os.path.exists(output_path + '/_metadata'):
files_in_output_path = getFiles(output_path)
for f in files_in_output_path:
temp_df = pd.read_parquet(f, engine='pyarrow')
temp_combine_df = pd.concat(temp_df,df_sample_record_df)
temp_combine_df.repartition(partition_size="128MB") \
.to_parquet(output_path+"/tmp",engine='pyarrow',
ignore_divisions=True,append=True)
os.remove(output_path+"/"+each_file)
return df
def final_write_parquet(df,output_path):
if os.path.exists(output_path+"/tmp"):
df.repartition(partition_size="128MB")\
.to_parquet(output_path+str(self.temp_dir),engine='pyarrow',
ignore_divisions=True,append=True)
files = os.listdir(output_path + "/tmp")
for f in files:
shutil.move(output_path+"/tmp"+"/"+f, output_path)
shutil.rmtree(output_path+"/tmp")
else:
df.repartition(partition_size="128MB")\
.to_parquet(output_path, engine='pyarrow', append=False)
if __name__ == "__main__":
files_dict = read_files_from_path(inputPath)
number_of_batches = 1000/500 # total files/batchsize
for sub_file_names in np.array_split(files_dict[0], num_parts):
paths = [os.path.join(root_dir, file_name) for file_name in sub_file_names]
mergedDF = parquetMerge(paths)
final_write_parquet(megedDF,outputPath)
Dask数据框架假设所有分区都有相同的模式(列名和数据类型)。 如果你想混合具有几乎相同模式的不同数据集,那么你将需要手动处理这个问题。 Dask dataframe 今天没有提供自动支持。
对于内存问题:使用'pyarrow table'代替'pandas dataframes'。
对于模式问题:你可以创建你自己的自定义 "pyarrow模式",并将每个pyarrow表与你的模式一起投递。
import pyarrow as pa
import pyarrow.parquet as pq
def merge_small_parquet_files(small_files, result_file):
pqwriter = None
for small_file in small_files:
table = pq.read_table(small_file)
pyarrow_schema = get_pyarrow_schema()
if not pqwriter:
pqwriter = pq.ParquetWriter(result_file,
schema=pyarrow_schema,
compression='GZIP',
coerce_timestamps='ms', allow_truncated_timestamps=True)
table = table.cast(pyarrow_schema)
pqwriter.write_table(table)
table = None
del table
if pqwriter:
pqwriter.close()
def get_pyarrow_schema():
fields = []
fields.append(pa.field('first_name', pa.string()))
fields.append(pa.field('last_name', pa.string()))
fields.append(pa.field('Id', pa.float64()))
fields.append(pa.field('Salary', pa.float64()))
fields.append(pa.field('Time', pa.timestamp('ms')))
pyarrow_schema = pa.schema(fields)
return pyarrow_schema
if __name__ == '__main__':
small_files = ['file1.parquet', 'file2.parquet', 'file3.parquet', 'file4.parquet']
result_file = 'large.parquet'
merge_small_parquet_files(small_files, result_file)