Confluence Kafka Consumer.Consume() 无法捕获任何异常(无效凭据)

问题描述 投票:0回答:1
using System;
using System.Collections.Generic;
using System.Net;
using System.Security.Authentication;
using System.Threading.Tasks;
using Confluent.Kafka;

class program
{
    static async Task Main(string[] args)
    {
        try
        {
            bool flag = false;
            var config = new ConsumerConfig
            {
                BootstrapServers = "pkc-6ojv2.us-west4.gcp.confluent.cloud:9092",
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SaslMechanism = SaslMechanism.Plain,
                SaslUsername = "NUSJ4dsfdfsdKO6A6JA6",
                SaslPassword = "7gSgj1AXyIj/TYuL5v6WWdr/MfpG2Mhxrnzy9XRN8+jvk1/8LpB/A82CHUOW6L1V",
                GroupId = "test",
                AutoOffsetReset = AutoOffsetReset.Latest,
                EnableAutoCommit = false
            };
            while (!flag)
            {
                try
                {
            
                    using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
                    {
                        var message = consumer.Consume();
                        Console.WriteLine($"Thread {Task.CurrentId} received message: {message.Value}");
                        consumer.Commit(message);
                    }
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error occurred: {e.Error.Reason}");
                    flag = true;
                }
                catch (WebException ex)
                {
                    Console.WriteLine($"Error occurred: {ex.Message}");
                    flag = true;
                }
                catch (AuthenticationException ex)
                {
                    Console.WriteLine($"Error occurred: {ex.Message}");
                    flag = true;
                }
                catch (OperationCanceledException ex)
                {
                    Console.WriteLine($"Error occurred: {ex.Message}");
                    flag = true;
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Error occurred: {e.Message}");
                    flag = true;
                }
            }
        }
        catch (Exception ex)
        {
            throw new Exception(ex.Message);
        }
        Console.ReadLine();
    }
}

当凭据无效时,此代码不会进入任何 catch 块。

Consumer.Consume() 不会抛出异常并被任何 catch 块捕获。如果有的话,请帮助我提供更好的异常处理方法。 我在 .Net Framework 4.8 上使用 Confluence.Kafka 库。 谢谢

c# apache-kafka kafka-consumer-api confluent-kafka-dotnet
1个回答
0
投票

我相信正在发生的是 Consumer.Consume 将尝试从所有非致命错误中恢复。要收到这些错误的通知,您必须在 ConsumerBuilder 上调用 SetErrorHandler 因此在这种情况下

using (var consumer = new ConsumerBuilder<Ignore, string>(_config).SetErrorHandler(ErrorHandler).Build())

ErrorHandler 在哪里

private void ErrorHandler(IConsumer<Ignore, string> consumer, Error error)

参见 SetErrorHandler 的 Confluence 文档

© www.soinside.com 2019 - 2024. All rights reserved.