通过计算Dataframe大小来优化pyspark代码

问题描述 投票:0回答:1

我正在使用以下函数(部分来自我从这篇文章中获得的代码片段:计算 Spark 数据帧的大小 - SizeEstimator 给出了意想不到的结果

并根据我的理解添加我的计算将在基于本教程的工作人员中分发:https://www.youtube.com/watch?v=hvF7tY2-L3U并最终得到以下功能:

def get_partitions(df):
    partitions = 1
    df.cache().foreach(lambda x: x)                                                   
    df_size_in_bytes = spark._jsparkSession.sessionState()\
             .executePlan(df._jdf.queryExecution().logical(),\
             df._jdf.queryExecution().mode()).optimizedPlan()\
             .stats()\
             .sizeInBytes()
    kilo_bytes = int(df_size_in_bytes/1024)
    mega_bytes = int(kilo_bytes/1024)
    parts = int(mega_bytes/128)        
    if parts <= 0:
        parts = partitions
    else:
        partitions = parts   
    return partitions 

虽然给了我以下错误:

Py4JError: An error occurred while calling o8705.mode. Trace:
py4j.Py4JException: Method mode([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)

我正在使用 AWS Glue 作业和交互式会话,在胶水作业中我得到一个空跟踪,在 Glue 交互式会话中我得到上面的错误,计算中缺少什么。我目前正在使用 Spark 3.1 和 Glue 3.0。任何帮助将不胜感激!

amazon-web-services pyspark optimization aws-glue
1个回答
0
投票

Glue 3.0 使用的 Spark 版本不接受在 this 提交到 Spark 核心时引入的executePlan() 的“mode”参数。

因此,您只需从方法调用中删除参数即可:

def get_partitions(df):
    partitions = 1
    df.cache().foreach(lambda x: x)                                                   
    df_size_in_bytes = spark._jsparkSession.sessionState() \         
        .executePlan(
            df._jdf.queryExecution().logical()
        ).optimizedPlan() \
        .stats() \
        .sizeInBytes()
    kilo_bytes = int(df_size_in_bytes/1024)
    mega_bytes = int(kilo_bytes/1024)
    parts = int(mega_bytes/128)        
    if parts <= 0:
        parts = partitions
    else:
        partitions = parts   
    return partitions
© www.soinside.com 2019 - 2024. All rights reserved.