我在使用 SEQUENCE 和 EXPLODE 处理 DataFrame 时遇到问题,该 dataframe 有 3 列: 员工ID 聘用日期 离开日期 我正在生成一个序列来获取每个员工的两个日期之间每月的记录,显然所有员工的 HireDate 和 LeftDate 都是不同的,因此生成的数组可能会更大,具体取决于员工,并且至少有 480k 员工记录(爆炸前)。
这是我一直在使用的代码:
df_exploded = df.select(
col("Employee_ID"),
explode(
sequence(
col("HireDate"),
when(col("LeftDate").isNotNull(), col("LeftDate")).otherwise(expr("current_date")),
expr("interval 1 month")
)
).alias("Month"))
当我执行
show()
或 display()
时,没有更大的问题,真正的问题来自其他类型的执行,例如 count()
或 write()
,我收到以下错误:
org.apache.spark.SparkException:作业因阶段失败而中止:阶段 8.0 中的任务 3 失败 4 次,最近一次失败:阶段 8.0 中丢失任务 3.3 (TID 113) (10.139.64.100 执行程序 0): java.lang .IllegalArgumentException:非法序列边界:1710115200000000 到 1709683200000000 by 12
我尝试使用 SQL 代码并减少每个数组中的元素数量,但生成了相同的结果:
df_explode_sql = spark.sql("""
WITH TABLE1 AS (SELECT
EMPLOYEE_ID,
HIREDATE,
CAST(
CASE
WHEN HIREDATE < '2023-01-01' THEN '2023-01-01'
ELSE HIREDATE
END AS DATE
) AS HIREDATE_ALT,
LEFTDATE
FROM TABLA
WHERE LEFTDATE > '2023-01-01')
SELECT *,
EXPLODE(sequence(HIREDATE_ALT, LEFT_DATE, interval 1 month))
FROM TABLE1
""")
还尝试过
persist()
和 cache()
有什么想法可以解决这个问题吗?我的数据存储在 ADLS2 的 Delta 表上。我正在使用 Databricks,我的集群有 4 个工作进程、256 GB RAM、32 个核心。
提前致谢。
您是否已经确认出院日期确实是< than the hiring date for all records? It's just that I find those integers so large that the java error looks strange.
1710115200000000 to 1709683200000000 by 12
1710115200000000 小于 1709683200000000 看来您想从未来的日期到过去的日期进行序列。