如何使用 C# 中的 Kafka Confluent Consumer 使用来自 Kafka 主题的最后一条(最新)消息?

问题描述 投票:0回答:2

我有以下代码用于从 Kafka 主题读取数据。我的目标是定期阅读主题中的最新消息,因为我想在实时图表中使用数据。我写了下面的代码。但是,如果我运行代码,我就会开始阅读过去某处(24 小时前)的内容。我想我必须在我的代码中定义类似偏移量的东西?我怎样才能在 Kafka Confluent 消费者中做到这一点?

public void Read_from_Kafka()
{
try
{
    var  config = new ConsumerConfig
    {
        BootstrapServers = kafka_URI,
        GroupId = "group",
        AutoOffsetReset = AutoOffsetReset.Earliest,
        SecurityProtocol = SecurityProtocol.Ssl,
        SslCaLocation = "path1",
        SslCertificateLocation = "path2",
        SslKeyLocation = "path3",
        SslKeyPassword = "password",                  

    };

    CancellationTokenSource source = new CancellationTokenSource();
    CancellationToken cancellationToken = source.Token;

    using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
    {
        consumer.Subscribe(topic_name);
        while (!cancellationToken.IsCancellationRequested)                    
        {
            var consumeResult = consumer.Consume(cancellationToken);
            Kafka_message_total = consumeResult.Message.Value;

            using (StreamWriter sw = File.AppendText(json_log_file))
            {
                sw.WriteLine("JSON: " + Kafka_message_total + " " + Convert.ToString(DateTime.Now));
            }                        
            System.Threading.Thread.Sleep(2000);
        }
        consumer.Close();
    }
    using (StreamWriter sw = File.AppendText(error_log))
    {
        sw.WriteLine("Stop Kafka " + " " + Convert.ToString(DateTime.Now));
    }
}

catch(Exception ex)
{

    using (StreamWriter sw = File.AppendText(error_log))
    {
        sw.WriteLine("Kafka Read Error: " + ex + " " + Convert.ToString(DateTime.Now));
    }
}

}

更新-1

我尝试设置 AutoOffsetReset = AutoOffsetReset.Latest 但我仍在读取过去的数据。我认为此设置不足以满足我的目的。

c# apache-kafka kafka-consumer-api confluent-kafka-dotnet
2个回答
0
投票

您需要在消费者上调用

Seek
函数以转到最高水位线,负 1。否则,从 Latest offset 开始的消费者将只会等待下一个产生的事件(可能永远不会发生)。

或者,使用不同的工具(例如 Kafka Connect)将数据写入 Elasticsearch、InfluxDB 等,您可以在其中配置图形工具(例如 Grafana)以显示来自这些系统的数据。


0
投票

我认为 Seek 方法在 .NET confluent-kafka-dotnet 包中效果不佳,建议使用 Assign 和 TopicPartitionOffset 代替。

如果您只有一个分区,分区 0,下面的代码会找到关于主题“购买”的最后一条消息:

using Confluent.Kafka;
using System;

class Consumer
{
    static void Main(string[] args)
    {
        ConsumerConfig config = new()
        {
            BootstrapServers = "localhost:9092",
            GroupId = Guid.NewGuid().ToString(),
        };
        using IConsumer<string, string> consumer 
            = new ConsumerBuilder<string, string>(config).Build();
        try
        {
            TopicPartition topicPartition = new("purchases", new Partition(0));
            WatermarkOffsets watermarkOffsets 
                = consumer.QueryWatermarkOffsets(topicPartition, TimeSpan.FromSeconds(3));
            TopicPartitionOffset topicPartitionOffset 
                = new(topicPartition, new Offset(watermarkOffsets.High.Value - 1));
            consumer.Assign(topicPartitionOffset);
            ConsumeResult<string, string> consumeResult 
                = consumer.Consume(TimeSpan.FromSeconds(3));
            Console.Write($"Last message value = {consumeResult.Message.Value}, " +
                $"position = {consumer.Position(topicPartition)}");
        }
        finally { consumer.Close(); }
    }
}

我在这里显然使用了超时而不是取消标记,但是使用取消标记不应该影响逻辑。

有一些警告说,在某些情况下,Kafka 不会以这种简单的方式使用偏移量,并且简单地从分区的高水位线偏移量中取出一个可能并不总是有效。在我测试过的示例中,它运行良好。

如果您有多个分区,则可以通过迭代和更改

new Partition(0)
到适当的索引来扩展代码以查找每个分区中的最后一条消息,例如分区 1 的
new Partition(1)
。对于多个分区,我不认为 Kafka 本身知道哪条是最后一条书面信息。除非在分区内,否则不能保证顺序。但是,一旦每个分区中都有最后一条消息,您可能会有一些消息属性可以用来解决这个问题(增加 ID、时间戳)。

© www.soinside.com 2019 - 2024. All rights reserved.