我正在尝试将一致性级别为“ EACH_QUORUM”的spark数据帧写入cassandra。我的代码如下所示:
val sparkBuilder = SparkSession.builder().
config(cassandraHostPropertyProperty, cassandraHosts).
config(cassandraAuthUsernameProperty, CASSANDRA_AUTH_USER_KEY).
config(cassandraAuthPassProperty, CASSANDRA_AUTH_PASS_KEY).
config(cassandraIsSSLEnabledProperty, isSSLEnabled)...
getOrCreate();
下面是编写DF的代码:
df.write.cassandraFormat(tableName, keySpaceName)
.mode(SaveMode.Append)
.options(Map(
WriteConf.ParallelismLevelParam.name -> parallelism_Level.toString,
WriteConf.BatchSizeRowsParam.name -> rowsInBatch.toString
))
.save()
我想添加一个重试策略,以便如果其中一个数据中心处于关闭状态,请将一致性降级为LOCAL_QUORUM。
我知道datastax有一个我应该扩展的类MultipleRetryPolicy.scala,重写方法以添加自定义逻辑并在cassandra conf中使用它的实例。
我如何将此策略应用于我的Sparksession或保存操作?使用或不使用RetryPolicy满足我的要求,scala中还有其他方法吗?
您不希望MultipleRetryPolicy
,您要紧随DowngradingConsistencyRetryPolicy,它不是Spark驱动程序的一部分,因此,除非您将策略移植到Scala,否则将其作为驱动程序设置的一部分是可以的。
[您可以做的是将查询执行包裹在尝试中并捕获UnavailableException
,然后通过更改output.consistency.level parameter只是以较低的一致性重试。