我有一个应用程序,它有一个kafka消费者,并根据它收到的数据更新Elastic搜索。
我的问题是,每当ES宕机时,kafka消费者完全停止,并且没有重新启动。
我相信这是由我的ES代码运行方式造成的。
public CompletionStage<SearchResponse> executeSearch(SearchRequest searchRequest) {
CompletableFuture<SearchResponse> f = new CompletableFuture<>();
client.searchAsync(searchRequest, RequestOptions.DEFAULT, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
f.complete(searchResponse);
}
@Override
public void onFailure(Exception e) {
throw new Exception(); // I am guessing because of this
}
});
return f;
}
如果我改变了我的 onFailure
的方法。
public void onFailure(Exception e) {
f.complete(null);
}
它工作得很好,但我不明白为什么抛出一个异常会导致这样的结果。
任何帮助将是非常感激的。
对于那些需要解决方案的人,我把我的代码改成了下面的样子,使它能在异常情况下工作。
public void onFailure(Exception e) {
f.completeExceptionally(new Exception());
}
也有可能是相关的 exceptionally
方法和 handle
方法从 CompletableFuture
CompletableFuture.exceptionally((ex)-> )
- https:/docs.oracle.comjavase8docsapijavautilconcurrentCompletableFuture.html#exceptionally-java.util.function.Function-。