CompletableFuture 与 Kafka 的回调方法?

问题描述 投票:0回答:1

我们正在围绕 Kafka 构建一个小型包装器库。我们使用的是旧版本,在调用

.whenComplete((response, throwable) -> {...});
时没有
kafkaTemplate.send(...);

相反,我们的图书馆正在做以下事情:

1 定义一个

RetryTemplate
bean:

@Bean
public RetryTemplate prodRetryTemplate() {    
   final RetryTemplate template = RetryTemplate.builder().maxAttempts(3)
         .retryOn(TopicAuthorizationException.class).exponentialBackoff(1000, 2, 2000).build();
   return template;
}

2 定义异步 CompletableFuture 方法:

@Override
@Async(EXECUTOR_SERVICE) //we had defined Executor bean, it is ommited here
public CompletableFuture<DtoResponse> asyncSend(final Integer key, final String topic, final String message)
      throws Exception {

   final ProducerRecord<K, V> producerRecord = buildRecord(key, topic, message);

   DtoRespons[] responseHolder = new DtoRespons[1];
   template.execute(ctx ->{

            final ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(producerRecord);
            future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {

               @Override
               public void onSuccess(final SendResult<Integer, String> result) {
                  log.info("success...");
                  responseHolder[0] = DtoResponse.builder()
                        .error(null)
                        .success(true)
                        .msg("PUBLISHED")
                        .build();
               }

               @SneakyThrows
               @Override
               public void onFailure(final Throwable ex) {
                  log.error("error...)
                  throwError(producerRecord, eventName, ex); //do additional logging
               }
            });

      return null; //to satisfy execute method from RetryTemplate
   });

   return CompletableFuture.completedFuture(responseHolder[0]);
}

我的问题很简单,一旦

asyncSend
被调用它是否有可能返回
null
即行
return CompletableFuture.completedFuture(null);
onSuccess
onFailure
被调用之前执行?

即从客户端:

public void myService(){  
  MyLib lib = new MyLib();
  CompletableFuture<DtoResponse> future = lib.sendAsync(key, topic, message);
  future.whenComplete((result, throwable) -> { 
     //is it possible that result and throwable both be null ??

    });
}

更新一:

基于教授的加里评论。我不确定这是不是你的意思:

@Override
@Async(EXECUTOR_SERVICE)
public CompletableFuture<SendResult<Integer, String>> asyncSend(final Integer key, final String topic, final String message)
      throws Exception {
   final ProducerRecord<Integer, String> producerRecord = buildRecord(key, topic, message); //this can throw ex

   ListenableFuture<SendResult<Integer, String>> x = template.execute(ctx -> {
      final ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(producerRecord);

      return future;
   });
   CompletableFuture<SendResult<Integer, String>> completable = x.completable();

   return completable;
}

然后客户端代码必须检查

throwable
SendResult<Integer, String>
.whenComplete()
内。但这不是我需要的。使用
SendResult<Integer, String>
客户端代码仍然无法判断消息是否已发送。如何规避这个?如果我以可以理解的方式解释了这一点,请告诉我。此外,我们的图书馆需要在
onFailure
onSuccess

中进行重要的登录
java spring-boot spring-kafka completable-future
1个回答
1
投票

completedFuture

你正在返回一个已经完成的未来,不管回调是否已经运行;所以调用者总是会看到一个完整的未来。

你应该从执行方法返回

future
,然后从
future.completable()
返回
asyncSend

请注意,从 3.0 版开始,模板返回

CompletableFuture
而不是
ListenableFuture

© www.soinside.com 2019 - 2024. All rights reserved.