如何向kafka发布IntegrationFlow添加重试建议

问题描述 投票:0回答:1

如果在我的publisherFlow 中发布到kafka 时出现错误,我正在尝试设置retryAdvice。我似乎无法找到如何让它工作的正确配置。任何帮助或指导将不胜感激。

    @Bean
    public IntegrationFlow publisherFlow(CsvToEmailInteractionConverter converter, KafkaTemplate<String, String> kafkaTemplate) {
        return IntegrationFlow.from("publisherChannel")
                              .transform(converter, "transform")
                              .transform(Transformers.toJson())
                              .log(LoggingHandler.Level.DEBUG, Constants.LOG_FLOW_CATEGORY, m -> "Payload: " + m.getPayload())
                              .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                                           .topic(Constants.KAFKA_TOPIC), e -> e.id(Constants.ENGAGE_ACOUSTIC_ID))
                              // .routeByException(r -> r.defaultOutputChannel(MessageChannels.direct("standardFlowExceptionChannel").getObject()))
                              .get();
    }

我认为建议需要先于

handle()

java spring-integration spring-dsl
1个回答
0
投票

我们在您的问题中没有看到任何重试配置,但请参阅这可能正是您正在寻找的内容:如何让 RetryAdvice 为 KafkaProducerMessageHandler 工作

所以,这基本上就是您所需要的:

.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                 .topic(Constants.KAFKA_TOPIC)
                 .sync(true), 
            e -> e.id(Constants.ENGAGE_ACOUSTIC_ID)
                    .advice(retryAdvice))
© www.soinside.com 2019 - 2024. All rights reserved.