多次行动会引发失败

问题描述 投票:0回答:1

我是 Spark 新手。
我在将 df 保存到 Hive 表的部分遇到了一些问题。

def insert_into_hive_table(df: DataFrame, table_name: str):
        # for debugging - this action is working and contain 1528 rows
        logger.info(f'check if df in insert_into_hive_table can do action before saving')
        df = df.persist()
        logger.info(f'df count is {df.count()} rows')

        with spark_conf_context("hive.exec.dynamic.partition.mode", "nonstrict"), spark_conf_context("spark.sql.sources.partitionOverwriteMode", "dynamic"), spark_conf_context("spark.sql.parquet.output.committer.class", "org.apache.parquet.hadoop.ParquetOutputCommitter"), spark_conf_context("spark.sql.sources.commitProtocolClass", "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol"):
            df.select(output_cols).write.insertInto(table_name)
        
        # for debbuging - this action is failing
        logger.info(f'check if df in insert_into_hive_table can do action after saving')
        df = df.persist()
        logger.info(f'df count is {df.count()} rows')

我原来的函数只是函数的中间部分。 第一次计数和保存操作运行得很快,我在 Hive 表中看到了正确的结果。 但最后一次计数总是在广播超时时失败,我什至不知道广播发生在哪里。

一些可能影响的细节:

  1. 我收到此错误:
 No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

同时保存到蜂巢。
2.我的默认分区号是200。
3.我正在使用CDP和spark 2.4.8
4.我已经将广播超时增加到720秒
5. 该表包含 2 个分区

编辑: 我发现在 insertInto 之后,我在 Spark UI 存储上看不到 df(但在 insertInto 之前我确实看到了),有什么建议吗?

apache-spark pyspark hive bigdata rdd
1个回答
0
投票

如果您只想将 Spark df 保存在 hive 表中,则只需使用以下 df.write.mode("overwrite").format("delta").saveAsTable("Giveyourtablename")

© www.soinside.com 2019 - 2024. All rights reserved.