使用spark streaming流式传输cassandra中的最新数据

问题描述 投票:2回答:1

我不断从外部源向cassandra写入数据。

现在,我使用spark streaming从cassandra连续读取这些数据,代码如下:

val ssc = new StreamingContext(sc, Seconds(5))

val cassandraRDD = ssc.cassandraTable("keyspace2", "feeds")


val dstream = new ConstantInputDStream(ssc, cassandraRDD)


dstream.foreachRDD { rdd =>
 println("\n"+rdd.count())
}

ssc.start()
ssc.awaitTermination()
sc.stop()

但是,以下行:

val cassandraRDD = ssc.cassandraTable("keyspace2", "feeds")

每次从cassandra获取整个表数据。现在只需将最新数据保存到表格中。

我想要做的是让火花流只读取最新数据,即上次读取后添加的数据。

我怎样才能做到这一点?我试图谷歌这个,但很少有关于此的文档。

我正在使用spark 1.4.1scala 2.10.4cassandra 2.1.12

谢谢!

编辑:

建议的重复问题(由我询问)并不重复,因为它谈论连接火花流和cassandra,这个问题是关于仅流传输最新数据。顺便说一下,使用我提供的代码可以从cassandra流式传输。但是,它每次都需要整个表,而不仅仅是最新的数据。

scala apache-spark cassandra spark-streaming bigdata
1个回答
1
投票

Cassandra将会有一些低级别的工作,可以通知外部系统(索引器,Spark流等)传入Cassandra的新突变,请阅读:https://issues.apache.org/jira/browse/CASSANDRA-8844

© www.soinside.com 2019 - 2024. All rights reserved.