我正在尝试执行从一个 cassandra 集群 (v3) 到另一个 cassandra 集群 (v4) 的迁移。
源集群和目标集群中的表和架构相同,如下所示:
CREATE KEYSPACE mykeyspace WITH replication = {'class': 'NetworkTopologyStrategy', 'DC': '3'} AND durable_writes = true;
CREATE TABLE mykeyspace.mytable (monthyear text,
key1 text,
generatedid int,
eventdatetime timestamp,
value double,
PRIMARY KEY ((monthyear, key1), generatedid, eventdatetime)
) WITH CLUSTERING ORDER BY (generatedid ASC, eventdatetime DESC);
这是我使用 Spark cassandra 连接器的非常简单的代码(用 Java 编写)。
spark.sql("SELECT * FROM cassandra_old.mykeyspace.mytable WHERE monthyear = '10-2023'").explain();
源 C* 的目录名称为 cassandra_old。当我提交此应用程序时,生成的计划不包括作为 Cassandra 过滤器一部分的月年过滤器,而是由 Spark 完成该过滤器。
这是生成的物理计划:
== Physical Plan ==
*(1) Project [monthyear#0, key1#1, generatedid#2, eventdatetime#3, value#4]
+- *(1) Filter (monthyear#0 = 10-2023)
+- BatchScan mytable[monthyear#0, key1#1, generatedid#2, eventdatetime#3, value#4] Cassandra Scan: mykeyspace.mytable
- Cassandra Filters: []
- Requested Columns: [monthyear,key1,generatedid,eventdatetime,value] RuntimeFilters: []
我无法弄清楚代码有什么问题。由于过滤是在 Spark 中完成的,因此读取和处理所有数据需要很长时间。
以下是我正在使用的版本:
Spark Cassandra 连接器版本:spark-cassandra-connector_2.12:3.4.1
Spark 版本:3.4.1-amzn-0(emr 集群)