从结构流式json数据创建分区列

问题描述 投票:0回答:1

我是结构化流媒体新手,希望根据 json 消息中的日期列创建分区列。

这是示例消息:

{"date": "2022-03-01", "code": "1000310014", "no": "20191362283860", "update_type_cd": "UP", "EventProcessedUtcTime": null, "PartitionId": null, "EventEnqueuedUtcTime": null}

{"date": "2022-03-01", "code": "2000310014", "no": "300191362283860", "update_type_cd": "UP", "EventProcessedUtcTime": null, "PartitionId": null, "EventEnqueuedUtcTime": null}

{"date": "2022-03-01", "code": "30002220014", "no": "20191333383860", "update_type_cd": "UP", "EventProcessedUtcTime": null, "PartitionId": null, "EventEnqueuedUtcTime": null}

val date = event.select(col("date"))

val stream = flatten_messages
      .writeStream
      .partitionBy(date)
      .format("delta")
      .outputMode("append")
      .start(output_path)

这是 json 消息的正确分区方式吗?

databricks azure-databricks spark-structured-streaming delta-lake
1个回答
1
投票

不,在

partitionBy
中,您只需指定列名称,而不是数据框。所以代码就是:

val stream = flatten_messages
      .writeStream
      .partitionBy("date")
      .format("delta")
      .outputMode("append")
      .start(output_path)

但是第一个问题是——你真的需要对数据进行分区吗? Delta 可能没有严格要求,它有数据跳过、ZOrder 等功能。

附注另外,您可能需要将

date
列转换为
date
类型 - 在这种情况下,它将更有效地存储在磁盘上,并且允许范围搜索等。尽管它与分区无关。

© www.soinside.com 2019 - 2024. All rights reserved.