Spring Kafka 使用 AckMode.RECORD 手动确认

问题描述 投票:0回答:1

我正在尝试将 Kafka 用于我拥有的以下用例。

我们的生产者将向 kafka 主题生成消息。我们的消费者本质上很慢,因为每条消息的处理都会根据消息的内容而有所不同。我们还根据某些条件对传入消息进行批处理,并一次性处理批处理消息列表。我们尝试了以下两种方法。

  1. 多个分区:通过多个分区,我们可以使用同一个消费者线程来批处理和处理消息,该线程工作正常,但是当批处理中有一些较小的消息集并且不满足处理批处理的条件时,我们正在等待新消息来处理已经批处理的现有消息。我们尝试了超时,但由于消费者线程在收到消息之前不会执行,因此我们永远无法处理最后一块消息。

  2. 一个分区 - 在内存队列中:我们尝试使用单个分区,它将所有传入消息写入内存并发队列,并且我们使用 ExecutorService 生成 5 个线程(比方说)。所有线程都能够从队列中读取唯一的消息并处理它们。但问题在于确认。我们将 ack 与消息一起保存在队列中,并尝试在处理后确认每条消息。但是我们观察到的是在内部队列进行部分处理之后,如果消费者重新启动,我看不到其他消息返回消费者。我们猜测这是因为我们正在做的确认。这里的 Ack 模式 id MANUAL/MANUAL_IMMEDIATE

代码如下:

@KafkaListener(topics = "#{'${kafka-consumer.topics}'.split(',')}", groupId = "${kafka-consumer.groupId}", concurrency = "1")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) throws InterruptedException {
        JSONObject jsonObject = new JSONObject(record.value());
        JSONArray request_data = jsonObject.getJSONArray("request");
        JSONObject data = request_data.getJSONObject(0);
        MessagePair messagePair = new MessagePair();
        messagePair.setMessage(data);
        messagePair.setAcknowledgment(ack);
        linkedQueue.add(messagePair);
        System.out.println("Queue size is : " + linkedQueue.size());

        if (executorService == null) {
            System.out.println("Starting executor service");
            executorService = Executors.newFixedThreadPool(2);
            for (int i = 0; i < 2; i++) {
                QueeuTask queeuTask = new QueeuTask(linkedQueue);
                System.out.println();
                executorService.execute(queeuTask);
            }
        }
}

下面的 printList 方法将由每个执行器线程在根据自定义条件创建批处理后调用。我们的实际处理将在下面的方法中进行,处理后我们将尝试确认每条消息。

public void Printlist(ArrayList<MessagePair> lst, String threadName) throws InterruptedException {
        //Process the messages
        for(MessagePair pair : lst) {
            System.out.println("Sending acknowledgement for message : " + pair.getMessage());
            pair.getAcknowledgment().acknowledge();
        }
        lst.clear();
}

通过上述实现,我们确实看到了本地队列中未处理的丢失消息。

有人可以帮助我们如何在每条消息中实现此确认吗?我们尝试过 AckMode.RECORD ,上面写着

没有可用的确认作为参数,侦听器容器必须具有手动 AckMode 来填充确认

感谢您的帮助。

apache-kafka kafka-consumer-api spring-kafka
1个回答
0
投票

分析用例后,我们改变了设计,以确保kafka消费者不会慢。我们还处理偏移量提交以确保处理完成。在新的用例中,我们使用 Kafka 批处理侦听器在一次轮询中读取固定消息,并创建与实际进程相同的负载,并使用 Kafka 侦听器来处理这些处理强度不是很大的负载。谢谢大家的建议。

© www.soinside.com 2019 - 2024. All rights reserved.