我正在使用非阻塞式(异步)发送消息到Kafka:
ListenableFuture<SendResult<Integer, String>> future = template.send(record);
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
handleSuccess(data);
}
@Override
public void onFailure(Throwable ex) {
handleFailure(data, record, ex);
}
});
这在send操作完成时非常有效。
但是,如果出现连接问题(例如,服务器关闭),结果成为非异步,并且该方法将一直处于阻塞状态,直到max.block.ms持续时间结束。
这在异步KAfka生产者中很自然。您有两个选择