从服务总线接收批量消息时消息排序

问题描述 投票:0回答:1

我已从 Azure 服务总线批量接收消息:

IReadOnlyList<ServiceBusReceivedMessage> receivedMessages = await serviceBusReceiver.ReceiveMessagesAsync(maxMessages: maxMessages, maxWaitTime: TimeSpan.FromSeconds(1), cancellationToken: cancellationToken); 

它工作正常,但我不确定它是否也考虑排序(先进先出)。我知道我们可以使用会话来实现这种行为。但我想问一下,接收批量消息时是否需要使用session?

我有一个接收器可以批量接收和处理消息。但我想确保订购,因为我收到设备信息,例如上次看到的信息等,并且我需要确保我已更新正确的信息。

此外,如果会话是唯一的方法,那么我是否应该将 SessionId 设置为 DeviceId,这将确保链接到单个设备的所有消息都按顺序处理?

azure azureservicebus azure-servicebus-queues
1个回答
0
投票

根据 Azure 服务总线消息会话的文档,实现先进先出 (FIFO) 模式涉及使用会话来确保消息按接收顺序进行处理。

  • 将消息提交到队列或主题时,请确保为每条消息分配一个会话 ID。此会话 ID 充当应用程序定义的会话唯一标识符。

  • 在会话感知队列或订阅上,当至少有一条带有会话 ID 的消息时,就会启动会话。具有相同会话 ID 的消息被分组在一起,形成一个会话。

  • 当使用会话 ID 发送多条消息时,服务总线确保每个会话中的消息按照接收顺序进行处理。这保持了 FIFO 保证,因为同一会话中的消息是按顺序处理的。

  • 要接收批量消息,请使用批处理。批处理将一定数量的消息作为单个批次传递给函数,从而允许在一次调用中处理多个消息。

  • 以下示例是使用

    Azure.Messaging.ServiceBus
    库接收和处理来自 Azure 服务总线队列的消息,使用
    GetSessionStateAsync()
    检索会话状态。

使用的参考资料:

  • 服务总线批处理和预取
  • 发送和接收会话消息

string connectionString = "<connection_string>";
        string queueName = "<queue_name>";
        int batchSize = 10; // Number of messages to receive in each batch
        int prefetchCount = 100; // Number of messages to prefetch

        await using var client = new ServiceBusClient(connectionString);
        var receiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions
        {
            PrefetchCount = prefetchCount
        });

        var sessionReceiver = await client.AcceptNextSessionAsync(queueName);

        while (true)
        {
            var messages = await sessionReceiver.ReceiveMessagesAsync(batchSize);

            if (messages == null || messages.Count == 0)
                break;

            foreach (var message in messages)
            {
                // Process the received message
                string messageBody = Encoding.UTF8.GetString(message.Body);
                Console.WriteLine($"Message received: {messageBody} ,SequenceNumber={message.SequenceNumber}, SessionId= {message.SessionId}");

                // Simulate processing of the message
                await Task.Delay(TimeSpan.FromSeconds(10));

                // Complete the message
                await sessionReceiver.CompleteMessageAsync(message);
            }
        }

        // After processing all messages, retrieve session state
        BinaryData state = await sessionReceiver.GetSessionStateAsync();
      

        await sessionReceiver.CloseAsync();

输出: enter image description here

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.