在我的 .net C# 项目(带有 Confluence Kafka 库)中,目前我正在使用以下代码从 Kafka 主题读取最新消息。但通过这段代码,我可以从定义的分区中读取最新的消息。但是 Kafka 服务器每次都会将我的主题的值写入不同的分区(我的 Kafka 主题配置为分区 0、1、2)。因此分区中的最后一条(最新)消息并不总是从数据源端发送到 Kafka 的最新消息。
如何调整我的代码以适应三个分区? Kafka Confluence 中有一个简单的函数吗?或者我是否必须每次从所有分区读取 Offset.End 消息,检查它们的时间戳,并确定哪一个是最新的?
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token;
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe("My_Topic");
while (var_true)
{
TopicPartitionOffset tps = new TopicPartitionOffset(new TopicPartition("My_Topic", 1),Offset.End);
consumer.Assign(tps);
var consumeResult = consumer.Consume(cancellationToken);
Kafka_message_total = consumeResult.Message.Value;
// additional code to send the message value to an application
System.Threading.Thread.Sleep(2000);
}
consumer.Close();
}
当您禁用消费者组提交并设置 AutoOffsetReset=latest 时,对于所有分区,它将始终从主题末尾开始读取。同样,您可以随时将消费者查找到主题的末尾,或者计算结束偏移量并减一,然后在那里查找。
看到消息有时到达 P-0,有时到达 P-1,有时到达 P-3。我不知道kafka是如何决定何时将传入消息写入哪个分区
Kafka 文档解释了生产者如何根据哈希算法对记录键进行分区,或对空键进行循环