在 Azure Function 中创建临时队列并使用队列触发器进行订阅

问题描述 投票:0回答:1

我遇到了一个棘手的情况,我需要在消息队列上运行一个进程,以便它们到达队列,但该队列将包含数百个需要动态创建的队列

每条消息都与一位客户相关,因此每个客户有 2 个队列

我需要一种为每个队列都有一个队列触发器的方法,但可能会有数千个队列

队列触发器似乎需要对队列的名称进行硬编码

显然我不能在代码中为每个队列名称都有一个函数

有人知道我该怎么做吗?

假设我们有 3 位客户,每人有 5 条消息

这将是 3 个队列,包含 5 条消息

我希望能够同时处理所有 3 个客户,而不是将所有内容都放在一个队列中,因为客户不相关,因此正在排队的消息在客户级别上彼此无关,所以我不希望客户 3客户 1 时必须等待消息 还有2个正在处理中

有人知道该怎么做吗?

保罗

azure-functions azureservicebus
1个回答
0
投票

我同意@Sean@Jesse Squire当创建触发器时,它被固定到单个队列并且不能更改。制作直接利用

Service Bus client
库的托管服务将是您的最佳选择。如果您想使用多个队列,可以使用
ServiceBusClient
生成
ServiceBusReceiver
。发送一定数量的消息,然后关闭接收器并继续下一条。您可以选择同时完成该任务,也可以为其不同部分分配不同的服务。

下面是 C# 代码,说明如何使用

ServiceBusClient
生成
ServiceBusReceiver
来同时处理来自多个队列的消息:-

using Azure.Messaging.ServiceBus;
using System;
using System.Threading.Tasks;
using System.Threading;

class Program
{
    const string ServiceBusConnectionString = "Endpoint=sb://siliconsb45.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxMubQxxxx";
    const string QueueName1 = "customer";
    const string QueueName2 = "customer1";

    static async Task Main(string[] args)
    {
        await ProcessQueuesConcurrently();
    }

    static async Task ProcessQueuesConcurrently()
    {
        ServiceBusClient client = new ServiceBusClient(ServiceBusConnectionString);

        var receiver1 = client.CreateReceiver(QueueName1);
        var receiver2 = client.CreateReceiver(QueueName2);

        // Start receiving messages from Queue 1
        var processingTask1 = ProcessMessagesAsync(receiver1);

        // Start receiving messages from Queue 2
        var processingTask2 = ProcessMessagesAsync(receiver2);

        // You can add more queues and tasks as needed...

        // Simulate processing for a certain duration (e.g., 30 seconds)
        await Task.Delay(TimeSpan.FromSeconds(30));

        // Set a flag to stop processing
        bool stopProcessing = true;

        // Wait for the processing tasks to complete
        await Task.WhenAll(processingTask1, processingTask2);

        // Close the receivers and client
        await receiver1.CloseAsync();
        await receiver2.CloseAsync();
        await client.DisposeAsync();
    }

    static async Task ProcessMessagesAsync(ServiceBusReceiver receiver)
    {
        try
        {
            while (true)
            {
                ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();

                if (message != null)
                {
                    Console.WriteLine($"Received message: {message.Body}");
                    // Process the message here

                    // Complete the message to remove it from the queue
                    await receiver.CompleteMessageAsync(message);
                }
                else
                {
                    // No more messages, exit the loop
                    break;
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error processing messages: {ex.Message}");
        }
    }
}

输出:-

从两个队列收到消息:-

enter image description here

  • 您还可以创建多个 Function 实例。要同时处理来自各个队列的消息,每个实例可能有不同的 queueName 配置参数。

  • 每当创建新客户时,使用 Azure 服务总线 SDK 以编程方式创建新队列。 参考 - 参考左侧其他语言的代码。

© www.soinside.com 2019 - 2024. All rights reserved.