Kafka状态存储在分布式环境中不可用。

问题描述 投票:0回答:1

我有一个商业应用程序,其版本如下

  • spring boot(2.2.0.RELEASE) spring-Kafka(2.3.1-RELEASE) spring-cloud-stream-binder-kafka(2.2.1-RELEASE) spring-cloud-stream-binder-kafka-core(3.0.3-RELEASE) spring-cloud-stream-binder-kafka-streams(3.0.3-RELEASE)

我们有20个左右的批次,每个批次使用6-7个主题来处理业务,每个服务都有自己的状态存储,来维护批次的状态是否为runningIdle,使用下面的代码来查询这个存储。

@Autowired
private InteractiveQueryService interactiveQueryService;
      public ReadOnlyKeyValueStore<String, String> fetchKeyValueStoreBy(String storeName) {
        while (true) {
            try {
                log.info("Waiting for state store");
                return new ReadOnlyKeyValueStoreWrapper<>(interactiveQueryService.getQueryableStore(storeName,
                        QueryableStoreTypes.<String, String> keyValueStore()));
            } catch (final IllegalStateException e) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }

在一个实例(Linux机器)中部署应用程序时,一切都很正常,而在两个实例中部署应用程序时,我们发现以下的观察结果。

  1. 状态存储在一个实例中可用,而另一个实例没有。

  2. 当请求被有状态存储的实例处理时,一切都很正常。

  3. 如果请求落到没有状态存储的实例上,应用程序就会在while循环中无限期地等待(上面的代码片段)。
  4. 当没有存储的实例无限期等待时,如果我们杀死另一个实例,上面的代码就会返回存储,而且处理得很完美。

不知道我们缺少什么。

spring-boot spring-kafka spring-cloud-stream
1个回答
1
投票

当你有多个Kafka Streams处理器和交互式查询运行时,你上面展示的代码将不会以你期望的方式工作。它只返回结果,如果你正在查询的键是在同一个服务器上。为了解决这个问题,你需要添加属性----。spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port> 在每个实例上。确保每个服务器上的服务器和端口都改成正确的。然后你必须编写类似下面的代码。

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
                        key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
}

请看 参考文件 以获取更多信息。这里有一个 示例代码 这表明,。

© www.soinside.com 2019 - 2024. All rights reserved.