嵌入式Kafka迁移状态存储在具有不同@StreamListener的两个实例之间

问题描述 投票:0回答:1

我有一个SpringBoot应用程序,它有两个通过Spring Cloud映射的流处理器。每个处理器都有自己的@StreamListener用于不同的主题。一个处理器将聚合数据写入可状态存储。当我通过@Service(服务从状态存储获取聚合数据)获取数据时,我在单元测试中面临问题。由于某种原因不时捕获异常:

org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, recently-played-store, may have migrated to another instance. at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60) at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043) at org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry.getQueryableStoreType(QueryableStoreRegistry.java:47)

当我从另一个处理器中删除StreamListener时,所有工作都很好并且稳定。

如何使用适当的处理器绑定精确实例的状态存储?

apache-kafka spring-cloud apache-kafka-streams spring-kafka embedded-kafka
1个回答
0
投票

我找到了问题的解决方案,也许它会帮助别人。我没有使用spring-cloud-stream-binder-kafka-streams的最新版本。我的版本是2.0.0,它有一个bug https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/366。它仅在2.0.1版本中修复

© www.soinside.com 2019 - 2024. All rights reserved.