如果在我的publisherFlow 中发布到kafka 时出现错误,我正在尝试设置retryAdvice。我似乎无法找到如何让它工作的正确配置。任何帮助或指导将不胜感激。
@Bean
public IntegrationFlow publisherFlow(CsvToEmailInteractionConverter converter, KafkaTemplate<String, String> kafkaTemplate) {
return IntegrationFlow.from("publisherChannel")
.transform(converter, "transform")
.transform(Transformers.toJson())
.log(LoggingHandler.Level.DEBUG, Constants.LOG_FLOW_CATEGORY, m -> "Payload: " + m.getPayload())
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.topic(Constants.KAFKA_TOPIC), e -> e.id(Constants.ENGAGE_ACOUSTIC_ID))
// .routeByException(r -> r.defaultOutputChannel(MessageChannels.direct("standardFlowExceptionChannel").getObject()))
.get();
}
我认为建议需要先于
handle()
?
我们在您的问题中没有看到任何重试配置,但请参阅这可能正是您正在寻找的内容:如何让 RetryAdvice 为 KafkaProducerMessageHandler 工作。
所以,这基本上就是您所需要的:
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.topic(Constants.KAFKA_TOPIC)
.sync(true),
e -> e.id(Constants.ENGAGE_ACOUSTIC_ID)
.advice(retryAdvice))