我正在使用 aio_pika,并且在拒绝来自队列的消息时,rabbitMQ 订阅工作程序遇到问题。我拒绝了队列中的消息并交换到死信队列。消息到达死信队列。但原始队列中的消息不会消失。因此,工作人员不会停止,并且大量消息进入死信队列。
当消息被拒绝时,我想从原始队列中删除消息。
我已经尝试过这个。你能帮忙吗?
from aio_pika.abc import AbstractIncomingMessage
async def callback(
message: AbstractIncomingMessage,
) -> None:
try:
async with message.process(ignore_processed=True):
parsed = json.loads(message.body)
success = False
if False:
log.msg("[x] Reject")
await message.reject(requeue=False)
else:
log.msg("[x] Updated!")
变得更清晰了
from aio_pika.abc import AbstractIncomingMessage
def some_validation(message) -> bool:
# some logic
return True
async def callback(
message: AbstractIncomingMessage,
) -> None:
async with message.process(ignore_processed=True):
parsed = json.loads(message.body)
success = False
if some_validation(parsed):
log.msg("[x] Updated!")
# some logic
return None
log.msg("[x] Reject")
await message.reject(requeue=False)