我正在使用spark将一些数据从一个cassandra表移动到另一个集群上的另一个cassandra表。
我为以下源集群之一指定了cassandra配置:
/*
spark.cassandra.connection.host:
spark.cassandra.connection.port:
spark.cassandra.auth.username:
spark.cassandra.auth.password:
spark.cassandra.connection.ssl.clientAuth.enabled: true
spark.cassandra.connection.ssl.enabled: true
spark.cassandra.connection.ssl.trustStore.path:
spark.cassandra.connection.ssl.trustStore.password:
spark.cassandra.connection.timeout_ms: */
SparkSession spark = SparkSession.builder()
.config(conf)
.getOrCreate();
Dataset<Row> df = spark.read()
.format("org.apache.spark.sql.cassandra")
.options(config.getSourceTable())
.load();
df.show();
// *** How/Where do I specify cassandra config in destination cluster? ***
df.write()
.mode(SaveMode.Append)
.format("org.apache.spark.sql.cassandra")
.options(destinationTbl);
如何/在哪里在目标集群中指定cassandra配置(Java推荐)?
谢谢!
我还没有测试过,但是基于Russel Spitzer's blog post,您可以执行以下操作(未在Java中进行测试,但应该可以工作:]
spark
实例时添加它们:]spark.setConf("ClusterSource/spark.cassandra.connection.host", "127.0.0.1");
spark.setConf("ClusterDestination/spark.cassandra.connection.host", "127.0.0.2");
options
中,将相应群集的名称作为cluster
条目。P.S。另外,请记住,如果您需要迁移数据并在数据上保留WriteTime和/或TTL,那么您将需要使用RDD API,因为DataFrame API不支持这些功能。