如果在处理步骤期间发生故障,如何使Spring云流Kafka流绑定程序重试处理消息?

问题描述 投票:1回答:1

我正在使用Spring Cloud Stream开发Kafka Streams。在消息处理应用程序中,可能会产生错误。因此,不应提交该消息并再次尝试。

我的申请方法-

@Bean
public Function<KStream<Object, String>, KStream<String, Long>> process() {
return (input) -> {
KStream<Object, String> kt = input.flatMapValues(v -> Arrays.asList(v.toUpperCase().split("\\W+")));
KGroupedStream<String, String> kgt =kt.map((k, v) -> new KeyValue<>(v, v)).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
KTable<Windowed<String>, Long> ktable = kgt.windowedBy(TimeWindows.of(500)).count();
KStream<String, WordCount> kst =ktable.toStream().map((k,v) -> {
WordCount wc = new WordCount();
wc.setWord(k.key());
wc.setCount(v);
wc.setStart(new Date(k.window().start()));
wc.setEnd(new Date(k.window().end()));

dao.insert(wc);

return new KeyValue<>(k.key(),wc);
});
return kst.map((k,v) -> new KeyValue<>(k, v.getCount()));
};
}

如果DAO插入方法失败,则不应将消息发布到输出主题,并且应重试对同一消息的处理。

我们如何配置kafka流绑定器以执行此操作?非常感谢您提供任何帮助。

java apache-kafka-streams spring-cloud-stream event-driven-design spring-cloud-stream-binder-kafka
1个回答
0
投票

Spring Cloud Stream Kafka Streams活页夹本身在您的业务逻辑执行过程中未提供此类重试机制。但是,解决此用例的一种方法可能是将关键调用(在这种情况下为dao.insert())包装在本地定义的RetryTemplate中。这是一种可能的实现,其中以1秒的退避策略重试10次。如果您尝试使用此解决方案,请确保从您的主要业务逻辑中提取与RetryTemplate相关的通用代码。我没有尝试过,但是应该可以。

KStream<String, WordCount> kst =ktable.toStream().map((k,v) -> {
  WordCount wc = new WordCount();
  ...

  org.springframework.retry.support.RetryTemplate retryTemplate = new 
   RetryTemplate();

  RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
  FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
  backOffPolicy.setBackOffPeriod(1000);

  retryTemplate.setBackOffPolicy(backOffPolicy);
  retryTemplate.setRetryPolicy(retryPolicy);

  retryTemplate.execute(context -> {
    try {
      dao.insert(wc);
    }
    catch (Exception e) {
      throw new IllegalStateException(..);
   }
  });

  return new KeyValue<>(k.key(),wc);
});

重试dao插入操作10次后发生的事件,如果仍然失败,将引发异常,该异常将终止应用程序,在这种情况下,将不提交偏移量。重新启动后,解决了基本问题后,您的应用程序仍应从该偏移量继续。

© www.soinside.com 2019 - 2024. All rights reserved.