PySpark error: java.lang.NoSuchMethodError: 'scala.collection.immutable.Seq org.apache.spark.sql.types.StructType.toAttributes()'


I am trying to connect to my MongoDB database from PySpark.

$ pyspark --packages org.mongodb.spark:mongo-spark-connector_2.13:10.1.1

The versions I have installed: Python 3.9, Scala 2.12.15, Spark 3.3.2.

Inside pyspark I can connect to the database without any problem. I can load the DataFrame fine, and I can also display metadata about the DataFrame fetched from MongoDB.

>>> df = spark.read.format("mongodb").option("connection.uri", "mongodb://127.0.0.1/test.testData").load()  
>>> df.columns
['_id', 'x']

So far so good. But when I try to fetch and view the data, I get an error.

    >>> df.first()   # or
    >>> df.show()

The error:

>>> df.first()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 1938, in first
    return self.head()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 1924, in head
    rs = self.head(1)
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 1926, in head
    return self.take(n)
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 868, in take
    return self.limit(num).collect()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/opt/spark/python/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o44.collectToPython.
: java.lang.NoSuchMethodError: 'scala.collection.immutable.Seq org.apache.spark.sql.types.StructType.toAttributes()'
        at com.mongodb.spark.sql.connector.schema.InternalRowToRowFunction.<init>(InternalRowToRowFunction.java:4
        at com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter.<init>(RowToBsonDocumentConverter.java:80)
        at com.mongodb.spark.sql.connector.read.MongoScanBuilder.<clinit>(MongoScanBuilder.java:75)
        at com.mongodb.spark.sql.connector.MongoTable.newScanBuilder(MongoTable.java:125)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$createScanBuilder$1.applyOrElse(V2ScanRelationPushDown.scala:56)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$createScanBuilder$1.applyOrElse(V2ScanRelationPushDown.scala:54)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:589)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1228)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1227)
        at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:208)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:589)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:589)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1228)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1227)
        at org.apache.spark.sql.catalyst.plans.logical.GlobalLimit.mapChildren(basicLogicalOperators.scala:1258)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:589)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:528)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.createScanBuilder(V2ScanRelationPushDown.scala:54)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.$anonfun$apply$1(V2ScanRelationPushDown.scala:42)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.$anonfun$apply$7(V2ScanRelationPushDown.scala:50)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:49)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:37)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:126)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:122)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:118)
        at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
        at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
        at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3685)
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
        at java.base/java.lang.reflect.Method.invoke(Method.java:578)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:1589)

I have the following packages installed:

  • spark-catalyst_2.12-3.3.2.jar
  • scala-library-2.10.0-M3.jar

Can someone help me figure out what might be causing this error?

Thanks in advance.

I tried adding these jars to the pyspark startup packages:

  • spark-catalyst_2.12-3.3.2.jar
  • scala-library-2.10.0-M3.jar

The schema prints fine:

    >>> df.printSchema()
    root
     |-- _id: string (nullable = true)
     |-- x: integer (nullable = true)
    

Column x has the values 1..20, so the data should be there to display.

mongodb scala apache-spark pyspark apache-spark-sql
2 Answers

2 votes

The error is caused by a Scala version mismatch between your pyspark build and the mongo-spark-connector artifact: your Spark uses Scala 2.12, but you are loading the connector built for Scala 2.13 (mongo-spark-connector_2.13).

First, uninstall everything and reinstall pyspark with:

pip install pyspark

Follow the official documentation.

Then run

pyspark --version

and check which Spark and Scala versions you are using.

For me it is Spark 3.3.2 and Scala 2.12.15. Now use this information to pick the matching dependency.
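
If you already have a pyspark shell open, you can also read the Scala version straight from the JVM. This is just a quick sketch, not a stable API: the _jvm gateway is a PySpark internal, so treat it as a convenience check.

>>> spark.sparkContext._jvm.scala.util.Properties.versionString()

On the setup described in the question this should return something like 'version 2.12.15'; what matters is that the version matches the suffix of the connector artifact (_2.12 vs _2.13).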

In this case I would use:

pyspark --packages org.mongodb.spark:mongo-spark-connector_2.12:10.1.1
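
If you would rather configure this from a standalone script than pass --packages on the command line, a minimal sketch could look like the following. It assumes the same local URI and test.testData collection from the question, and note that spark.jars.packages only takes effect when it is set before the session is created:

from pyspark.sql import SparkSession

# Pull the connector built for the same Scala version as the Spark build (2.12 here).
spark = (
    SparkSession.builder
    .appName("mongo-test")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1")
    .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.testData")
    .getOrCreate()
)

# The read URI already points at the collection, so load() needs no extra options.
df = spark.read.format("mongodb").load()
df.show()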

For the rest of the setup you can refer to this, or any other tutorial you like.


0 votes

In the documentation (https://www.mongodb.com/docs/spark-connector/current/) they state that the MongoDB Connector for Spark (version 10.2.1) works with Spark versions 3.1 through 3.2.4, so I think we need to lower the Spark version. Also see this discussion on the MongoDB forums: https://www.mongodb.com/community/forums/t/getting-java-lang-nosuchmethoderror-when-trying-to-write-to-a-collection-via-spark-mongo-connector/237021
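
If you go the downgrade route, one possible combination, going by the compatibility range quoted above (please double-check it against the connector documentation for your exact versions), would be:

pip install pyspark==3.2.4
pyspark --packages org.mongodb.spark:mongo-spark-connector_2.12:10.2.1

Keep the _2.12 suffix, since the PyPI Spark 3.2.x builds are compiled against Scala 2.12.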
