我有以下代码用于从 Kafka 主题读取数据。我的目标是定期阅读主题中的最新消息,因为我想在实时图表中使用数据。我写了下面的代码。但是,如果我运行代码,我就会开始阅读过去某处(24 小时前)的内容。我想我必须在我的代码中定义类似偏移量的东西?我怎样才能在 Kafka Confluent 消费者中做到这一点?
public void Read_from_Kafka()
{
try
{
var config = new ConsumerConfig
{
BootstrapServers = kafka_URI,
GroupId = "group",
AutoOffsetReset = AutoOffsetReset.Earliest,
SecurityProtocol = SecurityProtocol.Ssl,
SslCaLocation = "path1",
SslCertificateLocation = "path2",
SslKeyLocation = "path3",
SslKeyPassword = "password",
};
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token;
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe(topic_name);
while (!cancellationToken.IsCancellationRequested)
{
var consumeResult = consumer.Consume(cancellationToken);
Kafka_message_total = consumeResult.Message.Value;
using (StreamWriter sw = File.AppendText(json_log_file))
{
sw.WriteLine("JSON: " + Kafka_message_total + " " + Convert.ToString(DateTime.Now));
}
System.Threading.Thread.Sleep(2000);
}
consumer.Close();
}
using (StreamWriter sw = File.AppendText(error_log))
{
sw.WriteLine("Stop Kafka " + " " + Convert.ToString(DateTime.Now));
}
}
catch(Exception ex)
{
using (StreamWriter sw = File.AppendText(error_log))
{
sw.WriteLine("Kafka Read Error: " + ex + " " + Convert.ToString(DateTime.Now));
}
}
}
更新-1
我尝试设置 AutoOffsetReset = AutoOffsetReset.Latest 但我仍在读取过去的数据。我认为此设置不足以满足我的目的。
您需要在消费者上调用
Seek
函数以转到最高水位线,负 1。否则,从 Latest offset 开始的消费者将只会等待下一个产生的事件(可能永远不会发生)。
或者,使用不同的工具(例如 Kafka Connect)将数据写入 Elasticsearch、InfluxDB 等,您可以在其中配置图形工具(例如 Grafana)以显示来自这些系统的数据。
我认为 Seek 方法在 .NET confluent-kafka-dotnet 包中效果不佳,建议使用 Assign 和 TopicPartitionOffset 代替。
如果您只有一个分区,分区 0,下面的代码会找到关于主题“购买”的最后一条消息:
using Confluent.Kafka;
using System;
class Consumer
{
static void Main(string[] args)
{
ConsumerConfig config = new()
{
BootstrapServers = "localhost:9092",
GroupId = Guid.NewGuid().ToString(),
};
using IConsumer<string, string> consumer
= new ConsumerBuilder<string, string>(config).Build();
try
{
TopicPartition topicPartition = new("purchases", new Partition(0));
WatermarkOffsets watermarkOffsets
= consumer.QueryWatermarkOffsets(topicPartition, TimeSpan.FromSeconds(3));
TopicPartitionOffset topicPartitionOffset
= new(topicPartition, new Offset(watermarkOffsets.High.Value - 1));
consumer.Assign(topicPartitionOffset);
ConsumeResult<string, string> consumeResult
= consumer.Consume(TimeSpan.FromSeconds(3));
Console.Write($"Last message value = {consumeResult.Message.Value}, " +
$"position = {consumer.Position(topicPartition)}");
}
finally { consumer.Close(); }
}
}
我在这里显然使用了超时而不是取消标记,但是使用取消标记不应该影响逻辑。
有一些警告说,在某些情况下,Kafka 不会以这种简单的方式使用偏移量,并且简单地从分区的高水位线偏移量中取出一个可能并不总是有效。在我测试过的示例中,它运行良好。
如果您有多个分区,则可以通过迭代和更改
new Partition(0)
到适当的索引来扩展代码以查找每个分区中的最后一条消息,例如分区 1 的new Partition(1)
。对于多个分区,我不认为 Kafka 本身知道哪条是最后一条书面信息。除非在分区内,否则不能保证顺序。但是,一旦每个分区中都有最后一条消息,您可能会有一些消息属性可以用来解决这个问题(增加 ID、时间戳)。