我是结构化流媒体新手,希望根据 json 消息中的日期列创建分区列。
这是示例消息:
{"date": "2022-03-01", "code": "1000310014", "no": "20191362283860", "update_type_cd": "UP", "EventProcessedUtcTime": null, "PartitionId": null, "EventEnqueuedUtcTime": null}
{"date": "2022-03-01", "code": "2000310014", "no": "300191362283860", "update_type_cd": "UP", "EventProcessedUtcTime": null, "PartitionId": null, "EventEnqueuedUtcTime": null}
{"date": "2022-03-01", "code": "30002220014", "no": "20191333383860", "update_type_cd": "UP", "EventProcessedUtcTime": null, "PartitionId": null, "EventEnqueuedUtcTime": null}
val date = event.select(col("date"))
val stream = flatten_messages
.writeStream
.partitionBy(date)
.format("delta")
.outputMode("append")
.start(output_path)
这是 json 消息的正确分区方式吗?
不,在
partitionBy
中,您只需指定列名称,而不是数据框。所以代码就是:
val stream = flatten_messages
.writeStream
.partitionBy("date")
.format("delta")
.outputMode("append")
.start(output_path)
但是第一个问题是——你真的需要对数据进行分区吗? Delta 可能没有严格要求,它有数据跳过、ZOrder 等功能。
附注另外,您可能需要将
date
列转换为 date
类型 - 在这种情况下,它将更有效地存储在磁盘上,并且允许范围搜索等。尽管它与分区无关。