在Spark 2.3中运行多个cassandra查询并在一个RDD中获取结果

问题描述 投票:-2回答:2

我有一个字符串序列,我想在我的Cassandra查询的where子句中使用。因此,序列中的每个字符串都会有一个查询。

idSeq.foreach(id => {
  val rdd1 = sc.cassandraTable("keyspace", "columnfamily").
  where("id = ?", id).
  limit(100)
})

所以我在我的Sequence上放了一个循环,我正在为Sequence中的每个id运行查询。我想将所有结果合并到一个RDD中,并在组合RDD上执行映射和保存操作。我已经尝试创建一个空的RDD并进行联合但是RDD在循环之后仍保持为空并且没有任何东西被保存。这样做的正确方法是什么?

scala apache-spark spark-cassandra-connector
2个回答
1
投票
sc.union(idSeq.map(id => {
    sc.cassandraTable("keyspace", "columnfamily").where("id = ?", id).limit(100)
}))

0
投票

更快,更有效的解决方案是创建一个包含要获取的ID的RDD,然后使用joinWithCassandraTable查询来自Cassandra的数据。例如(来自doc):

val joinWithRDD = sc.parallelize(0 to 5)
     .map(CustomerID(_))
     .joinWithCassandraTable("test","customer_info")

有关更多详细信息,请参阅documentation,包括有关输入数据重新分配以更有效地获取数据的说明。

© www.soinside.com 2019 - 2024. All rights reserved.