我有一个字符串序列,我想在我的Cassandra查询的where子句中使用。因此,序列中的每个字符串都会有一个查询。
idSeq.foreach(id => {
val rdd1 = sc.cassandraTable("keyspace", "columnfamily").
where("id = ?", id).
limit(100)
})
所以我在我的Sequence上放了一个循环,我正在为Sequence中的每个id运行查询。我想将所有结果合并到一个RDD中,并在组合RDD上执行映射和保存操作。我已经尝试创建一个空的RDD并进行联合但是RDD在循环之后仍保持为空并且没有任何东西被保存。这样做的正确方法是什么?
sc.union(idSeq.map(id => {
sc.cassandraTable("keyspace", "columnfamily").where("id = ?", id).limit(100)
}))
更快,更有效的解决方案是创建一个包含要获取的ID的RDD,然后使用joinWithCassandraTable
查询来自Cassandra的数据。例如(来自doc):
val joinWithRDD = sc.parallelize(0 to 5)
.map(CustomerID(_))
.joinWithCassandraTable("test","customer_info")
有关更多详细信息,请参阅documentation,包括有关输入数据重新分配以更有效地获取数据的说明。