在Kafka Send()方法的Onfailure()方法调用中获取Objects值。

问题描述 投票:0回答:1

我想获取那些没有被发送到Kafka的Person对象,即onFailure()方法。

1)我创建了person类型的temp数组,并将其传递给onFailure().但这并不工作,它总是显示相同的temp[0]doest工作,它总是打印列表中的最后一个值。

"在封闭作用域中定义的局部变量pp必须是最终的或有效的最终变量"

我如何解决这个问题?

class Sender {

        @Autowired
        private KafkaTemplate<String, Person> template;

        private static final Logger LOG = LoggerFactory.getLogger(Sender.class);



        Person temp= null;

        // @Transactional("ktm")
        public void sendThem(List<Person> toSend) throws InterruptedException {
            List<ListenableFuture<SendResult<String, Person>>> futures = new ArrayList<>();


            ListenableFutureCallback<SendResult<String, Person>> callback = new ListenableFutureCallback<SendResult<String, Person>>() {

                @Override
                public void onSuccess(SendResult<String, Person> result) {
                    //LOG.info(" message success 1: " + result.getProducerRecord().value());
                    LOG.info(" message success 2: " + temp);
                }

                @Override
                public void onFailure(Throwable ex) {
                    LOG.info(" message failed : "  + temp);

                }
            };

            for (Person p : toSend) {

                temp=p;
                ListenableFuture<SendResult<String, Person>> future = template.send("t_101", temp);


                future.addCallback(callback);
            }



        }
    }
java apache-kafka spring-kafka kafka-producer-api
1个回答
1
投票

失败的发送值可以在 Throwable - 抛给 KafkaProducerException. 您的 Person 是在 producerRecord.value().

我更新了我的回答 你的另一个问题.

你也可以在循环中创建一个新的回调,然后在循环中创建一个新的回调。p 将会对你有用;但更有效的做法是使用一个回调并获得 Person 从异常。

© www.soinside.com 2019 - 2024. All rights reserved.