Kafka producer in C# as a Windows service

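I used the Confluent library to create a Kafka producer as a console application. It runs as a scheduled task and periodically submits some error records to the Kafka broker. It works fine.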

I now have to convert it to run as a Windows service, and I am running into problems handling the asynchronous nature of the calls that use "await".

Program.cs contains the basic code:

static void Main()
{
    /* Uncomment for debugging */
    //Debugger.Launch();

    ServiceBase[] ServicesToRun;
    ServicesToRun = new ServiceBase[]
    {
        new Kafka_Producer()
    };
    ServiceBase.Run(ServicesToRun);
}

The file containing the service implementation:

public Kafka_Producer()
{
    InitializeComponent();
}

protected override void OnStart(string[] args)
{
    // Some initialization, setting global variables
    // ...
    producer = new Producer(sBootstrapServer);

    // This line shows message: "Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the await operator to the result of the call"
    CheckDB();   

    // Set up a timer that triggers periodically.
    timer = new System.Timers.Timer();
    timer.Interval = iRunInterval;
    timer.Elapsed += new ElapsedEventHandler(this.OnTimer);
    timer.Start();
}

protected async Task CheckDB()
{
    List<string> lsJsonResult = ApplicationException.GetErrorsJSON();
    await producer.SendErrorData(sTopicName, lsJsonResult);
}

SendErrorData sends the data to Kafka:

public async Task SendErrorData(string topicName, List<string> jsonResult)
{
    string logMessage = string.Empty;

    using (var producer = new ProducerBuilder<Null, string>(_producerConfig)
        .SetValueSerializer(Serializers.Utf8)
        .SetLogHandler((_, message) => LogWriter.LogWrite($"Facility: {message.Facility}-{message.Level} Message: {message.Message}"))
        .SetErrorHandler((_, e) => LogWriter.LogWrite($"Error: {e.Reason}. Is Fatal: {e.IsFatal}"))
        .Build())
    {
        try
        {
            foreach (string sJsonErrData in jsonResult)
            {
                dynamic results = JsonConvert.DeserializeObject<dynamic>(sJsonErrData);
                int id = results.uniqueId;

                var deliveryReport = await producer.ProduceAsync(topicName,
                    new Message<Null, string>
                    {
                        Value = sJsonErrData
                    });
                // ...
            }
        }
        catch
        {
        }
    }
}

What am I doing wrong?
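
For reference, the warning on the CheckDB() call in OnStart is compiler warning CS4014: the returned Task is dropped, so any exception it throws goes unobserved. OnStart itself cannot be awaited by the service host, so one possible approach is to start the work in the background and handle failures inside the async method. The following is only a minimal sketch under stated assumptions, not a definitive implementation: PollingWorker, RunOnceAsync and the log callback are hypothetical names that do not appear in the original code, and the discard syntax (_ = ...) assumes C# 7.0 or later. It also skips a timer tick if the previous run is still in progress.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original Kafka_Producer service.
public class PollingWorker
{
    private readonly Func<Task> _work;      // assumed usage: () => CheckDB()
    private readonly Action<string> _log;   // assumed usage: msg => LogWriter.LogWrite(msg)
    private readonly System.Timers.Timer _timer;
    private int _running;                   // 0 = idle, 1 = a run is in progress

    public PollingWorker(Func<Task> work, double intervalMs, Action<string> log)
    {
        _work = work;
        _log = log;
        _timer = new System.Timers.Timer(intervalMs);
        // The async lambda becomes an async void event handler. That is only
        // safe here because RunOnceAsync catches every exception itself.
        _timer.Elapsed += async (sender, e) => await RunOnceAsync();
    }

    // Intended to be called from OnStart. It returns immediately, so the
    // service start is not delayed by the first database/Kafka round trip.
    public void Start()
    {
        // Explicit discard silences CS4014; errors are logged in RunOnceAsync.
        _ = Task.Run(() => RunOnceAsync());
        _timer.Start();
    }

    // Intended to be called from OnStop.
    public void Stop()
    {
        _timer.Stop();
    }

    public async Task RunOnceAsync()
    {
        // Skip this run if the previous one has not finished yet.
        if (Interlocked.CompareExchange(ref _running, 1, 0) != 0)
        {
            return;
        }

        try
        {
            await _work().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _log("Run failed: " + ex.Message);
        }
        finally
        {
            Interlocked.Exchange(ref _running, 0);
        }
    }
}
```

Under these assumptions, OnStart would create the worker with something like new PollingWorker(() => CheckDB(), iRunInterval, msg => LogWriter.LogWrite(msg)) and call Start(), and OnStop would call Stop(). Whether skipping overlapping runs is the right behavior depends on how long a CheckDB pass can take relative to iRunInterval.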

c# windows-services confluent-kafka-dotnet