所以我们的雪花使用 TIMESTAMP_LTZ,时间戳是 America/New_york,没有偏移量信息。我正在尝试使用 pyspark 从雪花中提取数据。
有两种方法可以做到这一点:
spark.conf.set("spark.sql.session.timeZone", "America/New_York")
start = '2023-05-01 14:00:00'
end = '2023-05-01 15:00:00'
sql_query = \
"""
select id, created_at::timestamp_ltz from db.table where created_at > '{start}'and created_at < '{end}' limit 1
"""
df = (spark.read
.format("snowflake")
.option("query", sql_query.format(start=start, end=end))
.option("sfUrl", host_url)
.option("sfUser", username)
.option("sfPassword", password)
.option("sfDatabase", database_name)
.option("sfWarehouse", warehouse_name)
.option('partition_size_in_mb', 80)
.option('use_cached_result', 'true')
.option('use_copy_unload', 'false')
.option('parallelism', '40')
.option("sfCompress","on")
.option("sfTimezone", "America/New_York")
.load()
)
这将为我提供具有正确时间戳的正确数据
spark.conf.set("spark.sql.session.timeZone", "America/New_York")
start = '2023-05-01 14:00:00'
end = '2023-05-01 15:00:00'
df = (spark.read
.format("snowflake")
.option("sfTimezone", "America/New_York")
.option("dbtable", table_name)
.option("sfUrl", host_url)
.option("sfUser", username)
.option("sfPassword", password)
.option("sfDatabase", database_name)
.option("sfSchema", schema_name)
.option("sfWarehouse", warehouse_name)
.load()
)
df = df.selectExpr("ID", "CREATED_AT::TIMESTAMP_LTZ as CREATED_AT").limit(1)
df = df.filter(df.CREATED_AT.between(start,end))
如果我不将时间戳转换为“TIMESTAMP_LTZ”,它会将时间戳视为 UTC 并在我使用 toPandas() 函数或保存为增量表时将其移动,从而导致时间戳不正确。
有些人可能还建议将 spark session 和 snowflake session 都设置为 UTC。这也获取了正确的数据,但是当我保存到增量表时它添加了一个“+0000”后缀,这也使时间戳无效。如果我可以保存到没有时区的增量表,我想它也可以工作,但我不知道该怎么做。