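I created a Kafka producer as a console application using the Confluent library. It runs as a scheduled task and periodically submits some error records to the Kafka broker. That works fine.
I now have to convert it to run as a Windows service, and I am running into problems handling the asynchronous nature of the calls, that is, using "await".
Program.cs contains the basic code: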
    static void Main()
    {
        /* Uncomment for debugging */
        //Debugger.Launch();
        ServiceBase[] ServicesToRun;
        ServicesToRun = new ServiceBase[]
        {
            new Kafka_Producer()
        };
        ServiceBase.Run(ServicesToRun);
    }
The file that contains the service API:
    public Kafka_Producer()
    {
        InitializeComponent();
    }

    protected override void OnStart(string[] args)
    {
        // Some initialization, setting global variables
        ...
        producer = new Producer(sBootstrapServer);
        // This line shows message: "Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the await operator to the result of the call"
        CheckDB();
        // Set up a timer that triggers periodically.
        timer = new System.Timers.Timer();
        timer.Interval = iRunInterval;
        timer.Elapsed += new ElapsedEventHandler(this.OnTimer);
        timer.Start();
    }

    protected async Task CheckDB()
    {
        List<string> lsJsonResult = ApplicationException.GetErrorsJSON();
        await producer.SendErrorData(sTopicName, lsJsonResult);
    }
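To separate the warning from the Kafka code, here is a minimal sketch of the behaviour I am trying to understand. It rests on assumptions: `DoWorkAsync` is a hypothetical stand-in for `CheckDB`, the delay only simulates the send, and the class and handler names are made up for illustration. It shows that a call that is not awaited hands back a `Task` before the work has finished, and one way a timer handler could await the work instead (an `async void` event handler with its own try/catch). I am not sure this is the right design for a service.

```csharp
using System;
using System.Threading.Tasks;
using System.Timers;

public static class AsyncTimerSketch
{
    // Hypothetical stand-in for CheckDB(): the delay simulates the async send to Kafka.
    public static async Task<string> DoWorkAsync()
    {
        await Task.Delay(100);
        return "sent";
    }

    // Event handlers are the one place 'async void' is commonly accepted.
    // Nothing can await this handler, so exceptions have to be caught inside it.
    private static async void OnTimer(object sender, ElapsedEventArgs e)
    {
        try
        {
            string result = await DoWorkAsync();
            Console.WriteLine($"Timer tick finished: {result}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Timer tick failed: {ex.Message}");
        }
    }

    public static void Main()
    {
        // Not awaited: the method returns its Task as soon as it reaches the first
        // incomplete await, which is what the compiler warning is pointing out.
        Task<string> pending = DoWorkAsync();
        Console.WriteLine($"Completed right after the call: {pending.IsCompleted}");

        // Fire the handler once after 200 ms.
        var timer = new Timer(200) { AutoReset = false };
        timer.Elapsed += OnTimer;
        timer.Start();

        // Keep the process alive long enough for both pieces of work to finish.
        pending.Wait();
        Task.Delay(600).Wait();
        timer.Dispose();
    }
}
```

In the real service the body of `DoWorkAsync` would be the `CheckDB` logic, and the timer would stay enabled instead of firing once.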
SendErrorData sends the data to Kafka:
    public async Task SendErrorData(string topicName, List<string> jsonResult)
    {
        string logMessage = string.Empty;
        using (var producer = new ProducerBuilder<Null, string>(_producerConfig)
            .SetValueSerializer(Serializers.Utf8)
            .SetLogHandler((_, message) => LogWriter.LogWrite($"Facility: {message.Facility}-{message.Level} Message: {message.Message}"))
            .SetErrorHandler((_, e) => LogWriter.LogWrite($"Error: {e.Reason}. Is Fatal: {e.IsFatal}"))
            .Build())
            try
            {
                foreach(string sJsonErrData in jsonResult)
                {
                    dynamic results = JsonConvert.DeserializeObject<dynamic>(sJsonErrData);
                    int id = results.uniqueId;
                    var deliveryReport = await producer.ProduceAsync(topicName,
                        new Message<Null, string>
                        {
                            Value = sJsonErrData
                        });
                    ....
                }
            }
            catch()
            {}
    }
What am I doing wrong?