我正在尝试使用 Flux 来处理数据库中的记录并将它们传递给 Kinesis 流——这非常有效,但如果我的 Kinesis 客户端得到“备份”,可能会导致无限的内存使用
我已经将背压更改为
FluxSink.OverflowStrategy.ERROR
,当没有足够的压力需求时会出错,但这似乎让我无法选择暂停数据库读取,并在延迟后尝试恢复。
我得到的是类似以下内容:
Flux.<Map<String, List<Object>>>create((emitter) -> {
/// Boring Database Stuff
while (resultSet.next()) {
emitter.next(valueFromDatabase)
}
}, FluxSink.OverflowStrategy.ERROR)
.subscribeOn(scheduler)
.handle((message, synchronousSink) -> {
// Send Stuff To Kinesis
}
.subscribe();
我曾假设我能够尝试/捕获
emitter.next()
来处理背压错误,但它并没有被捕获。相反,它只是终止订阅。
我已经尝试过
.onErrorContinue((throwable, o) -> logger.error("Continuing From Error", throwable))
但似乎甚至没有被调用。
我希望能够“暂停”从数据库中读取数据,直到压力消退,然后继续处理下一条记录,如果我不断收到这些错误,则进行某种退避。有什么办法可以做到这一点?
我最终为此创建了自己的水槽:
Sinks.Many<Map<String, List<Object>>> sink = Sinks.many()
.unicast()
.onBackpressureBuffer(
Queues.<Map<String,List<Object>>>get(100).get()
);
使用水槽,我可以手动处理
FAIL_OVERFLOW
错误。在这种情况下,如果第二个参数返回emitNext
,
TRUE
将重试
sink.emitNext(retVal, (signalType, emitResult) -> {
logger.debug("Got Error At Count [{}] With SignalType: [{}] and EmitResult: [{}]", currentCount, signalType, emitResult);
return Sinks.EmitResult.FAIL_OVERFLOW == emitResult;
});
或者,您可以使用
sink.tryEmitNext()
方法,它将返回结果(可能是 FAIL_OVERFLOW
),然后您可以根据需要进行处理。
然后,为了将它连接到 spring-cloud-stream,我可以简单地将水槽作为通量返回:
return () -> sink.asFlux()
我确定这有缺点,但在我相对简单的应用程序中,这就足够了。