具有RetryPolicy的Datastax spark cassandra连接器,用于将DF写入cassandra表中

问题描述 投票:0回答:1

我正在尝试将一致性级别为“ EACH_QUORUM”的spark数据帧写入cassandra。我的代码如下所示:

val sparkBuilder = SparkSession.builder().
  config(cassandraHostPropertyProperty, cassandraHosts).
  config(cassandraAuthUsernameProperty, CASSANDRA_AUTH_USER_KEY).
  config(cassandraAuthPassProperty, CASSANDRA_AUTH_PASS_KEY).
  config(cassandraIsSSLEnabledProperty, isSSLEnabled)...
  getOrCreate();

下面是编写DF的代码:

df.write.cassandraFormat(tableName, keySpaceName)
    .mode(SaveMode.Append)
    .options(Map(
      WriteConf.ParallelismLevelParam.name -> parallelism_Level.toString,
      WriteConf.BatchSizeRowsParam.name -> rowsInBatch.toString
    ))
    .save()

我想添加一个重试策略,以便如果其中一个数据中心处于关闭状态,请将一致性降级为LOCAL_QUORUM。

我知道datastax有一个我应该扩展的类MultipleRetryPolicy.scala,重写方法以添加自定义逻辑并在cassandra conf中使用它的实例。

我如何将此策略应用于我的Sparksession或保存操作?使用或不使用RetryPolicy满足我的要求,scala中还有其他方法吗?

scala apache-spark cassandra datastax-java-driver spark-cassandra-connector
1个回答
0
投票

您不希望MultipleRetryPolicy,您要紧随DowngradingConsistencyRetryPolicy,它不是Spark驱动程序的一部分,因此,除非您将策略移植到Scala,否则将其作为驱动程序设置的一部分是可以的。

[您可以做的是将查询执行包裹在尝试中并捕获UnavailableException,然后通过更改output.consistency.level parameter只是以较低的一致性重试。

© www.soinside.com 2019 - 2024. All rights reserved.