我们正在围绕 Kafka 构建一个小型包装器库。我们使用的是旧版本,在调用
.whenComplete((response, throwable) -> {...});
时没有 kafkaTemplate.send(...);
。
相反,我们的图书馆正在做以下事情:
1 定义一个
RetryTemplate
bean:
@Bean
public RetryTemplate prodRetryTemplate() {
final RetryTemplate template = RetryTemplate.builder().maxAttempts(3)
.retryOn(TopicAuthorizationException.class).exponentialBackoff(1000, 2, 2000).build();
return template;
}
2 定义异步 CompletableFuture 方法:
@Override
@Async(EXECUTOR_SERVICE) //we had defined Executor bean, it is ommited here
public CompletableFuture<DtoResponse> asyncSend(final Integer key, final String topic, final String message)
throws Exception {
final ProducerRecord<K, V> producerRecord = buildRecord(key, topic, message);
DtoRespons[] responseHolder = new DtoRespons[1];
template.execute(ctx ->{
final ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(producerRecord);
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(final SendResult<Integer, String> result) {
log.info("success...");
responseHolder[0] = DtoResponse.builder()
.error(null)
.success(true)
.msg("PUBLISHED")
.build();
}
@SneakyThrows
@Override
public void onFailure(final Throwable ex) {
log.error("error...)
throwError(producerRecord, eventName, ex); //do additional logging
}
});
return null; //to satisfy execute method from RetryTemplate
});
return CompletableFuture.completedFuture(responseHolder[0]);
}
我的问题很简单,一旦
asyncSend
被调用它是否有可能返回 null
即行 return CompletableFuture.completedFuture(null);
在 onSuccess
或 onFailure
被调用之前执行?
即从客户端:
public void myService(){
MyLib lib = new MyLib();
CompletableFuture<DtoResponse> future = lib.sendAsync(key, topic, message);
future.whenComplete((result, throwable) -> {
//is it possible that result and throwable both be null ??
});
}
更新一:
基于教授的加里评论。我不确定这是不是你的意思:
@Override
@Async(EXECUTOR_SERVICE)
public CompletableFuture<SendResult<Integer, String>> asyncSend(final Integer key, final String topic, final String message)
throws Exception {
final ProducerRecord<Integer, String> producerRecord = buildRecord(key, topic, message); //this can throw ex
ListenableFuture<SendResult<Integer, String>> x = template.execute(ctx -> {
final ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(producerRecord);
return future;
});
CompletableFuture<SendResult<Integer, String>> completable = x.completable();
return completable;
}
然后客户端代码必须检查
throwable
和 SendResult<Integer, String>
在 .whenComplete()
内。但这不是我需要的。使用 SendResult<Integer, String>
客户端代码仍然无法判断消息是否已发送。如何规避这个?如果我以可以理解的方式解释了这一点,请告诉我。此外,我们的图书馆需要在 onFailure
和 onSuccess
中进行重要的登录
completedFuture
你正在返回一个已经完成的未来,不管回调是否已经运行;所以调用者总是会看到一个完整的未来。
你应该从执行方法返回
future
,然后从future.completable()
返回asyncSend
。
请注意,从 3.0 版开始,模板返回
CompletableFuture
而不是 ListenableFuture
。