使用Spring Cloud Stream Kafka应用程序,我们如何确保流监听器等待处理消息,直到完成某些依赖性任务(例如参考数据填充)?以下应用无法处理消息,因为消息传递得太早。我们如何在Spring Boot App中保证这种排序?
@Service
public class ApplicationStartupService implements ApplicationRunner {
private final FooReferenceDataService fooReferenceDataService;
@Override
public void run(ApplicationArguments args) throws Exception {
fooReferenceDataService.loadData();
}
}
@EnableBinding(MyBinding.class)
public class MyFooStreamProcessor {
@Autowired FooService fooService;
@StreamListener("my-input")
public void process(KStream<String, Foo> input) {
input.foreach((k,v)-> {
// !!! this fails to save
// messages are delivered too early before foo reference data got loaded into database
fooService.save(v);
});
}
}
我发现自2018年5月15日起,春季云流无法使用此功能。
Kafka - Delay binding until complex service initialisation has completed
支持此计划时,我们是否有计划/时间表?
与此同时,我通过使用@Ordered和ApplicationRunner实现了我想要的目标。它很乱,但很有效。基本上,流监听器将等待其他工作完成。
@Service
@Order(1)
public class ApplicationStartupService implements ApplicationRunner {
private final FooReferenceDataService fooReferenceDataService;
@Override
public void run(ApplicationArguments args) throws Exception {
fooReferenceDataService.loadData();
}
}
@EnableBinding(MyBinding.class)
@Order(2)
public class MyFooStreamProcessor implements ApplicationRunner {
@Autowired FooService fooService;
private final AtomicBoolean ready = new AtomicBoolean(false);
@StreamListener("my-input")
public void process(KStream<String, Foo> input) {
input.foreach((k,v)-> {
while (ready.get() == false) {
try {
log.info("sleeping for other dependent components to finish initialization");
Thread.sleep(10000);
} catch (InterruptedException e) {
log.info("woke up");
}
}
fooService.save(v);
});
}
@Override
public void run(ApplicationArguments args) throws Exception {
ready.set(true);
}
}