Reading from MongoDB in Scala


I want to create a standalone Scala application that reads from MongoDB with custom settings, based on this code from the MongoDB website.

When I run sbt package I get an error. I suspect it is related to how the SparkSession is being created. Can you suggest a fix?

The contents of my build.sbt:

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.1",
  "org.apache.spark" %% "spark-core" % "2.4.1",
  "org.apache.spark" %% "spark-sql" % "2.4.1"
)

The FirstApp.scala code:

package com.mongodb
import org.apache.spark.sql.SparkSession
import com.mongodb.spark.config.{ReadConfig,WriteConfig}
import com.mongodb.spark.MongoSpark
import org.bson.Document

object FirstApp {
  def main(args: Array[String]) {

    val sc = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnectorIntro")
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
    .getOrCreate()

    val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
    val customRdd = MongoSpark.load(sc, readConfig)

    println(customRdd.count)
    println(customRdd.first.toJson)

  }
}

And the error after running sbt package:

    value toJson is not a member of org.apache.spark.sql.Row
[error]     println(customRdd.first.toJson)
[error]                             ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 10 s, completed Jun 10, 2020 6:10:50 PM

Edit:

I tried that solution, but it still fails. The contents of build.sbt are the same as above. I changed SimpleApp.scala to:

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession

object FirstApp {
  def main(args: Array[String]) {

    val spark = SparkSession.builder()
        .master("local")
        .appName("MongoSparkConnectorIntro")
        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
        .getOrCreate()
    val sc = spark.sparkContext

    val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
    val customRdd = MongoSpark.load(sc)
    println(customRdd.count())
    println(customRdd.first.toJson)

  }
}

The result of compiling and then running it with spark-submit:

$ spark-submit   --class "FirstApp"   --master local[4]   target/scala-2.11/root-2_2.11-0.1.0-SNAPSHOT.jar 
20/06/12 07:09:53 WARN Utils: Your hostname, Project resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
20/06/12 07:09:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/06/12 07:09:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/06/12 07:09:54 INFO SparkContext: Running Spark version 2.4.5
20/06/12 07:09:54 INFO SparkContext: Submitted application: MongoSparkConnectorIntro
20/06/12 07:09:55 INFO SecurityManager: Changing view acls to: sadegh
20/06/12 07:09:55 INFO SecurityManager: Changing modify acls to: sadegh
20/06/12 07:09:55 INFO SecurityManager: Changing view acls groups to: 
20/06/12 07:09:55 INFO SecurityManager: Changing modify acls groups to: 
20/06/12 07:09:55 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(sadegh); groups with view permissions: Set(); users  with modify permissions: Set(sadegh); groups with modify permissions: Set()
20/06/12 07:09:55 INFO Utils: Successfully started service 'sparkDriver' on port 33031.
20/06/12 07:09:55 INFO SparkEnv: Registering MapOutputTracker
20/06/12 07:09:55 INFO SparkEnv: Registering BlockManagerMaster
20/06/12 07:09:55 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/06/12 07:09:55 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/06/12 07:09:55 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-7405e1be-08e8-4f58-b88e-b8f01f8fe87e
20/06/12 07:09:55 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
20/06/12 07:09:55 INFO SparkEnv: Registering OutputCommitCoordinator
20/06/12 07:09:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
20/06/12 07:09:55 INFO Utils: Successfully started service 'SparkUI' on port 4041.
20/06/12 07:09:56 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.0.2.15:4041
20/06/12 07:09:56 INFO SparkContext: Added JAR file:/Folder/target/scala-2.11/root-2_2.11-0.1.0-SNAPSHOT.jar at spark://10.0.2.15:33031/jars/root-2_2.11-0.1.0-SNAPSHOT.jar with timestamp 1591938596069
20/06/12 07:09:56 INFO Executor: Starting executor ID driver on host localhost
20/06/12 07:09:56 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42815.
20/06/12 07:09:56 INFO NettyBlockTransferService: Server created on 10.0.2.15:42815
20/06/12 07:09:56 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/06/12 07:09:56 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.0.2.15, 42815, None)
20/06/12 07:09:56 INFO BlockManagerMasterEndpoint: Registering block manager 10.0.2.15:42815 with 366.3 MB RAM, BlockManagerId(driver, 10.0.2.15, 42815, None)
20/06/12 07:09:56 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.0.2.15, 42815, None)
20/06/12 07:09:56 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.0.2.15, 42815, None)
Exception in thread "main" java.lang.NoClassDefFoundError: com/mongodb/spark/config/ReadConfig$
    at FirstApp$.main(SimpleApp.scala:16)
    at FirstApp.main(SimpleApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.mongodb.spark.config.ReadConfig$
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 14 more
20/06/12 07:09:56 INFO SparkContext: Invoking stop() from shutdown hook
20/06/12 07:09:56 INFO SparkUI: Stopped Spark web UI at http://10.0.2.15:4041
20/06/12 07:09:56 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/06/12 07:09:56 INFO MemoryStore: MemoryStore cleared
20/06/12 07:09:56 INFO BlockManager: BlockManager stopped
20/06/12 07:09:56 INFO BlockManagerMaster: BlockManagerMaster stopped
20/06/12 07:09:56 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/06/12 07:09:56 INFO SparkContext: Successfully stopped SparkContext
20/06/12 07:09:56 INFO ShutdownHookManager: Shutdown hook called
20/06/12 07:09:56 INFO ShutdownHookManager: Deleting directory /tmp/spark-7f90ac08-403c-4a3f-bb45-ea24a347c380
20/06/12 07:09:56 INFO ShutdownHookManager: Deleting directory /tmp/spark-78cb32aa-c6d1-4ba4-b94f-16d3761d181b
1 Answer

I think your problem is that you are trying to use the SparkSession as a SparkContext, but they are not the same thing. When MongoSpark.load is given a SparkSession it returns a DataFrame, and Row has no toJson method; given a SparkContext it returns a MongoRDD[Document], whose elements do. If you pass the SparkContext instead, everything compiles:

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnectorIntro")
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
    .getOrCreate()
// Work with the underlying SparkContext, not the SparkSession itself
val sc = spark.sparkContext

val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
// Pass the custom ReadConfig explicitly; otherwise load falls back to spark.mongodb.input.uri
val customRdd = MongoSpark.load(sc, readConfig)
println(customRdd.count())
println(customRdd.first.toJson)   // elements are org.bson.Document, which has toJson
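
If you would rather stay with the SparkSession / DataFrame API from your first attempt, a minimal sketch (reusing the spark and readConfig values above; an alternative, not part of the original answer) avoids the compile error by calling toJSON on the Dataset instead of toJson on a Row:

val df = MongoSpark.load(spark, readConfig)   // with a SparkSession this returns a DataFrame
println(df.count())
println(df.toJSON.first())                    // Dataset[String]; Row itself has no toJson

Finally, the NoClassDefFoundError in your edited run is a packaging issue rather than a code issue: sbt package builds a jar containing only your own classes, so the connector classes are missing at runtime. Adding --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 to the spark-submit command (or building a fat jar with sbt-assembly) should put them on the classpath.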