I am trying to connect to my MongoDB database from PySpark.
$ pyspark --packages org.mongodb.spark:mongo-spark-connector_2.13:10.1.1
Versions I have installed: Python 3.9, Scala 2.12.15, Spark 3.3.2.
Inside pyspark I can connect to the database just fine. I can load a DataFrame and view the metadata of the DataFrame fetched from MongoDB without any problem.
>>> df = spark.read.format("mongodb").option("connection.uri", "mongodb://127.0.0.1/test.testData").load()
>>> df.columns
['_id', 'x']
So far so good. But when I try to fetch and view the actual data, I get an error.
>>> df.first()
or:
>>> df.show()
The error:
>>> df.first()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/spark/python/pyspark/sql/dataframe.py", line 1938, in first
return self.head()
File "/opt/spark/python/pyspark/sql/dataframe.py", line 1924, in head
rs = self.head(1)
File "/opt/spark/python/pyspark/sql/dataframe.py", line 1926, in head
return self.take(n)
File "/opt/spark/python/pyspark/sql/dataframe.py", line 868, in take
return self.limit(num).collect()
File "/opt/spark/python/pyspark/sql/dataframe.py", line 817, in collect
sock_info = self._jdf.collectToPython()
File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
File "/opt/spark/python/pyspark/sql/utils.py", line 190, in deco
return f(*a, **kw)
File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o44.collectToPython.
: java.lang.NoSuchMethodError: 'scala.collection.immutable.Seq org.apache.spark.sql.types.StructType.toAttributes'
at com.mongodb.spark.sql.connector.schema.InternalRowToRowFunction.<init>(InternalRowToRowFunction.java:4
at com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter.<init>(RowToBsonDocumentConverter.java:80)
at com.mongodb.spark.sql.connector.read.MongoScanBuilder.<clinit>(MongoScanBuilder.java:75)
at com.mongodb.spark.sql.connector.MongoTable.newScanBuilder(MongoTable.java:125)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$createScanBuilder$1.applyOrElse(V2ScanRelationPushDown.scala:56)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$createScanBuilder$1.applyOrElse(V2ScanRelationPushDown.scala:54)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:589)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1228)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1227)
at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:208)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:589)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:589)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1228)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1227)
at org.apache.spark.sql.catalyst.plans.logical.GlobalLimit.mapChildren(basicLogicalOperators.scala:1258)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:589)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:528)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.createScanBuilder(V2ScanRelationPushDown.scala:54)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.$anonfun$apply$1(V2ScanRelationPushDown.scala:42)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.$anonfun$apply$7(V2ScanRelationPushDown.scala:50)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:49)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:37)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:126)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:122)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:118)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3685)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
at java.base/java.lang.reflect.Method.invoke(Method.java:578)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:1589)
I have installed the packages shown in the launch command above.
Can someone help me figure out what might be causing this error? Thanks in advance.
What I tried: adding these jars to the pyspark startup packages.
The schema itself works fine:
>>> df.printSchema()
root
|-- _id: string (nullable = true)
|-- x: integer (nullable = true)
Column x holds the values 1..20, so the data should display.
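For reference, here is a minimal sketch of how test data matching this schema could have been created with pymongo (hypothetical; it assumes pymongo is installed and mongod is listening on 127.0.0.1):

from pymongo import MongoClient

# connect to the same local mongod the Spark option points at
client = MongoClient("mongodb://127.0.0.1")
# insert documents {x: 1} .. {x: 20} into test.testData;
# MongoDB generates the _id field automatically
client.test.testData.insert_many([{"x": i} for i in range(1, 21)])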
The error is caused by a mismatch between the Scala version of your pyspark build and the one the mongo-spark-connector artifact was compiled for (you loaded the _2.13 connector on a Scala 2.12.15 Spark).
First, uninstall everything and reinstall pyspark with:
pip install pyspark
Follow the official documentation.
Then run
pyspark --version
and check which Spark and Scala versions you are using.
For me that was Spark 3.3.2 and Scala 2.12.15. Use this information to pick the connector artifact that matches your Scala version.
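If you are already inside a pyspark shell, you can also read the Scala version off the JVM through the py4j gateway; this is a convenience sketch that relies on the internal _jvm attribute, not a public API:

>>> spark.sparkContext._jvm.scala.util.Properties.versionString()
'version 2.12.15'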
In this case I would use:
pyspark --packages org.mongodb.spark:mongo-spark-connector_2.12:10.1.1
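Alternatively, instead of passing --packages on the command line, a standalone script can resolve the matching connector through the spark.jars.packages config. A sketch under the same assumptions as above (local mongod, collection test.testData):

from pyspark.sql import SparkSession

# pull in the connector built for Scala 2.12, matching pyspark --version above
spark = (
    SparkSession.builder
    .appName("mongo-test")
    .config("spark.jars.packages",
            "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1")
    .getOrCreate()
)

df = (
    spark.read.format("mongodb")
    .option("connection.uri", "mongodb://127.0.0.1/test.testData")
    .load()
)
df.show()  # should now print rows instead of raising NoSuchMethodError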
For the rest of the setup, you can refer to this, or any other tutorial you prefer.
In the documentation (https://www.mongodb.com/docs/spark-connector/current/) they state that the "MongoDB Connector for Spark" (version 10.2.1) works with Spark 3.1 through 3.2.4, so I think we need to lower the Spark version. Also see this discussion on the MongoDB forums: https://www.mongodb.com/community/forums/t/getting-java-lang-nosuchmethoderror-when-trying-to-write-to-a-collection-via-spark-mongo-connector/237021
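If you do go the downgrade route, it could look something like this (the exact version numbers are an assumption based on the compatibility range quoted above):

$ pip install "pyspark==3.2.4"
$ pyspark --packages org.mongodb.spark:mongo-spark-connector_2.12:10.2.1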