ERROR MicroBatchExecution - PySpark: writing a DataFrame to Elasticsearch

Problem description

I'm trying to write a stream to Elasticsearch using PySpark. I have two DataFrames that are read from Kafka and joined into df_joined. Printing df_joined to the terminal shows the correct columns and values. But as soon as I try to write it to Elasticsearch (on localhost:9200) with the following code:

spark_session = (
    SparkSession.builder
    .appName("spark-test")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,"
        "org.elasticsearch:elasticsearch-hadoop:8.5.3",
    )
    .getOrCreate()
)

df1 = ...
df2 = ...  # For df1 and df2 I do .select(from_json(col('value'), schema))
df_joined = df1.join(df2, df1.fk == df2.pk)

query = df_joined \
    .writeStream \
    .outputMode("append") \
    .format("org.elasticsearch.spark.sql") \
    .option("es.resource", "name_of_index/name_of_type") \
    .option("es.mapping.id", "id") \
    .option("es.spark.sql.streaming.sink.log.enabled", False) \
    .option("checkpointLocation", "/tmp/es_checkpoint") \
    .start()
query.awaitTermination()
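
For context, here is a minimal sketch of how a Kafka-sourced DataFrame like df1 is typically built with readStream and from_json; the broker address, topic name, and schema below are hypothetical stand-ins, not from the original question:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

# Hypothetical schema of the JSON payload in the Kafka messages.
schema = StructType([
    StructField("id", StringType()),
    StructField("fk", StringType()),
])

df1 = (
    spark_session.readStream  # spark_session from the snippet above
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # hypothetical broker
    .option("subscribe", "topic1")                        # hypothetical topic
    .load()
    # Kafka delivers the payload as binary; cast to string before JSON parsing.
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
)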

I'm using:

  • elasticsearch-hadoop version 8.5.3
  • PySpark version 3.3.1
I get the following error and stack trace:

22/12/28 00:41:48 ERROR MicroBatchExecution: Query [id = 3b9b239f-bd38-43e0-820a-6d8fdac56e79, runId = e4869668-2e3c-4e8e-a6be-6c111a832812] terminated with error
java.lang.NoSuchMethodError: org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(Lorg/apache/spark/sql/SparkSession;Lorg/apache/spark/sql/execution/QueryExecution;Lscala/Function0;)Ljava/lang/Object;
    at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink.addBatch(EsSparkSqlStreamingSink.scala:62)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:666)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:664)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:664)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Exception in thread "stream execution thread for [id = 3b9b239f-bd38-43e0-820a-6d8fdac56e79, runId = e4869668-2e3c-4e8e-a6be-6c111a832812]" java.lang.NoSuchMethodError: org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(Lorg/apache/spark/sql/SparkSession;Lorg/apache/spark/sql/execution/QueryExecution;Lscala/Function0;)Ljava/lang/Object;
    at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink.addBatch(EsSparkSqlStreamingSink.scala:62)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:666)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:664)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:664)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)

What I've tried

  • Downgrading PySpark to version 3.2.0 (based on an accepted SO answer)
  • When creating the SparkSession, changing org.elasticsearch:elasticsearch-hadoop:8.5.3 to org.elasticsearch:elasticsearch-spark-20_2.11:8.5.3

Edit:

  • Koedlt's answer was very helpful for understanding the problem better, but unfortunately downgrading to version 2.3.0 did not solve it, because the integration with Kafka then fails.
Tags: apache-spark, elasticsearch, pyspark, apache-spark-sql
1 Answer

If you look at the most interesting part of the error, you see the following:

java.lang.NoSuchMethodError: org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(Lorg/apache/spark/sql/SparkSession;Lorg/apache/spark/sql/execution/QueryExecution;Lscala/Function0;)Ljava/lang/Object;
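
As a quick aside (not from the original answer): the parenthesized part of a NoSuchMethodError is a JVM method descriptor, listing the parameter types followed by the return type. A small illustrative Python sketch that decodes it:

import re

def decode_descriptor(desc: str):
    # Split "(params)return"; arrays and generics are out of scope for this sketch.
    params_part, ret = desc[1:].rsplit(")", 1)
    tokens = re.findall(r"L[^;]+;|[BCDFIJSZ]", params_part)
    primitives = {"B": "byte", "C": "char", "D": "double", "F": "float",
                  "I": "int", "J": "long", "S": "short", "Z": "boolean"}
    def pretty(t):
        # Object types look like "Lpkg/Class;"; primitives are single letters.
        return t[1:-1].replace("/", ".") if t.startswith("L") else primitives[t]
    return [pretty(t) for t in tokens], "void" if ret == "V" else pretty(ret)

params, ret = decode_descriptor(
    "(Lorg/apache/spark/sql/SparkSession;"
    "Lorg/apache/spark/sql/execution/QueryExecution;"
    "Lscala/Function0;)Ljava/lang/Object;"
)
print(params)  # ['org.apache.spark.sql.SparkSession',
               #  'org.apache.spark.sql.execution.QueryExecution',
               #  'scala.Function0']
print(ret)     # java.lang.Object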

So it cannot find a method called withNewExecutionId on SQLExecution that takes a SparkSession, a QueryExecution, and a Function as input. Let's look at that method in the Spark source code.

In version 3.3.1, we see the following function signature:

def withNewExecutionId[T](
      queryExecution: QueryExecution,
      name: Option[String] = None)(body: => T): T

This explains why you are getting a NoSuchMethodError: there is no method with the function signature that the connector expects!

Now, let's look at your artifact's dependencies on the Maven Repository page (the original answer includes a screenshot of the dependency table there). The left column shows the dependency version the artifact was built against, and the right column shows the latest stable update of that dependency. For Spark, the dependency version listed is 2.3.0, so let's look at version 2.3.0.

In version 2.3.0, we see the following function signature:

def withNewExecutionId[T](
      sparkSession: SparkSession,
      queryExecution: QueryExecution)(body: => T): T

That looks exactly like what we were expecting: a SparkSession, a QueryExecution, and a Function as input.

Solution: this artifact seems to require you to use Spark 2.3.0 and is not compatible with the newer version you are using now. I don't know this artifact myself, so maybe there is a way around this, but try it with Spark 2.3.0 and see what happens :)
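
Follow-up note (not part of the original answer): instead of downgrading Spark, it may be worth trying the Spark-3-specific build of the connector, org.elasticsearch:elasticsearch-spark-30_2.12, which is compiled against the Spark 3.x API and therefore should not hit this binary mismatch. A minimal sketch, assuming that artifact is available in version 8.5.3 on Maven Central and that your PySpark build uses Scala 2.12:

from pyspark.sql import SparkSession

spark_session = (
    SparkSession.builder
    .appName("spark-test")
    .config(
        "spark.jars.packages",
        # Kafka connector version matched to the Spark version (3.3.1 here),
        # and the Spark-3 build of the Elasticsearch connector (assumption:
        # elasticsearch-spark-30_2.12:8.5.3 exists on Maven Central).
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,"
        "org.elasticsearch:elasticsearch-spark-30_2.12:8.5.3",
    )
    .getOrCreate()
)

Note also that the Kafka connector coordinate should generally match the installed Spark version exactly; the question pins spark-sql-kafka-0-10_2.12:3.1.2 against PySpark 3.3.1, which is another potential source of this same class of error.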
