Spark 使用不同的 TTL 写入 Cassandra

问题描述 投票:0回答:2

在 Java Spark 中,我有一个数据帧,其中有一个“bucket_timestamp”列,它表示该行所属的存储桶的时间。

我想将数据帧写入 Cassandra DB。数据必须以TTL方式写入DB。 TTL 应取决于存储桶时间戳 - 其中每行的 TTL 应计算为

ROW_TTL = CONST_TTL - (CurrentTime - bucket_timestamp)
,其中
CONST_TTL
是我配置的常量 TTL。

目前我正在使用常量 TTL 通过 Spark 向 Cassandra 写入数据,代码如下:

df.write().format("org.apache.spark.sql.cassandra")
            .options(new HashMap<String, String>() {
                {
                    put("keyspace", "key_space_name");
                    put("table, "table_name");
                    put("spark.cassandra.output.ttl, Long.toString(CONST_TTL)); // Should be depended on bucket_timestamp column
                }
            }).mode(SaveMode.Overwrite).save();

我想到的一种可能的方法是 - 对于每个可能的bucket_timestamp - 根据时间戳过滤数据,计算TTL并将过滤后的数据写入Cassandra。但这似乎非常低效,而且不是火花方式。 Java Spark 中有没有办法提供 Spark 列作为 TTL 选项,以便每一行的 TTL 都不同?

解决方案应该使用Java和数据集< Row>:我遇到了一些在scala中使用RDD执行此操作的解决方案,但没有找到使用Java和dataframe的解决方案。

谢谢!

java apache-spark cassandra spark-cassandra-connector scylla
2个回答
2
投票

对于 DataFrame API 不支持此类功能,但是...有 JIRA - https://datastax-oss.atlassian.net/browse/SPARKC-416,您可以观看它以在何时收到通知已实施...

因此,您唯一的选择就是使用 RDD API,如 @bartosz25 的答案中所述......


2
投票

来自 Spark-Cassandra 连接器选项(https://github.com/datastax/spark-cassandra-connector/blob/v2.3.0/spark-cassandra-connector/src/main/java/com/datastax/spark/connector /japi/RDDAndDStreamCommonJavaFunctions.java)您可以将 TTL 设置为:

  • 恒定值(
    withConstantTTL
    )
  • 自动解析值(
    withAutoTTL
    )
  • 基于列的值 (
    withPerRowTTL
    )

在您的情况下,您可以尝试最后一个选项,并使用您在问题中提供的规则将 TTL 计算为起始

Dataset
的新列。

对于用例,您可以在此处查看测试:https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/it/scala/com/datastax/spark/连接器/编写器/TableWriterSpec.scala#L612

© www.soinside.com 2019 - 2024. All rights reserved.