Azure 服务总线队列中大约有 200 万条消息。我正在尝试使用下面的代码分批消费这些消息,每批最多 1 万条。在消费了大约 80 万条(8 lakh)消息之后,意外地出现了异常,提示无法检索最后接收到的消息编号(Unable to retrieve last received message number)。
以下是示例代码
def consume_messages_from_queue(service_bus_queue: ServiceBusQueue) -> list[dict]:
    """Receive one batch from the queue and return the decoded JSON bodies.

    A single ``receive_messages`` call is made with
    ``max_message_count=10000`` and ``max_wait_time=15``. Each received
    message is then handled independently:

    * If its body cannot be converted to JSON, the message is dead-lettered
      with the conversion error as the reason.
    * Otherwise the message is completed, and only after that succeeds is the
      decoded payload added to the result.

    Settlement failures (``complete_message`` / ``dead_letter_message``) are
    printed and the message is skipped, so one failing message neither aborts
    the batch nor discards payloads that were already completed.

    Args:
        service_bus_queue: Object providing ``conn_str`` and ``queue_name``.

    Returns:
        The decoded payloads of the messages that were parsed and completed
        successfully, in the order they were received.
    """
    with ServiceBusClient.from_connection_string(
        conn_str=service_bus_queue.conn_str,
        transport_type=TransportType.AmqpOverWebsocket,
    ) as servicebus_client:
        with servicebus_client.get_queue_receiver(
            queue_name=service_bus_queue.queue_name
        ) as receiver:
            received_msgs = receiver.receive_messages(
                max_message_count=10000, max_wait_time=15
            )
            received_msgs_json = []
            for msg in received_msgs:
                # Keep parsing separate from settlement so a settlement error
                # is never misreported as a JSON conversion error.
                try:
                    payload = json.loads(str(msg))
                except Exception as ex:
                    # Deliberately broad: any failure to decode the body sends
                    # the message to the dead-letter queue, as before.
                    print(
                        f'Exception in converting service bus message to json: {ex}')
                    try:
                        receiver.dead_letter_message(message=msg, reason=str(ex))
                    except Exception as settle_ex:
                        print(
                            f'Exception in dead-lettering service bus message: {settle_ex}')
                    continue

                try:
                    receiver.complete_message(msg)
                except Exception as settle_ex:
                    # NOTE(review): presumably the message stays on the queue
                    # and is redelivered when completion fails (e.g. expired
                    # lock) - confirm. It is not returned here to avoid
                    # handing the same payload out twice.
                    print(
                        f'Exception in completing service bus message: {settle_ex}')
                    continue

                # Only payloads whose message was actually completed are returned.
                received_msgs_json.append(payload)
    return received_msgs_json
下面是异常堆栈跟踪
File ~/cluster-env/clonedenv/lib/python3.10/site-packages/uamqp/receiver.py:229, in MessageReceiver._message_received(self, message)
221 """Callback run on receipt of every message. If there is
222 a user-defined callback, this will be called.
223 Additionally if the client is retrieving messages for a batch
(...)
226 :param message: c_uamqp.Message
227 """
228 # pylint: disable=protected-access
--> 229 message_number = self._receiver.last_received_message_number()
230 if self._settle_mode == constants.ReceiverSettleMode.ReceiveAndDelete:
231 settler = None
File ./src/message_receiver.pyx:67, in uamqp.c_uamqp.cMessageReceiver.last_received_message_number()
File ./src/message_receiver.pyx:70, in uamqp.c_uamqp.cMessageReceiver.last_received_message_number()
File ./src/base.pyx:31, in uamqp.c_uamqp.StructBase._value_error()
ValueError: Operation failed.
Error: Unable to retrieve last received message number.
我尝试用类似的代码从 Azure 服务总线队列接收消息,可以正常工作。
from azure.servicebus import ServiceBusClient, TransportType
from retry import retry
# Connection string passed to ServiceBusClient.from_connection_string.
# NOTE(review): the single-space value looks like a placeholder - replace it
# with a real connection string before running.
service_bus_conn_str = " "
# Name of the queue to receive from.
# NOTE(review): also appears to be a placeholder value.
queue_name = " "
def consume_messages_from_queue(service_bus_conn_str, queue_name):
    """Receive one batch of messages from a queue, print and complete each.

    The batch is fetched through a helper decorated with
    ``@retry(tries=3, delay=5)`` that calls ``receive_messages`` with
    ``max_message_count=10000`` and ``max_wait_time=15``. For every message a
    line is printed whose wording depends on ``content_type``; the message is
    then completed. Exceptions raised while fetching or handling messages are
    caught and printed instead of being propagated.
    """
    connection = ServiceBusClient.from_connection_string(
        conn_str=service_bus_conn_str,
        transport_type=TransportType.AmqpOverWebsocket,
    )
    with connection as servicebus_client:
        with servicebus_client.get_queue_receiver(queue_name=queue_name) as receiver:

            # Only the fetch is wrapped by the retry decorator; the per-message
            # handling below is not retried.
            @retry(tries=3, delay=5)
            def fetch_batch(source):
                return source.receive_messages(max_message_count=10000, max_wait_time=15)

            try:
                for msg in fetch_batch(receiver):
                    kind = msg.content_type
                    text = str(msg)
                    if kind == "application/xml":
                        line = f"Received XML message: {text}"
                    elif kind == "application/json":
                        line = f"Received JSON message: {text}"
                    elif not kind or kind == "text/plain":
                        # A missing content type is treated as plain text.
                        line = f"Received plain text message: {text}"
                    else:
                        # Unknown types report the type itself, not the body.
                        line = f"Received message with unknown content type: {kind}"
                    print(line)
                    receiver.complete_message(msg)
            except Exception as ex:
                print(f'Exception while receiving messages: {ex}')
if __name__ == "__main__":
    # Script entry point: run a single receive pass using the module-level
    # connection string and queue name.
    consume_messages_from_queue(
        service_bus_conn_str=service_bus_conn_str,
        queue_name=queue_name,
    )
输出: