我想获取那些没有被发送到Kafka的Person对象,即onFailure()方法。
1)我创建了person类型的temp数组,并将其传递给onFailure().但这并不工作,它总是显示相同的temp[0]doest工作,它总是打印列表中的最后一个值。
"在封闭作用域中定义的局部变量pp必须是最终的或有效的最终变量"
我如何解决这个问题?
class Sender {
@Autowired
private KafkaTemplate<String, Person> template;
private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
Person temp= null;
// @Transactional("ktm")
public void sendThem(List<Person> toSend) throws InterruptedException {
List<ListenableFuture<SendResult<String, Person>>> futures = new ArrayList<>();
ListenableFutureCallback<SendResult<String, Person>> callback = new ListenableFutureCallback<SendResult<String, Person>>() {
@Override
public void onSuccess(SendResult<String, Person> result) {
//LOG.info(" message success 1: " + result.getProducerRecord().value());
LOG.info(" message success 2: " + temp);
}
@Override
public void onFailure(Throwable ex) {
LOG.info(" message failed : " + temp);
}
};
for (Person p : toSend) {
temp=p;
ListenableFuture<SendResult<String, Person>> future = template.send("t_101", temp);
future.addCallback(callback);
}
}
}
失败的发送值可以在 Throwable
- 抛给 KafkaProducerException
. 您的 Person
是在 producerRecord.value()
.
我更新了我的回答 你的另一个问题.
你也可以在循环中创建一个新的回调,然后在循环中创建一个新的回调。p
将会对你有用;但更有效的做法是使用一个回调并获得 Person
从异常。