我有一个SpringBoot应用程序,它有两个通过Spring Cloud映射的流处理器。每个处理器都有自己的@StreamListener用于不同的主题。一个处理器将聚合数据写入可状态存储。当我通过@Service(服务从状态存储获取聚合数据)获取数据时,我在单元测试中面临问题。由于某种原因不时捕获异常:
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, recently-played-store, may have migrated to another instance.
at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043)
at org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry.getQueryableStoreType(QueryableStoreRegistry.java:47)
当我从另一个处理器中删除StreamListener时,所有工作都很好并且稳定。
如何使用适当的处理器绑定精确实例的状态存储?
我找到了问题的解决方案,也许它会帮助别人。我没有使用spring-cloud-stream-binder-kafka-streams的最新版本。我的版本是2.0.0,它有一个bug https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/366。它仅在2.0.1版本中修复