无法使用akka获取CassandraTableScanRDD

问题描述 投票:2回答:2

我正在尝试使用Akka进行API Rest。其中一个入口点将从我的Cassandra数据库返回数据。所以我可以这样得到我的CassandraTableScanRDD:

val spark = SparkSession
    .builder()
    .appName("Spark Cassandra")
    .config("spark.cores.max", "5")
    .config("spark.sql.warehouse.dir", "/tmp")
    .config("spark.cassandra.connection.host", "localhost")
    .config("spark.cassandra.connection.port", "9042")
    .master("local[*]")
    .getOrCreate()

  val connector = CassandraConnector.apply(spark.sparkContext.getConf)
  val sc = spark.sparkContext
  val temp = sc.cassandraTable("scala_firemen", "firemen")

  temp.foreach(println)

使用此代码,我可以获得所需的所有数据。但是一旦我添加了我的Akka代码,我就无法打印/访问数据了。即使我回滚并删除Akka代码,我仍然编码相同的错误,即:

[error](run-main-0)java.lang.ExceptionInInitializerError

[error] java.lang.ExceptionInInitializerError

[错误]在org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

[sy] to org.apache.spark.rdd.RDD.first(RDD.scala:1367)

[error] at main $ .delayedEndpoint $ main $ 1(Server.scala:34)

[error] at main $ delayedInit $ body.apply(Server.scala:15)

...

错误ContextCleaner:清除线程时出错

...

错误实用程序:线程SparkListenerBus中的未捕获错误,停止SparkContext

...

错误实用程序:在线程SparkListenerBus中抛出未捕获的致命错误

...

当我重新创建一个项目时,代码再次起作用,所以我怀疑在删除Akka代码后我需要清理一些东西再次工作。

scala apache-spark apache-spark-sql akka spark-cassandra-connector
2个回答
2
投票

RDD旨在与Spark一起使用,在StreamingContext中使用它是没有意义的......

对于您的Akka应用程序,您需要Java driver directly,即使在这种情况下,您也不需要执行数据的完整扫描 - 只有当您拥有最少的分区密钥时才能提供快速访问数据...如果您仍需要执行完全扫描,然后最好使用类似this的东西 - Spark使用类似的方法进行数据扫描,但您不需要链接其所有库。

我建议您了解Cassandra的工作原理 - 例如,通过courses on DataStax Academy - 他们将解释如何访问数据等。


1
投票

好吧我明白我不能拥有"com.typesafe.play" %% "play-json"依赖所有的火花依赖,如:

"org.apache.spark" %% "spark-sql" % "2.1.1",
"org.apache.spark" %% "spark-streaming" % "2.1.1",
"com.datastax.spark" %% "spark-cassandra-connector" % "2.0.10"
© www.soinside.com 2019 - 2024. All rights reserved.