Structured Streaming failing with spark-kafka 3.3.2

Problem description

I have a Spark Structured Streaming application that works fine on Spark 3.0.1. Now I am trying to upgrade to Spark 3.3.2 and get the following exception:

> 23/12/15 00:14:45 ERROR MicroBatchExecution: Query [id = 32bd0f16-dd6b-463c-a742-191782d8e1e0, runId = 9d865ad3-7a4e-41e3-af77-cc7db6896cfb] terminated with error
> org.apache.spark.SparkException: The Spark SQL phase planning failed with an internal error. Please, fill a bug report in, and provide the full stack trace.
>   at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:500)
>   at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:512)
>   at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
>   at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
>   at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:145)
>   at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:138)
>   at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:158)
>   at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
>   at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
>   at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
>   at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
>   at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:158)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:656)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:646)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
>   at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
> Caused by: java.lang.NullPointerException
>   at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.getSortedExecutorList(KafkaOffsetReaderConsumer.scala:484)
>   at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.getOffsetRangesFromResolvedOffsets(KafkaOffsetReaderConsumer.scala:539)
>   at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.planInputPartitions(KafkaMicroBatchStream.scala:197)
>   at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.inputPartitions$lzycompute(MicroBatchScanExec.scala:45)
>   at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.inputPartitions(MicroBatchScanExec.scala:45)
>   at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar(DataSourceV2ScanExecBase.scala:142)
>   at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar$(DataSourceV2ScanExecBase.scala:141)
>   at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.supportsColumnar(MicroBatchScanExec.scala:29)
>   at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.apply(DataSourceV2Strategy.scala:153)
>   at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
>   at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
>   at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
>   at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
>   at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
>   at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>   at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>   at scala.collection.Iterator.foreach(Iterator.scala:943)
>   at scala.collection.Iterator.foreach$(Iterator.scala:943)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>   at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>   at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>   at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
>   at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
>   at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
>   at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
>   at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
>   ... (the 14 planner frames above repeat twelve more times) ...
>   at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:459)
>   at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:145)
>   at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
>   at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
>   at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
>   ... 32 more

There is nothing special about this application: it just reads an input Kafka topic, does some processing, and writes to an output Kafka topic. The exception occurs when MicroBatchExecution starts a new streaming query, after the initial batch has been written to the output topic successfully. I cannot debug the application, which runs in AWS. Can you suggest how to troubleshoot this?

I tried cutting out the whole processing part, just reading from Kafka and writing nothing to the output (filtering out all records), but the problem persists. A sketch of that stripped-down test is shown below.
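For reference, a minimal sketch of the stripped-down pipeline; the broker address, topic names, and checkpoint path are placeholders, not the actual configuration:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-npe-repro").getOrCreate()

// Read from the input topic; broker address and topic name are placeholders.
val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "input-topic")
  .load()

// Drop every record, so only the source/sink plumbing is exercised.
val empty = input
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .filter(_ => false)

// Write to the output topic; the checkpoint path is a placeholder.
val query = empty.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "s3://my-bucket/checkpoints/kafka-npe-repro")
  .start()

query.awaitTermination()
```

Since even this version fails, the problem appears to be in the Kafka source/sink integration rather than in the processing logic.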

apache-spark spark-structured-streaming spark-kafka-integration
1 Answer

The error message indicates a NullPointerException in the KafkaOffsetReaderConsumer class, which suggests a problem related to reading offsets from Kafka.

Here are some steps you can take to diagnose and possibly resolve the issue:

Check the Kafka connection:

Make sure your Spark application can actually connect to Kafka. Verify the bootstrap servers and the rest of the connection configuration; a quick connectivity check is sketched below.
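For instance, you can probe the same brokers with the Kafka AdminClient that ships with the client library (the broker address here is a placeholder):

```scala
import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient

// Use the same bootstrap servers your Spark job is configured with.
val props = new Properties()
props.put("bootstrap.servers", "broker1:9092")

val admin = AdminClient.create(props)
try {
  // Lists the topics visible to this client; the input and output topics should appear.
  println(admin.listTopics().names().get())
} finally {
  admin.close()
}
```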

Update the Kafka client libraries:

Make sure you are using versions of the Kafka client libraries that are compatible with Spark 3.3.2. Upgrading Spark sometimes requires a matching update of the Kafka connector; see the dependency sketch below.
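For example, in an sbt build (a sketch, assuming sbt is used) the connector version is normally kept in lockstep with the Spark version:

```scala
// build.sbt
val sparkVersion = "3.3.2"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % sparkVersion % "provided",
  // The Kafka connector should match the Spark version exactly.
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)
```

A mismatched pair, for example an older connector jar left on the classpath next to Spark 3.3.2, is a classic source of trouble after an upgrade.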

Review the Kafka source configuration:

If you use Kafka as a source, check the source options, in particular those related to offset management, and make sure your Kafka topics and partitions are reachable with the required permissions. The relevant options are shown below.
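The offset-related options are set on the source; a sketch with placeholder values (`spark` is the application's SparkSession):

```scala
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
  .option("subscribe", "input-topic")                // placeholder
  // Where to start when no checkpointed offsets exist yet: "earliest" or "latest".
  .option("startingOffsets", "latest")
  // Whether to fail the query when offsets are out of range (e.g. after retention).
  .option("failOnDataLoss", "true")
  .load()
```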

Simplify the application:

As you mentioned, try simplifying your Structured Streaming application even further. Strip it down to the minimal necessary components to determine whether the problem is specific to the processing logic or tied to the Kafka source or sink.

Check Spark compatibility:

Make sure your application and its dependencies are compatible with Spark 3.3.2. Some Spark versions have compatibility issues with particular libraries or components.

Logging and monitoring:

Enable verbose logging in your Spark application to capture more information about what happens during stream processing, and check Spark's event logs and metrics for patterns or anomalies. A listener sketch follows.
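One way to capture per-batch details without a debugger, sketched here with plain println output, is to register a StreamingQueryListener:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Logs lifecycle events and per-batch progress for every streaming query.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"query started: id=${event.id} runId=${event.runId}")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    // The JSON includes per-source Kafka offsets for each micro-batch.
    println(event.progress.json)

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"query terminated: id=${event.id} exception=${event.exception}")
})
```

In the failing scenario here, the progress of the first (successful) micro-batch would show which offsets were committed before planning of the next batch fails.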

Consider downgrading Spark:

If the problem persists and you cannot find a quick fix, consider going back to the earlier Spark version that is known to work with your application. That gives you a temporary workaround while you investigate the compatibility issue.

Community and documentation:

Check the Spark and Kafka community forums or mailing lists for reported issues or fixes related to Kafka integration in Spark 3.3.2, and review the Spark and Kafka documentation to make sure you are following best practices.

If the problem persists and no solution turns up, you may want to reach out to the Spark or Kafka community for further help, providing details about your application and configuration.
