Spark Cassandra连接器-分区键范围查询

问题描述 投票:5回答:3

我正在评估spark-cassandra-connector,但我正努力尝试获取有关分区键的范围查询才能正常工作。

根据连接器的文档,似乎可以使用等号或IN运算符对分区键进行服务器端筛选,但是很遗憾,我的分区键是时间戳记,所以我不能使用它。

所以我尝试将Spark SQL与以下查询一起使用('timestamp'是分区键):

select * from datastore.data where timestamp >= '2013-01-01T00:00:00.000Z' and timestamp < '2013-12-31T00:00:00.000Z'

尽管该作业产生了200个任务,但查询未返回任何数据。

而且我还可以确保自从在cqlsh上运行查询(使用'token'函数执行适当的转换)后确实会返回数据。

我在独立模式下使用spark 1.1.0。 Cassandra是2.1.2,连接器版本是'b1.1'分支。 Cassandra驱动程序是DataStax的“主”分支。Cassandra集群覆盖在具有3台服务器的Spark集群上,复制因子为1。

Here is the job's full log

有任何线索吗?

更新:尝试基于分区键(使用CassandraRDD.where方法)进行服务器端筛选时,出现以下异常:

Exception in thread "main" java.lang.UnsupportedOperationException: Range predicates on partition key columns (here: timestamp) are not supported in where. Use filter instead.

但是很遗憾,我不知道什么是“过滤器” ...

cassandra apache-spark
3个回答
8
投票

您有多种选择以获得所需的解决方案。

最强大的方法是使用Stratio与Cassandra集成的Lucene索引,它允许您按服务器端的任何索引字段进行搜索。您的书写时间将增加,但另一方面,您将可以查询任何时间范围。您可以在Cassandra here中找到有关Lucene索引的更多信息。 Cassandra的扩展版本已完全集成到deep-spark project中,因此您可以通过它充分利用Cassandra中Lucene索引的所有优势。我建议您在执行检索中小型结果集的受限查询时使用Lucene索引,如果要检索大部分数据集,则应使用下面的第三个选项。

另一种方法,取决于您的应用程序的工作方式,可能是截断时间戳字段,以便您可以使用IN运算符查找它。问题是,据我所知,您不能为此使用spark-cassandra-connector,应该使用未与Spark集成的直接Cassandra驱动程序,或者可以查看deep-spark项目即将启用的新功能。您的查询看起来像这样:

select * from datastore.data where timestamp IN ('2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04', ... , '2013-12-31')

,但是,正如我之前说的,我不知道它是否适合您的需求,因为您可能无法截断数据并将其按日期/时间分组。

最后一个选择,但效率较低,是将整个数据集带入您的Spark集群,并在RDD上应用过滤器。

免责声明:我为Straatio工作:-)如果您需要任何帮助,请随时与我们联系。

希望对您有帮助!


8
投票

我认为CassandraRDD错误告诉您试图在Cassandra中执行的查询,并且您必须将所有表加载到CassandraRDD中,然后对该CassandraRDD进行火花过滤器操作。

因此,您的代码(在scala中)应如下所示:

val cassRDD= sc.cassandraTable("keyspace name", "table name").filter(row=> row.getDate("timestamp")>=DateFormat('2013-01-01T00:00:00.000Z')&&row.getDate("timestamp") < DateFormat('2013-12-31T00:00:00.000Z'))

如果您有兴趣进行这种类型的查询,则可能必须看看其他Cassandra连接器,例如Stratio开发的连接器


0
投票

@@ jlopezmat ...我有类似的问题。我的Cassandra表就是这样。创建表ks.table(用户名文本,valid_from日期,valid_until日期,主键((用户名,valid_from,valid_until)))。

我的分区键是所有三列。

我正在使用DataStax Connector过滤出数据。我的代码看起来像这样

case class Schema(username: String, valid_from: String, valid_until: String)

val rdd = spark.sparkContext.cassandraTable[Schema]("ks", "table").select("username", "valid_from", "valid_until")
rdd: com.datastax.spark.connector.rdd.CassandraTableScanRDD[Schema] = CassandraTableScanRDD[22] at RDD at CassandraRDD.scala:19

rdd.where("username = 'myname' and valid_from <= '2019-01-01' and valid_until >= '2019-01-01'").foreach(println)
java.lang.UnsupportedOperationException: Range predicates on partition key columns (here: valid_from) are not supported in where. Use filter instead.
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$2.applyOrElse(CassandraTableScanRDD.scala:412)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$2.applyOrElse(CassandraTableScanRDD.scala:407)
  at scala.PartialFunction$$anonfun$runWith$1.apply(PartialFunction.scala:136)
  at scala.PartialFunction$$anonfun$runWith$1.apply(PartialFunction.scala:135)
  at scala.collection.immutable.List.foreach(List.scala:383)
  at scala.collection.TraversableLike$class.collect(TraversableLike.scala:282)
  at scala.collection.immutable.List.collect(List.scala:317)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD.containsPartitionKey(CassandraTableScanRDD.scala:407)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD.partitionGenerator$lzycompute(CassandraTableScanRDD.scala:223)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD.partitionGenerator(CassandraTableScanRDD.scala:222)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:271)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
  at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:927)
  at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:925)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.foreach(RDD.scala:925)
  ... 43 elided
© www.soinside.com 2019 - 2024. All rights reserved.