如何确保Spring Cloud Stream Listener等待处理消息,直到Application在Start上完全初始化?

问题描述 投票:0回答:1

使用Spring Cloud Stream Kafka应用程序,我们如何确保流监听器等待处理消息,直到完成某些依赖性任务(例如参考数据填充)?以下应用无法处理消息,因为消息传递得太早。我们如何在Spring Boot App中保证这种排序?

@Service
public class ApplicationStartupService implements ApplicationRunner {

  private final FooReferenceDataService fooReferenceDataService;

  @Override
  public void run(ApplicationArguments args) throws Exception {
      fooReferenceDataService.loadData();
  }
}

@EnableBinding(MyBinding.class)
public class MyFooStreamProcessor {

  @Autowired FooService fooService;

  @StreamListener("my-input")
  public void process(KStream<String, Foo> input) {
      input.foreach((k,v)-> {
          // !!! this fails to save
          // messages are delivered too early before foo reference data got loaded into database
          fooService.save(v);
      });         
  }
}
  • spring-cloud-stream:2.1.0.RELEASE
  • spring-boot:2.1.2.RELEASE

我发现自2018年5月15日起,春季云流无法使用此功能。

Kafka - Delay binding until complex service initialisation has completed

支持此计划时,我们是否有计划/时间表?

spring-boot spring-cloud-stream spring-kafka
1个回答
0
投票

与此同时,我通过使用@Ordered和ApplicationRunner实现了我想要的目标。它很乱,但很有效。基本上,流监听器将等待其他工作完成。

@Service
@Order(1)
public class ApplicationStartupService implements ApplicationRunner {

  private final FooReferenceDataService fooReferenceDataService;

  @Override
  public void run(ApplicationArguments args) throws Exception {
      fooReferenceDataService.loadData();
  }
}

@EnableBinding(MyBinding.class)
@Order(2)
public class MyFooStreamProcessor implements ApplicationRunner {

  @Autowired FooService fooService;
  private final AtomicBoolean ready = new AtomicBoolean(false);

  @StreamListener("my-input")
  public void process(KStream<String, Foo> input) {
      input.foreach((k,v)-> {
          while (ready.get() == false) {
            try {
              log.info("sleeping for other dependent components to finish initialization");
              Thread.sleep(10000);
            } catch (InterruptedException e) {
              log.info("woke up");
            }
          }
          fooService.save(v);
      });         
  }

  @Override
  public void run(ApplicationArguments args) throws Exception {
    ready.set(true);
  }
}
© www.soinside.com 2019 - 2024. All rights reserved.