如何通过死信交换拒绝aio_pika中的消息?

问题描述 投票:0回答:1

我正在使用 aio_pika,并且在拒绝来自队列的消息时,rabbitMQ 订阅工作程序遇到问题。我拒绝了队列中的消息并交换到死信队列。消息到达死信队列。但原始队列中的消息不会消失。因此,工作人员不会停止,并且大量消息进入死信队列。

当消息被拒绝时,我想从原始队列中删除消息。

我已经尝试过这个。你能帮忙吗?

from aio_pika.abc import AbstractIncomingMessage

async def callback(
    message: AbstractIncomingMessage,
) -> None:
    try:
        async with message.process(ignore_processed=True):
            parsed = json.loads(message.body)
            success = False
            if False:
                log.msg("[x] Reject")
                await message.reject(requeue=False)
            else:
                log.msg("[x] Updated!")
python rabbitmq pika aio
1个回答
0
投票

变得更清晰了

from aio_pika.abc import AbstractIncomingMessage

def some_validation(message) -> bool:
    # some logic
    return True

async def callback(
    message: AbstractIncomingMessage,
) -> None:
    async with message.process(ignore_processed=True):
        parsed = json.loads(message.body)
        success = False
        if some_validation(parsed):
            log.msg("[x] Updated!")
            # some logic
            return None    
        log.msg("[x] Reject")
        await message.reject(requeue=False)
        
© www.soinside.com 2019 - 2024. All rights reserved.