我有一个 Azure 函数,我想从主题上的服务总线订阅获取消息,然后将所有消息重新发送到另一个服务总线
但是当我尝试这样做时,我遇到了各种异常,并且它永远不会收到任何发送到其他服务总线的消息。
这是我在我的天蓝色函数中执行此操作的方法
public PartStatusFunction(ILogger<PartStatusFunction> log, IEnvironmentVariableLoader environmentVariableLoader)
{
_logger = log;
_environmentVariableLoader = environmentVariableLoader;
}
[FunctionName("PartStatusFunction")]
public async Task Run([ServiceBusTrigger("partstatus", "linktestest", Connection = "smartadmin.messagebus.subscription.name")] ServiceBusReceivedMessage message)
{
_logger.LogInformation($"C# ServiceBus topic trigger function processed message: {message}");
try
{
ServiceBusSender serviceBusSender = _environmentVariableLoader.GetServiceBusClient().CreateSender("partstatus");
ServiceBusMessage serviceMessage = new ServiceBusMessage(message);
await serviceBusSender.SendMessageAsync(serviceMessage);
}
catch (Exception e)
{
_logger.LogInformation($"Failed to send message to linktest azure bus: " + message.ToString() + " Exception " + e.ToString());
throw;
}
}
我创建了一个 EnvironmentVariableLoader 类,它是在我的启动类上的单例实例中创建的
public class Startup : FunctionsStartup
{
public override void Configure(IFunctionsHostBuilder builder)
{
builder.Services.AddSingleton<IEnvironmentVariableLoader, EnvironmentVariableLoader>();
}
}
EnvironmentVariableLoader 然后像这样创建一个 ServiceBusClient
private ServiceBusClient _serviceBusClient;
public EnvironmentVariableLoader()
{
_serviceBusClient = new ServiceBusClient("Endpoint=xxxxxxxxxxxx",
new ServiceBusClientOptions()
{
TransportType = ServiceBusTransportType.AmqpTcp,
});
}
public ServiceBusClient GetServiceBusClient()
{
return _serviceBusClient;
}
当我尝试在控制台应用程序中执行此操作时,它有效。
_serviceBusClientReceive = new ServiceBusClient("Endpoint=yyyyyyyyy",
new ServiceBusClientOptions()
{
TransportType = ServiceBusTransportType.AmqpTcp
});
_serviceBusClientSend = new ServiceBusClient("Endpoint=xxxxxxxx",
new ServiceBusClientOptions()
{
TransportType = ServiceBusTransportType.AmqpTcp
});
while (true)
{
try
{
var serviceBusReceiver = _serviceBusClientReceive.CreateReceiver("partstatus", "linktestest");
var receivedMessage= await serviceBusReceiver.ReceiveMessageAsync();
ServiceBusSender serviceBusSender = _serviceBusClientSend.CreateSender("partstatus");
ServiceBusMessage serviceMessage = new ServiceBusMessage(receivedMessage);
await serviceBusSender.SendMessageAsync(serviceMessage);
}
catch (Exception e)
{
}
}
是否无法在 Azure Function 中创建与另一个服务总线的连接?
如有任何帮助,我们将不胜感激。
如果您使用 Azure Functions,并且使用 Azure 服务总线输出绑定来补充触发器,则该解决方案可能比最初的建议简单得多。本质上,您的函数代码成为传入队列和传出队列之间的桥梁。
public class Functions
{
private readonly ILogger<Functions> logger;
public Functions(ILogger<Functions> logger)
{
this.logger = logger;
}
[ServiceBusAccount("ServiceBusConnection")]
[FunctionName(nameof(PartStatusFunction))]
[return: ServiceBus("%OutputQueue%")]
public Task<ServiceBusMessage> PartStatusFunction([ServiceBusTrigger("%InputQueue%")] ServiceBusReceivedMessage message)
{
logger.LogInformation("Sending message with ID {MessageId} to the destination queue", message.MessageId);
return Task.FromResult(new ServiceBusMessage(message));
}
}
配置需要以下值。
InputQueue
OutputQueue
ServiceBusConnection