我正在迁移到
spring-kafka
3.1。我注意到的一件事是
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onSuccess(SendResult<String, Object> result) {
log.info("Sent message=[" + payload +
"] with offset=[" + result.getRecordMetadata()
.offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
log.info("Unable to send message=["
+ message + "] due to : " + ex.getMessage());
}
});
return future.get()
.getRecordMetadata()
.partition() + "-" + future.get()
.getRecordMetadata()
.offset();
ListenableFuture
在 Spring 6 中已弃用,替换为 CompletableFuture
。但是,没有使用 addCallBack
的 CompletableFuture
方法。为了向后兼容,修复上述回调的正确方法是什么?
提前声明我不了解Kafka,假设你有这个:
CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);
...那么您应该能够添加回调,如下所示:
future.handleAsync((result, ex) -> {
if (result != null) {
//success
log.info("Sent message=[" + payload +
"] with offset=[" + result.getRecordMetadata().offset() + "]");
} else if (ex != null) {
//failure
log.info("Unable to send message=[" +
message + "] due to : " + ex.getMessage());
}
return //whatever you want to return to complete your future (or throw to complete exceptionally)
});
如果您想了解更多详细信息,这是 JDK 11 中该方法的 JavaDoc:
/**
* Returns a new CompletionStage that, when this stage completes
* either normally or exceptionally, is executed using this stage's
* default asynchronous execution facility, with this stage's
* result and exception as arguments to the supplied function.
*
* <p>When this stage is complete, the given function is invoked
* with the result (or {@code null} if none) and the exception (or
* {@code null} if none) of this stage as arguments, and the
* function's result is used to complete the returned stage.
*
* @param fn the function to use to compute the value of the
* returned CompletionStage
* @param <U> the function's return type
* @return the new CompletionStage
*/