我有一些以 JSON 格式存储的数据,如下所示:
{
"id":1,
"time":"2023-01-01 12:34:56"
}
我还有一个具有相同列的 Apache Hudi 表。 hudi 表的模式是(由 pyarrow 读取):
id: int64
time: timestamp[us, tz=UTC]
我尝试使用Spark将JSON数据合并到Hudi表中:
with SparkSession.builder.getOrCreate() as spark:
df = spark.read.json('path/to/json/files')
df.createOrReplaceTempView('upsert_data')
spark.sql(f"CREATE TABLE snapshot USING hudi LOCATION 'path/to/hudi/table'")
sql_upsert='''MERGE INTO snapshot AS target
USING upsert_data AS source
ON source.id = target.id
WHEN MATCHED THEN UPDATE SET target.time=source.time
WHEN NOT MATCHED THEN INSERT (id, time) values (source.id, source.time)'''
spark.sql(sql_upsert)
但是数据合并后,Hudi表中的
time
值变成了56019-01-08 12:34:56
。我想也许这是因为 Spark 自动将 time
字符串更改为 timestamp[ns]
,这与 Hudi 表的 timestamp[us]
不同。我该如何解决这个问题?
我搜索了PySpark文档,但没有找到任何功能/设置可以直接实现这一点。最后我使用
cast(to_timestamp(time) as long)/1000
手动将时间单位从纳秒更改为微秒