如何从Spring Cloud Stream中的producer.send()发送的异常中恢复

问题描述 投票:1回答:1

我们遇到了以下情况:

  • 我们有一个由3个节点组成的Kafka集群,每个主题创建有3个分区
  • 消息通过MessageChannel.send()发送,产生一个记录,比如分区1
  • 充当该分区的分区负责人的代理失败

默认情况下,MessageChannel.send()返回true并且不会抛出任何异常,即使最终KafkaProducer无法成功发送消息。我们在此调用后约30秒观察日志中的以下消息:Expiring 10 record(s) for helloworld-topic-1 due to 30008 ms has passed since batch creation plus linger time

在我们的例子中,这是不可接受的,因为我们必须确保所有消息最终都传递给Kafka,在调用MessageChannel.send()的那一刻。

我们将spring.cloud.stream.kafka.bindings.<channelName>.producer.sync打开到true,这与文档描述完全一样。它阻止了生产者对成功或交付失败的认可(MessageTimeoutExceptionInterruptedExceptionExecutionException),所有这些都由KafkaProducerMessageHandler控制。这对我们来说似乎是最好的方法,因为在我们的案例中,性能影响可以忽略不计。

但是,如果抛出异常,我们是否需要自己重试? (在我们的客户代码中使用@Retryable

这是一个简单的实验项目:https://github.com/phdezann/spring-cloud-bus-kafka-helloworld

spring-cloud spring-kafka
1个回答
0
投票

如果在send()线程上执行@StreamListener并将异常抛回绑定器,则绑定器重试配置将执行重试。

但是,由于您在HTTP线程上执行发送,因此您需要执行自己的重试(在RetryTemplate()范围内调用send)或制作控制器方法@Retryable

© www.soinside.com 2019 - 2024. All rights reserved.