使用pyarrow引擎进行dask read_parquet

问题描述 投票:0回答:1

我有一个熊猫数据帧。我使用spark将它保存到镶木地板然后尝试通过dask读取。问题是没有使用pyarrow引擎回读分区列。

df = pd.DataFrame({'i64': np.arange(1000, dtype=np.int64),
                            'Ii32': np.arange(1000, dtype=np.int32),
                            'f': np.arange(1000, dtype=np.float64),
                            't': [datetime.datetime.now()] * 1000,
                            'e': ['1'] * 998 + [None,'1'],
                            'g' : [np.NAN] * 998 + [None, ''],
                            'bhello': np.random.choice(['hello', 'Yo', 'people', '1'], size=1000).astype("O")})

spark = SparkSession \
            .builder \
            .appName("Python Spark arrow compatibility") \
            .config("spark.some.config.option", "some-value") \
            .getOrCreate()
        spark.conf.set("spark.sql.execution.arrow.enabled", "true")
        #enable metadata write from spark
        spark.conf.set("parquet.enable.summary-metadata",  "true")
        #convert pandas df to spark df
        sparkDf = spark.createDataFrame(df)

        #write to parquet
        sparkDf.write.parquet(path, partitionBy=['bhello'])

        #use dask to read the above saved parquet with pyarrow engine
        df2 = dd.read_parquet('hdfs://127.0.0.1:8020/tmp/test/outputParquet10',
                              engine='pyarrow',
                             )

        print(df2.columns)
        self.assertIn('bhello', df2.columns)

我在这里做错了什么想法

python dask
1个回答
1
投票

我将假设这是一个最小的工作示例。因此,我的解决方案是使用dask读取它,然后使用fastparquetpyarrow引擎转换它。

代码如下。

import dask.dataframe as dd
ddf=dd.read_csv('/destination/of/your/file/file.format_name')
ddf.to_parquet('/destination/of/your/file/file.parquet',engine = 'fastparquet') #default is fastparquet if both engines are installed.

希望这可以帮助。

谢谢

迈克尔

© www.soinside.com 2019 - 2024. All rights reserved.