PySpark:如何读回写入 S3 的分桶表?

问题描述 投票:0回答:1

我正在尝试使用 PySpark 中的存储桶,使用以下一般示例: https://gist.github.com/luminousmen/8dffa01a02bb58946b1299a621e44897

https://towardsdatascience.com/best-practices-for-bucketing-in-spark-sql-ea9f23f7dd53

我将分桶表写入 S3,如下所示:

spark = SparkSession.builder.appName("bucketing test").enableHiveSupport().config(
    "spark.sql.sources.bucketing.enabled", "true").getOrCreate()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# Create the DF
df = spark.range(1, 16000, 1, 16).select(
    F.col("id").alias("key"), F.rand(12).alias("value")
)
# Write the DF to disk
df.write.bucketBy(8, "key").sortBy("value").saveAsTable(
    "bucketed_table", format="parquet", mode="overwrite", path="s3a://my/s3/path"
)

然后,我尝试读回它并测试分桶连接。

df_bucketed = spark.sql("""
       CREATE EXTERNAL TABLE bucketed_table (
           key int,
           value float
       ) STORED AS PARQUET
       CLUSTERED BY (key) SORTED BY (value) INTO 8 BUCKETS
       LOCATION 's3a://my/s3/path'
    """
)
# Print the extended describe and confirm bucketing works
logging.info("Extended Describe: %s" % spark.sql("DESCRIBE EXTENDED bucketed_table")._jdf.showString(100, 40, False))
logging.info("Bucketed table columns: %s" % df_bucketed.columns)

这导致输出:

为什么读取的 DF 没有列?有没有更好的方法从 S3 读取 PySpark 中的分桶 DF?

请注意,spark.table("") 对我不起作用,因为我需要指定 S3 路径(并且我不知道如何在 table() 调用中执行此操作)。

apache-spark amazon-s3 pyspark hive apache-spark-sql
1个回答
0
投票

df_bucketed = spark.sql("CREATE TABLE ...")
会给你这个空的 DataFrame 结果,正如你所看到的。这是因为创建语句成功时会返回一个空的 DataFrame。该结果似乎还包含一些创建的表的元数据。修复非常简单,使用
spark.table("bucketed_table")
,它将读取分桶表。

© www.soinside.com 2019 - 2024. All rights reserved.