我有一个商业应用程序,其版本如下
我们有20个左右的批次,每个批次使用6-7个主题来处理业务,每个服务都有自己的状态存储,来维护批次的状态是否为runningIdle,使用下面的代码来查询这个存储。
@Autowired
private InteractiveQueryService interactiveQueryService;
public ReadOnlyKeyValueStore<String, String> fetchKeyValueStoreBy(String storeName) {
while (true) {
try {
log.info("Waiting for state store");
return new ReadOnlyKeyValueStoreWrapper<>(interactiveQueryService.getQueryableStore(storeName,
QueryableStoreTypes.<String, String> keyValueStore()));
} catch (final IllegalStateException e) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
在一个实例(Linux机器)中部署应用程序时,一切都很正常,而在两个实例中部署应用程序时,我们发现以下的观察结果。
状态存储在一个实例中可用,而另一个实例没有。
当请求被有状态存储的实例处理时,一切都很正常。
不知道我们缺少什么。
当你有多个Kafka Streams处理器和交互式查询运行时,你上面展示的代码将不会以你期望的方式工作。它只返回结果,如果你正在查询的键是在同一个服务器上。为了解决这个问题,你需要添加属性----。spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
在每个实例上。确保每个服务器上的服务器和端口都改成正确的。然后你必须编写类似下面的代码。
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}