使用pandas和dask将不同模式的parquet文件合并。

问题描述 投票:0回答:1

我有一个有大约1000个文件的parquet目录,而且模式是不同的。我想把所有这些文件合并到一个最佳的文件数量与文件重新分区。我使用 pandas 和 pyarrow 从目录中读取每个分区文件,并将所有的数据帧进行连接,然后将其写成一个文件。

用这种方法,当数据量增大时,我就会出现内存问题,被干掉。所以我选择了另一种方法来完成这个过程。

我先读取一堆文件,用concat合并,然后写到新的parquet目录。同样第二次,我读取第二堆文件,合并为一个数据框,并从第二个合并的数据框中取出一条记录。现在我有一个记录从第二个合并的数据框架,我再次从文件中读取第一个合并的数据框架,并将其与第二个合并的数据框架中的记录合并,然后我使用dask to_parquet,append功能将新文件添加到该parquet文件夹。

当我们从这个parquet中读取数据时,我将得到所有的列,就像parquet模式的演变? 它将类似于火花合并模式?

更新。

sample.parquet - contains 1000 part files

def read_files_from_path(inputPath):
   return {"inputPath": ["part-001","part-002",...,"part-100"]}


def mergeParquet(list_of_files,output_path)
   dfs_list = []
   for i in range:
      df = pd.read_parquet(i, engine='pyarrow')
      dfs_list.append(df)
   df = pd.concat(dfs_list,axis=0,sort=True)
   df_sample_record_df = df[2:3]

   if os.path.exists(output_path + '/_metadata'):
      files_in_output_path = getFiles(output_path)
      for f in files_in_output_path:
         temp_df = pd.read_parquet(f, engine='pyarrow')
         temp_combine_df = pd.concat(temp_df,df_sample_record_df) 
         temp_combine_df.repartition(partition_size="128MB") \
                .to_parquet(output_path+"/tmp",engine='pyarrow',
                            ignore_divisions=True,append=True)
         os.remove(output_path+"/"+each_file)
   return df

def final_write_parquet(df,output_path):
   if os.path.exists(output_path+"/tmp"):
      df.repartition(partition_size="128MB")\
              .to_parquet(output_path+str(self.temp_dir),engine='pyarrow',
                            ignore_divisions=True,append=True)
      files = os.listdir(output_path + "/tmp")
      for f in files:
         shutil.move(output_path+"/tmp"+"/"+f, output_path)
         shutil.rmtree(output_path+"/tmp")
   else:
      df.repartition(partition_size="128MB")\
                .to_parquet(output_path, engine='pyarrow', append=False)


if __name__ == "__main__":
   files_dict = read_files_from_path(inputPath)
   number_of_batches = 1000/500    # total files/batchsize
   for sub_file_names in np.array_split(files_dict[0], num_parts):
      paths = [os.path.join(root_dir, file_name) for file_name in sub_file_names]
      mergedDF = parquetMerge(paths)
      final_write_parquet(megedDF,outputPath)
python pandas dask parquet pyarrow
1个回答
0
投票

Dask数据框架假设所有分区都有相同的模式(列名和数据类型)。 如果你想混合具有几乎相同模式的不同数据集,那么你将需要手动处理这个问题。 Dask dataframe 今天没有提供自动支持。


0
投票

对于内存问题:使用'pyarrow table'代替'pandas dataframes'。

对于模式问题:你可以创建你自己的自定义 "pyarrow模式",并将每个pyarrow表与你的模式一起投递。

    import pyarrow as pa
    import pyarrow.parquet as pq
    def merge_small_parquet_files(small_files, result_file):
        pqwriter = None
        for small_file in small_files:
            table = pq.read_table(small_file)
            pyarrow_schema = get_pyarrow_schema()
            if not pqwriter:
                pqwriter = pq.ParquetWriter(result_file,
                                        schema=pyarrow_schema,
                                        compression='GZIP',
                                        coerce_timestamps='ms', allow_truncated_timestamps=True)
                table = table.cast(pyarrow_schema)
                pqwriter.write_table(table)
                table = None
                del table
            if pqwriter:
                pqwriter.close()

    def get_pyarrow_schema():
        fields = []
        fields.append(pa.field('first_name', pa.string()))
        fields.append(pa.field('last_name', pa.string()))
        fields.append(pa.field('Id', pa.float64()))
        fields.append(pa.field('Salary', pa.float64()))
        fields.append(pa.field('Time', pa.timestamp('ms')))
        pyarrow_schema = pa.schema(fields)
        return pyarrow_schema
    if __name__ == '__main__':
        small_files = ['file1.parquet', 'file2.parquet', 'file3.parquet', 'file4.parquet']
        result_file = 'large.parquet'
        merge_small_parquet_files(small_files, result_file)    
© www.soinside.com 2019 - 2024. All rights reserved.