在 Kafka 生产者/消费者中用 CompletableFuture 替换 ListenableFuture

问题描述 投票:0回答:1

我正在迁移到

spring-kafka
3.1。我注意到的一件事是

    ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);

    future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
        @Override
        public void onSuccess(SendResult<String, Object> result) {
            log.info("Sent message=[" + payload +
                    "] with offset=[" + result.getRecordMetadata()
                    .offset() + "]");
        }

        @Override
        public void onFailure(Throwable ex) {
            log.info("Unable to send message=["
                    + message + "] due to : " + ex.getMessage());
        }
    });
    return future.get()
            .getRecordMetadata()
            .partition() + "-" + future.get()
            .getRecordMetadata()
            .offset();

ListenableFuture
在 Spring 6 中已弃用,替换为
CompletableFuture
。但是,没有使用
addCallBack
CompletableFuture
方法。为了向后兼容,修复上述回调的正确方法是什么?

java spring spring-boot apache-kafka spring-kafka
1个回答
0
投票

提前声明我不了解Kafka,假设你有这个:

CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(message);

...那么您应该能够添加回调,如下所示:

future.handleAsync((result, ex) -> {
    if (result != null) {
        //success
        log.info("Sent message=[" + payload +
            "] with offset=[" + result.getRecordMetadata().offset() + "]");
    } else if (ex != null) {
        //failure
        log.info("Unable to send message=[" +
            message + "] due to : " + ex.getMessage());
    }
    return //whatever you want to return to complete your future (or throw to complete exceptionally)
});

如果您想了解更多详细信息,这是 JDK 11 中该方法的 JavaDoc:

 /**
 * Returns a new CompletionStage that, when this stage completes
 * either normally or exceptionally, is executed using this stage's
 * default asynchronous execution facility, with this stage's
 * result and exception as arguments to the supplied function.
 *
 * <p>When this stage is complete, the given function is invoked
 * with the result (or {@code null} if none) and the exception (or
 * {@code null} if none) of this stage as arguments, and the
 * function's result is used to complete the returned stage.
 *
 * @param fn the function to use to compute the value of the
 * returned CompletionStage
 * @param <U> the function's return type
 * @return the new CompletionStage
 */
© www.soinside.com 2019 - 2024. All rights reserved.