我使用下面的Python代码从Spark中的“MongoDB”读取数据并将其转换为DataFrame:
从 pyspark.sql 导入 SparkSession
# Initialize a Spark session
spark = SparkSession.builder \
.appName("MongoDB Spark Connector Example") \
.config("spark.mongodb.read.connection.uri", "mongodb://localhost:23017/") \
.config("spark.mongodb.read.database", "db_name") \
.config("spark.mongodb.read.collection", "coll_name") \
.config("spark.sql.debug.maxToStringFields", 1000) \
.getOrCreate()
df = spark.read.format("mongodb").load()
df.createOrReplaceTempView("temp")
sqlDf = spark.sql("SELECT id from temp")
sqlDf.show()
我正在使用:
我使用下面的命令来运行上面的代码
spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.12:10.2.0 test.py
我收到以下错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o67.showString.
: java.lang.NoSuchMethodError: org.apache.spark.sql.types.StructType.toAttributes()Lscala/collection/immutable/Seq;
at com.mongodb.spark.sql.connector.schema.InternalRowToRowFunction.<init>(InternalRowToRowFunction.java:46)
at com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter.<init>(RowToBsonDocumentConverter.java:84)
at com.mongodb.spark.sql.connector.read.MongoScanBuilder.<clinit>(MongoScanBuilder.java:72)
at com.mongodb.spark.sql.connector.MongoTable.newScanBuilder(MongoTable.java:121)
这可能与您正在使用的 scala、pyspark 和 mongodb 驱动程序之间的版本冲突有关。
可以查看已安装的spark和scala版本,并搜索对应的mongo依赖。
您还可以尝试卸载所有内容并从一开始就选择好的版本来重新安装。
如果需要,请查看 Spark 官方文档此处以获取安装详细信息。