我正在尝试使用 PySpark 中的存储桶,使用以下一般示例: https://gist.github.com/luminousmen/8dffa01a02bb58946b1299a621e44897
https://towardsdatascience.com/best-practices-for-bucketing-in-spark-sql-ea9f23f7dd53
我将分桶表写入 S3,如下所示:
spark = SparkSession.builder.appName("bucketing test").enableHiveSupport().config(
"spark.sql.sources.bucketing.enabled", "true").getOrCreate()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# Create the DF
df = spark.range(1, 16000, 1, 16).select(
F.col("id").alias("key"), F.rand(12).alias("value")
)
# Write the DF to disk
df.write.bucketBy(8, "key").sortBy("value").saveAsTable(
"bucketed_table", format="parquet", mode="overwrite", path="s3a://my/s3/path"
)
然后,我尝试读回它并测试分桶连接。
df_bucketed = spark.sql("""
CREATE EXTERNAL TABLE bucketed_table (
key int,
value float
) STORED AS PARQUET
CLUSTERED BY (key) SORTED BY (value) INTO 8 BUCKETS
LOCATION 's3a://my/s3/path'
"""
)
# Print the extended describe and confirm bucketing works
logging.info("Extended Describe: %s" % spark.sql("DESCRIBE EXTENDED bucketed_table")._jdf.showString(100, 40, False))
logging.info("Bucketed table columns: %s" % df_bucketed.columns)
这导致输出:
为什么读取的 DF 没有列?有没有更好的方法从 S3 读取 PySpark 中的分桶 DF?
请注意,spark.table("") 对我不起作用,因为我需要指定 S3 路径(并且我不知道如何在 table() 调用中执行此操作)。
df_bucketed = spark.sql("CREATE TABLE ...")
会给你这个空的 DataFrame 结果,正如你所看到的。这是因为创建语句成功时会返回一个空的 DataFrame。该结果似乎还包含一些创建的表的元数据。修复非常简单,使用 spark.table("bucketed_table")
,它将读取分桶表。