Exception when receiving/completing messages from an Azure Service Bus queue with the Python SDK


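There are about 2M messages in an Azure Service Bus queue. I am trying to consume them in batches of 10k using the code below. After consuming about 8L (8 lakh, i.e. 800,000) messages, I unexpectedly got an exception saying "Unable to retrieve last received message number."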

Here is the sample code:

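import json

from azure.servicebus import ServiceBusClient, TransportType

# ServiceBusQueue is the asker's own type (not shown); it exposes conn_str and queue_name.
def consume_messages_from_queue(service_bus_queue: ServiceBusQueue) -> list[dict]: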
    """
    return at max 10000 consumed msgs from service bus queue with max wait time as 15 seconds
    """
    with ServiceBusClient.from_connection_string(conn_str=service_bus_queue.conn_str, transport_type=TransportType.AmqpOverWebsocket) as servicebus_client:
        with servicebus_client.get_queue_receiver(queue_name=service_bus_queue.queue_name) as receiver:
            received_msgs = receiver.receive_messages(
                max_message_count=10000, max_wait_time=15)
            received_msgs_json = []
            for msg in received_msgs:
                try:
                    received_msgs_json.append(json.loads(str(msg)))
                    receiver.complete_message(msg)
                except Exception as ex:
                    print(
                        f'Exception in converting service bus message to json: {ex}')
                    receiver.dead_letter_message(message=msg, reason=str(ex))
            return received_msgs_json

Here is the exception stack trace:

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/uamqp/receiver.py:229, in MessageReceiver._message_received(self, message)
    221 """Callback run on receipt of every message. If there is
    222 a user-defined callback, this will be called.
    223 Additionally if the client is retrieving messages for a batch
   (...)
    226 :param message: c_uamqp.Message
    227 """
    228 # pylint: disable=protected-access
--> 229 message_number = self._receiver.last_received_message_number()
    230 if self._settle_mode == constants.ReceiverSettleMode.ReceiveAndDelete:
    231     settler = None

File ./src/message_receiver.pyx:67, in uamqp.c_uamqp.cMessageReceiver.last_received_message_number()

File ./src/message_receiver.pyx:70, in uamqp.c_uamqp.cMessageReceiver.last_received_message_number()

File ./src/base.pyx:31, in uamqp.c_uamqp.StructBase._value_error()

ValueError: Operation failed.
Error: Unable to retrieve last received message number.
azureservicebus azure-servicebus-queues azure-python-sdk
1 Answer

I tried similar code to receive messages from an Azure Service Bus queue, and it worked.


from azure.servicebus import ServiceBusClient, TransportType
from retry import retry

service_bus_conn_str = " "  # placeholder: your Service Bus connection string
queue_name = " "  # placeholder: your queue name
def consume_messages_from_queue(service_bus_conn_str, queue_name):
  
    with ServiceBusClient.from_connection_string(conn_str=service_bus_conn_str, transport_type=TransportType.AmqpOverWebsocket) as servicebus_client:
        with servicebus_client.get_queue_receiver(queue_name=queue_name) as receiver:
            # Wrap the receive_messages call with a retry decorator
            @retry(tries=3, delay=5)
            def receive_messages_with_retry(receiver):
                return receiver.receive_messages(max_message_count=10000, max_wait_time=15)

            try:
                # Call the receive_messages_with_retry function
                received_msgs = receive_messages_with_retry(receiver)
                for msg in received_msgs:
                    message_content_type = msg.content_type
                    message_body = str(msg)

                    if not message_content_type or message_content_type == "text/plain":
                        # Messages without a content type are treated as plain text
                        print(f"Received plain text message: {message_body}")
                    elif message_content_type == "application/xml":
                        print(f"Received XML message: {message_body}")
                    elif message_content_type == "application/json":
                        print(f"Received JSON message: {message_body}")
                    else:
                        print(f"Received message with unknown content type: {message_content_type}")

                    receiver.complete_message(msg)

            except Exception as ex:
                print(f'Exception while receiving messages: {ex}')

if __name__ == "__main__":
    consume_messages_from_queue(service_bus_conn_str, queue_name)
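The `@retry(tries=3, delay=5)` decorator in the code above comes from the third-party `retry` package, which has to be installed separately. If you would rather not add a dependency, a rough standard-library equivalent might look like the sketch below. The name `retry_on_error` and its `sleep` parameter are my own for illustration and are not part of any library.

```python
import functools
import time


def retry_on_error(tries=3, delay=5.0, exceptions=(Exception,), sleep=time.sleep):
    """Hypothetical helper: call the wrapped function up to `tries` times,
    pausing `delay` seconds after each failed attempt except the last."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, tries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == tries:
                        raise  # out of attempts: surface the last error
                    sleep(delay)
        return wrapper
    return decorator
```

It can be applied the same way as in the code above, e.g. `@retry_on_error(tries=3, delay=5)` on `receive_messages_with_retry`. Note that retrying only helps with transient failures; it does not address whatever causes the error in the first place.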

Output: [image]
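One variation that may be worth trying, though I have not confirmed that it avoids the error: keep a single receiver open and pull smaller batches in a loop instead of asking for 10,000 messages in one call, and only keep a message in the result once it has been completed. The sketch below assumes a receiver object with the `receive_messages`, `complete_message` and `dead_letter_message` methods used in the question. The function name `drain_in_batches` and the batch size of 500 are arbitrary choices for illustration.

```python
import json


def drain_in_batches(receiver, max_total=10000, batch_size=500, max_wait_time=15):
    """Hypothetical helper: collect up to max_total JSON message bodies,
    settling every message as soon as it has been handled."""
    parsed = []
    handled = 0
    while handled < max_total:
        batch = receiver.receive_messages(
            max_message_count=min(batch_size, max_total - handled),
            max_wait_time=max_wait_time,
        )
        if not batch:
            break  # nothing arrived within max_wait_time
        for msg in batch:
            handled += 1
            try:
                body = json.loads(str(msg))
            except ValueError as ex:
                # Body is not valid JSON: move it to the dead-letter queue
                receiver.dead_letter_message(msg, reason=str(ex))
                continue
            # Complete first, so a failed completion is never reported as consumed
            receiver.complete_message(msg)
            parsed.append(body)
    return parsed
```

With a real queue you would pass in the receiver obtained from `servicebus_client.get_queue_receiver(queue_name=queue_name)` inside the `with` blocks shown above. Unlike the original function, an error raised by `complete_message` is not caught here, so it is not mistaken for a JSON conversion problem.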
