星火MongoDB的连接器斯卡拉 - 缺少数据库名称

问题描述 投票:6回答:2

我坚持一个奇怪的问题。我想在本地使用MongoDB的火花连接星火连接到MongoDB的。

除了设立火花我用下面的代码:

val readConfig = ReadConfig(Map("uri" -> "mongodb://localhost:27017/movie_db.movie_ratings", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val writeConfig = WriteConfig(Map("uri" -> "mongodb://127.0.0.1/movie_db.movie_ratings"))

// Load the movie rating data from Mongo DB
val movieRatings = MongoSpark.load(sc, readConfig).toDF()

movieRatings.show(100)

不过,我得到一个编译错误:

java.lang.IllegalArgumentException: Missing database name. Set via the 'spark.mongodb.input.uri' or 'spark.mongodb.input.database' property.

上线,我成立readConfig。我不知道为什么它的抱怨不设置URI时,我显然在地图uri属性。我可能失去了一些东西。

mongodb scala apache-spark apache-spark-sql
2个回答
7
投票

这里提到您可以从SparkSession

val spark = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnectorIntro")
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/movie_db.movie_ratings")
    .config("spark.mongodb.input.readPreference.name", "secondaryPreferred")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
    .getOrCreate()

使用config创建数据框

val readConfig = ReadConfig(Map("uri" -> "mongodb://localhost:27017/movie_db.movie_ratings", "readPreference.name" -> "secondaryPreferred"))
val df = MongoSpark.load(spark)

写DF到MongoDB的

MongoSpark.save(
df.write
    .option("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
    .mode("overwrite"))

在您的代码:前缀缺少配置

val readConfig = ReadConfig(Map(
    "spark.mongodb.input.uri" -> "mongodb://localhost:27017/movie_db.movie_ratings", 
    "spark.mongodb.input.readPreference.name" -> "secondaryPreferred"), 
    Some(ReadConfig(sc)))

val writeConfig = WriteConfig(Map(
    "spark.mongodb.output.uri" -> "mongodb://127.0.0.1/movie_db.movie_ratings"))

0
投票

对于Java,要么你可以在创建火花会话设置CONFIGS或首先创建会话,然后将其设置为运行CONFIGS。

1.

SparkSession sparkSession = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnector")
    .config("spark.mongodb.input.uri","mongodb://localhost:27017/movie_db.movie_ratings")
    .config("spark.mongodb.input.readPreference.name", "secondaryPreferred")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
    .getOrCreate()

要么

2.

 SparkSession sparkSession = SparkSession.builder()
        .master("local")
        .appName("MongoSparkConnector")
        .getOrCreate()

然后,

     String mongoUrl = "mongodb://localhost:27017/movie_db.movie_ratings";
   sparkSession.sparkContext().conf().set("spark.mongodb.input.uri", mongoURL);
   sparkSession.sparkContext().conf().set("spark.mongodb.output.uri", mongoURL);
   Map<String, String> readOverrides = new HashMap<String, String>();
   readOverrides.put("collection", sourceCollection);
   readOverrides.put("readPreference.name", "secondaryPreferred");
   ReadConfig readConfig = ReadConfig.create(sparkSession).withOptions(readOverrides);
   Dataset<Row> df = MongoSpark.loadAndInferSchema(sparkSession,readConfig);
© www.soinside.com 2019 - 2024. All rights reserved.