反应器背压 - 阻塞直到压力释放

问题描述 投票:0回答:1

我正在尝试使用 Flux 来处理数据库中的记录并将它们传递给 Kinesis 流——这非常有效,但如果我的 Kinesis 客户端得到“备份”,可能会导致无限的内存使用

我已经将背压更改为

FluxSink.OverflowStrategy.ERROR
,当没有足够的压力需求时会出错,但这似乎让我无法选择暂停数据库读取,并在延迟后尝试恢复。

我得到的是类似以下内容:

Flux.<Map<String, List<Object>>>create((emitter) -> {
   /// Boring Database Stuff
   while (resultSet.next()) {
     emitter.next(valueFromDatabase)
   }
}, FluxSink.OverflowStrategy.ERROR)
.subscribeOn(scheduler)
.handle((message, synchronousSink) -> {
  // Send Stuff To Kinesis
}
.subscribe();

我曾假设我能够尝试/捕获

emitter.next()
来处理背压错误,但它并没有被捕获。相反,它只是终止订阅。

我已经尝试过

.onErrorContinue((throwable, o) -> logger.error("Continuing From Error", throwable))
但似乎甚至没有被调用。

我希望能够“暂停”从数据库中读取数据,直到压力消退,然后继续处理下一条记录,如果我不断收到这些错误,则进行某种退避。有什么办法可以做到这一点?

java project-reactor backpressure
1个回答
0
投票

我最终为此创建了自己的水槽:

Sinks.Many<Map<String, List<Object>>> sink = Sinks.many()
    .unicast()
    .onBackpressureBuffer(
          Queues.<Map<String,List<Object>>>get(100).get()
    );

使用水槽,我可以手动处理

FAIL_OVERFLOW
错误。在这种情况下,如果第二个参数返回
emitNext
TRUE

将重试
sink.emitNext(retVal, (signalType, emitResult) -> {
  logger.debug("Got Error At Count [{}] With SignalType: [{}] and EmitResult: [{}]", currentCount, signalType, emitResult);
  return Sinks.EmitResult.FAIL_OVERFLOW == emitResult;
});

或者,您可以使用

sink.tryEmitNext()
方法,它将返回结果(可能是
FAIL_OVERFLOW
),然后您可以根据需要进行处理。

然后,为了将它连接到 spring-cloud-stream,我可以简单地将水槽作为通量返回:

return () -> sink.asFlux()

我确定这有缺点,但在我相对简单的应用程序中,这就足够了。

© www.soinside.com 2019 - 2024. All rights reserved.