如果处理失败,再次使用同一条消息

问题描述 投票:3回答:1

我正在使用Confluent.Kafka .NET客户端版本1.3.0。我正在关注docs

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "server1, server2",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = true,
    EnableAutoOffsetStore = false,
    GroupId = this.groupId,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = this.kafkaUsername,
    SaslPassword = this.kafkaPassword,
};

using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
    var cancellationToken = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cancellationToken.Cancel();
    };

    consumer.Subscribe("my-topic");
    while (true)
    {
        try
        {
            var consumerResult = consumer.Consume();
            // process message
            consumer.StoreOffset(consumerResult);
        }
        catch (ConsumeException e)
        {
            // log
        }
        catch (KafkaException e)
        {
            // log
        }
        catch (OperationCanceledException e)
        {
            // log
        }
    }
}

问题是,即使我注释掉consumer.StoreOffset(consumerResult);行,下次我Consume时,我仍会收到下一条未使用的消息,即偏移量一直在增加,这似乎不是文档所声称的就是这样,即至少一次交付

即使设置EnableAutoCommit = false并从配置中删除'EnableAutoOffsetStore = false',然后将consumer.StoreOffset(consumerResult)替换为consumer.Commit(),我仍然会看到相同的行为,即即使我注释掉Commit,我仍然继续获取下一条未使用的消息。

我觉得我在这里缺少基本的东西,但无法弄清楚。任何帮助表示赞赏!

c# apache-kafka kafka-consumer-api
1个回答
0
投票

抱歉,我还不能添加评论。Kafka使用者分批使用消息,so maybe you still iterate through the batch pre-fetched by background thread

您可以使用kafka util kafka-consumer-groups.sh检查您的使用者是否确实提交了偏移量>

kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group consumer_group  --describe
© www.soinside.com 2019 - 2024. All rights reserved.