从 Azure 功能重新发送消息到其他服务总线

问题描述 投票:0回答:1

我有一个 Azure 函数,我想从主题上的服务总线订阅获取消息,然后将所有消息重新发送到另一个服务总线

但是当我尝试这样做时,我遇到了各种异常,并且它永远不会收到任何发送到其他服务总线的消息。

这是我在我的天蓝色函数中执行此操作的方法

public PartStatusFunction(ILogger<PartStatusFunction> log, IEnvironmentVariableLoader environmentVariableLoader)
    {
        _logger = log;
        _environmentVariableLoader = environmentVariableLoader;
    }

    [FunctionName("PartStatusFunction")]
    public async Task Run([ServiceBusTrigger("partstatus", "linktestest", Connection = "smartadmin.messagebus.subscription.name")] ServiceBusReceivedMessage message)
    {
        _logger.LogInformation($"C# ServiceBus topic trigger function processed message: {message}");

        try
        {
            ServiceBusSender serviceBusSender = _environmentVariableLoader.GetServiceBusClient().CreateSender("partstatus");
            ServiceBusMessage serviceMessage = new ServiceBusMessage(message);
            await serviceBusSender.SendMessageAsync(serviceMessage);
        }
        catch (Exception e)
        {
            _logger.LogInformation($"Failed to send message to linktest azure bus: " + message.ToString() + " Exception " + e.ToString());

            throw;
        }
    }

我创建了一个 EnvironmentVariableLoader 类,它是在我的启动类上的单例实例中创建的

 public class Startup : FunctionsStartup
{
    public override void Configure(IFunctionsHostBuilder builder)
    {
        builder.Services.AddSingleton<IEnvironmentVariableLoader, EnvironmentVariableLoader>();   
    }
}

EnvironmentVariableLoader 然后像这样创建一个 ServiceBusClient

private ServiceBusClient _serviceBusClient;

    public EnvironmentVariableLoader()
    {
        _serviceBusClient = new ServiceBusClient("Endpoint=xxxxxxxxxxxx",
            new ServiceBusClientOptions()
            {
                TransportType = ServiceBusTransportType.AmqpTcp, 
            });
    }

    public ServiceBusClient GetServiceBusClient()
    {
        return _serviceBusClient;
    }

当我尝试在控制台应用程序中执行此操作时,它有效。

 _serviceBusClientReceive = new ServiceBusClient("Endpoint=yyyyyyyyy",
            new ServiceBusClientOptions()
            {
                TransportType = ServiceBusTransportType.AmqpTcp
            });

        _serviceBusClientSend = new ServiceBusClient("Endpoint=xxxxxxxx",
            new ServiceBusClientOptions()
            {
                TransportType = ServiceBusTransportType.AmqpTcp
            });

        while (true)
        {
            try
            {
                var serviceBusReceiver = _serviceBusClientReceive.CreateReceiver("partstatus", "linktestest");
                var receivedMessage= await serviceBusReceiver.ReceiveMessageAsync();
                
                ServiceBusSender serviceBusSender = _serviceBusClientSend.CreateSender("partstatus");
                ServiceBusMessage serviceMessage = new ServiceBusMessage(receivedMessage);
                await serviceBusSender.SendMessageAsync(serviceMessage);
            }
            catch (Exception e)
            {

                
            }
        }

是否无法在 Azure Function 中创建与另一个服务总线的连接?

如有任何帮助,我们将不胜感激。

azure-functions azureservicebus
1个回答
0
投票

如果您使用 Azure Functions,并且使用 Azure 服务总线输出绑定来补充触发器,则该解决方案可能比最初的建议简单得多。本质上,您的函数代码成为传入队列和传出队列之间的桥梁。

public class Functions
{
    private readonly ILogger<Functions> logger;

    public Functions(ILogger<Functions> logger)
    {
        this.logger = logger;
    }

    [ServiceBusAccount("ServiceBusConnection")]
    [FunctionName(nameof(PartStatusFunction))]
    [return: ServiceBus("%OutputQueue%")]
    public Task<ServiceBusMessage> PartStatusFunction([ServiceBusTrigger("%InputQueue%")] ServiceBusReceivedMessage message)
    {
        logger.LogInformation("Sending message with ID {MessageId} to the destination queue", message.MessageId);

        return Task.FromResult(new ServiceBusMessage(message));
    }
}

配置需要以下值。

  1. InputQueue
  2. OutputQueue
  3. ServiceBusConnection
  4. 表示的服务总线命名空间连接字符串
© www.soinside.com 2019 - 2024. All rights reserved.