我坚持一个奇怪的问题。我想在本地使用MongoDB的火花连接星火连接到MongoDB的。
除了设立火花我用下面的代码:
val readConfig = ReadConfig(Map("uri" -> "mongodb://localhost:27017/movie_db.movie_ratings", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val writeConfig = WriteConfig(Map("uri" -> "mongodb://127.0.0.1/movie_db.movie_ratings"))
// Load the movie rating data from Mongo DB
val movieRatings = MongoSpark.load(sc, readConfig).toDF()
movieRatings.show(100)
不过,我得到一个编译错误:
java.lang.IllegalArgumentException: Missing database name. Set via the 'spark.mongodb.input.uri' or 'spark.mongodb.input.database' property.
上线,我成立readConfig。我不知道为什么它的抱怨不设置URI时,我显然在地图uri属性。我可能失去了一些东西。
这里提到您可以从SparkSession
做
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://localhost:27017/movie_db.movie_ratings")
.config("spark.mongodb.input.readPreference.name", "secondaryPreferred")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
.getOrCreate()
使用config创建数据框
val readConfig = ReadConfig(Map("uri" -> "mongodb://localhost:27017/movie_db.movie_ratings", "readPreference.name" -> "secondaryPreferred"))
val df = MongoSpark.load(spark)
写DF到MongoDB的
MongoSpark.save(
df.write
.option("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
.mode("overwrite"))
在您的代码:前缀缺少配置
val readConfig = ReadConfig(Map(
"spark.mongodb.input.uri" -> "mongodb://localhost:27017/movie_db.movie_ratings",
"spark.mongodb.input.readPreference.name" -> "secondaryPreferred"),
Some(ReadConfig(sc)))
val writeConfig = WriteConfig(Map(
"spark.mongodb.output.uri" -> "mongodb://127.0.0.1/movie_db.movie_ratings"))
对于Java,要么你可以在创建火花会话设置CONFIGS或首先创建会话,然后将其设置为运行CONFIGS。
1.
SparkSession sparkSession = SparkSession.builder()
.master("local")
.appName("MongoSparkConnector")
.config("spark.mongodb.input.uri","mongodb://localhost:27017/movie_db.movie_ratings")
.config("spark.mongodb.input.readPreference.name", "secondaryPreferred")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
.getOrCreate()
要么
2.
SparkSession sparkSession = SparkSession.builder()
.master("local")
.appName("MongoSparkConnector")
.getOrCreate()
然后,
String mongoUrl = "mongodb://localhost:27017/movie_db.movie_ratings";
sparkSession.sparkContext().conf().set("spark.mongodb.input.uri", mongoURL);
sparkSession.sparkContext().conf().set("spark.mongodb.output.uri", mongoURL);
Map<String, String> readOverrides = new HashMap<String, String>();
readOverrides.put("collection", sourceCollection);
readOverrides.put("readPreference.name", "secondaryPreferred");
ReadConfig readConfig = ReadConfig.create(sparkSession).withOptions(readOverrides);
Dataset<Row> df = MongoSpark.loadAndInferSchema(sparkSession,readConfig);