如何从具有多个分区(在我的例子中是三个分区)的 Kafka 主题中始终读取最新(最后)消息?

问题描述 投票:0回答:1

在我的 .net C# 项目(带有 Confluence Kafka 库)中,目前我正在使用以下代码从 Kafka 主题读取最新消息。但通过这段代码,我可以从定义的分区中读取最新的消息。但是 Kafka 服务器每次都会将我的主题的值写入不同的分区(我的 Kafka 主题配置为分区 0、1、2)。因此分区中的最后一条(最新)消息并不总是从数据源端发送到 Kafka 的最新消息。

如何调整我的代码以适应三个分区? Kafka Confluence 中有一个简单的函数吗?或者我是否必须每次从所有分区读取 Offset.End 消息,检查它们的时间戳,并确定哪一个是最新的?

        CancellationTokenSource source = new CancellationTokenSource();
        CancellationToken cancellationToken = source.Token;
        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            consumer.Subscribe("My_Topic");               

            while (var_true)
            {
                TopicPartitionOffset tps = new TopicPartitionOffset(new TopicPartition("My_Topic", 1),Offset.End);
                consumer.Assign(tps);
                var consumeResult = consumer.Consume(cancellationToken);                      
                
                Kafka_message_total = consumeResult.Message.Value;

                // additional code to send the message value to an application

                System.Threading.Thread.Sleep(2000);

            }

            consumer.Close();
        }
c# apache-kafka kafka-consumer-api confluent-kafka-dotnet
1个回答
0
投票

当您禁用消费者组提交并设置 AutoOffsetReset=latest 时,对于所有分区,它将始终从主题末尾开始读取。同样,您可以随时将消费者查找到主题的末尾,或者计算结束偏移量并减一,然后在那里查找。

看到消息有时到达 P-0,有时到达 P-1,有时到达 P-3。我不知道kafka是如何决定何时将传入消息写入哪个分区

Kafka 文档解释了生产者如何根据哈希算法对记录键进行分区,或对空键进行循环

© www.soinside.com 2019 - 2024. All rights reserved.