我正在尝试使用Akka进行API Rest。其中一个入口点将从我的Cassandra数据库返回数据。所以我可以这样得到我的CassandraTableScanRDD:
val spark = SparkSession
.builder()
.appName("Spark Cassandra")
.config("spark.cores.max", "5")
.config("spark.sql.warehouse.dir", "/tmp")
.config("spark.cassandra.connection.host", "localhost")
.config("spark.cassandra.connection.port", "9042")
.master("local[*]")
.getOrCreate()
val connector = CassandraConnector.apply(spark.sparkContext.getConf)
val sc = spark.sparkContext
val temp = sc.cassandraTable("scala_firemen", "firemen")
temp.foreach(println)
使用此代码,我可以获得所需的所有数据。但是一旦我添加了我的Akka代码,我就无法打印/访问数据了。即使我回滚并删除Akka代码,我仍然编码相同的错误,即:
[error](run-main-0)java.lang.ExceptionInInitializerError
[error] java.lang.ExceptionInInitializerError
[错误]在org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
[sy] to org.apache.spark.rdd.RDD.first(RDD.scala:1367)
[error] at main $ .delayedEndpoint $ main $ 1(Server.scala:34)
[error] at main $ delayedInit $ body.apply(Server.scala:15)
...
错误ContextCleaner:清除线程时出错
...
错误实用程序:线程SparkListenerBus中的未捕获错误,停止SparkContext
...
错误实用程序:在线程SparkListenerBus中抛出未捕获的致命错误
...
当我重新创建一个项目时,代码再次起作用,所以我怀疑在删除Akka代码后我需要清理一些东西再次工作。
RDD旨在与Spark一起使用,在StreamingContext
中使用它是没有意义的......
对于您的Akka应用程序,您需要Java driver directly,即使在这种情况下,您也不需要执行数据的完整扫描 - 只有当您拥有最少的分区密钥时才能提供快速访问数据...如果您仍需要执行完全扫描,然后最好使用类似this的东西 - Spark使用类似的方法进行数据扫描,但您不需要链接其所有库。
我建议您了解Cassandra的工作原理 - 例如,通过courses on DataStax Academy - 他们将解释如何访问数据等。
好吧我明白我不能拥有"com.typesafe.play" %% "play-json"
依赖所有的火花依赖,如:
"org.apache.spark" %% "spark-sql" % "2.1.1",
"org.apache.spark" %% "spark-streaming" % "2.1.1",
"com.datastax.spark" %% "spark-cassandra-connector" % "2.0.10"