我有一个 AWS Kinesis 数据流分片,每秒最多允许 1000 条记录。我的申请(复数)远低于平均水平。但是,可能会发生某些突发情况,每秒发送的记录数高达 4000 条(仅一秒,也许两秒)。 正如所写,这些请求来自多个应用程序。这就是为什么我认为最好的方法是将客户端配置为在出现错误时自动重试。
ClientConfiguration clientConfiguration = PredefinedClientConfigurations.defaultConfig();
clientConfiguration.setRetryMode(RetryMode.STANDARD);
这应该重试最多三次,中间有暂停。我不知道这些停顿有多长,但据我了解,每次停顿都比前一个停顿长。
另外我还传递了一个
AsyncHandler
给
putRecordAsync
方法。此处理程序在其 onError
方法上记录错误。我不喜欢的是,当第一次尝试失败并出现 ProvisionedThroughputExceededException
时,也会调用此方法。我只想在最后一次尝试失败时记录请求,以便我知道数据实际上丢失并且未发送。
我怎样才能实现这个目标?可以使用什么其他方法来确保所有请求都到达数据流,假设平均而言(
< 1m) I'm below the 1000 records / second?
var retries = 3;
var retryPolicy = RetryPolicy.builder()
.numRetries(retries)
.retryCondition((context -> {
if (context.retriesAttempted() < 3) {
return true;
}
var record = context.originalRequest().getValueForField("record", Record.class).orElse(null);
log.error("Failed sending record: `{}`", record);
return false;
}))
.build();
var clientOverrideConfiguration = ClientOverrideConfiguration.builder()
.retryPolicy(retryPolicy)
.build();
KinesisClient.builder()
.overrideConfiguration(clientOverrideConfiguration)
.build();