我正在使用以下函数(部分来自我从这篇文章中获得的代码片段:计算 Spark 数据帧的大小 - SizeEstimator 给出了意想不到的结果
并根据我的理解添加我的计算将在基于本教程的工作人员中分发:https://www.youtube.com/watch?v=hvF7tY2-L3U并最终得到以下功能:
def get_partitions(df):
partitions = 1
df.cache().foreach(lambda x: x)
df_size_in_bytes = spark._jsparkSession.sessionState()\
.executePlan(df._jdf.queryExecution().logical(),\
df._jdf.queryExecution().mode()).optimizedPlan()\
.stats()\
.sizeInBytes()
kilo_bytes = int(df_size_in_bytes/1024)
mega_bytes = int(kilo_bytes/1024)
parts = int(mega_bytes/128)
if parts <= 0:
parts = partitions
else:
partitions = parts
return partitions
虽然给了我以下错误:
Py4JError: An error occurred while calling o8705.mode. Trace:
py4j.Py4JException: Method mode([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
我正在使用 AWS Glue 作业和交互式会话,在胶水作业中我得到一个空跟踪,在 Glue 交互式会话中我得到上面的错误,计算中缺少什么。我目前正在使用 Spark 3.1 和 Glue 3.0。任何帮助将不胜感激!
Glue 3.0 使用的 Spark 版本不接受在 this 提交到 Spark 核心时引入的executePlan() 的“mode”参数。
因此,您只需从方法调用中删除参数即可:
def get_partitions(df):
partitions = 1
df.cache().foreach(lambda x: x)
df_size_in_bytes = spark._jsparkSession.sessionState() \
.executePlan(
df._jdf.queryExecution().logical()
).optimizedPlan() \
.stats() \
.sizeInBytes()
kilo_bytes = int(df_size_in_bytes/1024)
mega_bytes = int(kilo_bytes/1024)
parts = int(mega_bytes/128)
if parts <= 0:
parts = partitions
else:
partitions = parts
return partitions