我正在听活动中心的各种活动。
对于每个事件,我需要对不是非常可靠的系统进行3-4个API调用。而且由于其中一些是跨地理电话,因此可能需要一些时间。
我计划从事件中心接收事件,并将其放入Service Bus。我的原因如下。
我一直在寻找这样的模式,但是我没有看到过这样的模式:我们从基于日志的消息传递系统中获取数据并将其推送到基于队列的系统。
是否有更好的方法来处理这种情况?
我认为您可以使用事件中心触发器和服务总线输出绑定来实现您想要的。
例如,我想监视事件中心“测试”,而我正在使用C#库:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
namespace FunctionApp68
{
public static class Function1
{
[FunctionName("Function1")]
[return: ServiceBus("test1", Connection = "ServiceBusConnection")]
public static string Run([EventHubTrigger("test", Connection = "str")] EventData[] events, ILogger log)
{
var exceptions = new List<Exception>();
string messageBodyt = "";
foreach (EventData eventData in events)
{
try
{
string messageBody = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
messageBodyt = messageBodyt + messageBody;
// Replace these two lines with your processing logic.
log.LogInformation($"C# Event Hub trigger function processed a message: {messageBody}");
//await Task.Yield();
}
catch (Exception e)
{
// We need to keep processing the rest of the batch - capture this exception and continue.
// Also, consider capturing details of the message that failed processing so it can be processed again later.
exceptions.Add(e);
}
}
// Once processing of the batch is complete, if any messages in the batch failed processing throw an exception so that there is a record of the failure.
if (exceptions.Count > 1)
throw new AggregateException(exceptions);
if (exceptions.Count == 1)
throw exceptions.Single();
return messageBodyt;
}
}
}
上面的代码将从事件中心'test'收集并保存到服务总线队列'test1'。
看看这些文档: