消费者中的Kafka消息处理

问题描述 投票:0回答:2

我有一个使用者,它从一个主题读取数据并产生一个线程进行处理。在单个时间点,服务器中可能正在处理多个消息。应用程序遇到数据库超时,所有正在处理的消息都丢失了。并且由于有多个线程在轮询数据库连接,因此应用程序抛出了内存异常并关闭了。即使消费者在不进行处理的情况下瘫痪,如何改善体系结构以消除数据丢失

java apache-kafka kafka-consumer-api
2个回答
0
投票

您应在完成处理后通过提交偏移量来进行至少一次处理。即执行

  consumer.commitSync();

线程成功完成后。

请注意,您还需要将使用者配置为通过将'enable.auto.commit'设置为false来停止自动提交偏移量。

尽管您的消费者是幂等的,但您需要小心。即,如果失败,然后再次读取并处理相同的值,则不会影响结果。


0
投票

您应该在从数据库获得成功响应后提交偏移量。

问题与可用的数据库连接和线程有关。解决此问题的唯一方法是获取数据库连接,然后将数据库连接发送到线程。

线程示例

public class ConsumerThreadHandler implements Callable {

    private ConsumerRecord consumerRecord;
    private Connection dataBaseConnection;

    public ConsumerThreadHandler(ConsumerRecord consumerRecord,) {
        this.consumerRecord = consumerRecord;
        this.dataBaseConnection = dataBaseConnection;
    }

    @Override
    public Object call() throws Exception {
        // Perform all the data base related things
        // and generate the proper response
        return;
    }
}

消费代码

 executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());

            while (true) {
              ConsumerRecords<String, String> records = consumer.poll(100);
              for (final ConsumerRecord record : records) {
                 // Get database connection , Check untill get the connection
                Future future=executor.submit(new ConsumerThreadHandler(record,dataBaseConnection));
                if(future.isDone())
                    // Based on the proper response commit the offset
              }
            }
          }
© www.soinside.com 2019 - 2024. All rights reserved.