使用绑定键的Symfony messenger队列 - 重试策略

问题描述 投票:2回答:1

我正在我工作的公司里实施Messenger。我发现路由键有问题。

我想把一条信息发送到两个队列。另外两个应用程序将处理这个队列。一切都很好,但是当处理程序抛出异常时,我发现了一个问题。因为重试队列是通过绑定键来匹配的,而绑定键对这个队列来说是一样的,所以它把一个消息发送到两个重试队列中的消息加倍。

最后在3次重试后,我的dlqs上有16条消息。你能帮我解决这个问题吗?是否可以根据队列而不是路由键来创建重试策略?

我的配置是这样的。

messenger:
    failure_transport: failed
    default_bus: command.bus
    transports:
        async:
            dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
            options:
                retry_strategy:
                    max_retries: 3
                    delay: 1000
                    multiplier: 2
                    max_delay: 0
                exchange:
                    name: olimp
                    type: topic
                queues:
                    create_miniature_v1:
                        binding_keys:
                            - first
                    create_miniature_v2:
                        binding_keys:
                            - first
        failed:
            dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
            options:
                exchange:
                    name: olimp_dead
                    type: topic
                queues:
                    create_miniature_v1_dlq:
                        binding_keys:
                            - first
                    create_miniature_v2_dlq:
                        binding_keys:
                            - first

    routing:
        'Olimp\Messenger\TestEvent': async

    buses:
        command.bus:
            middleware:
                - Olimp\Shared\Application\Message\Middleware\EventDispatcher
                - doctrine_close_connection
                - doctrine_transaction

        event.bus:
            default_middleware: allow_no_handlers

        query.bus: ~

我用这样的印记发送事件

class MessengerTestCommand extends Command
{
    protected static $defaultName = 'app:messenger-test';
    private MessageBusInterface $bus;

    public function __construct(MessageBusInterface $bus)
    {
        $this->bus = $bus;

        parent::__construct();
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);

        $this->bus->dispatch(
            new TestEvent(), [
                new AmqpStamp('first')
            ]
        );

        $io->success('Done');

        return 0;
    }
}

Handler:

class TestEventHandler implements MessageHandlerInterface
{
    public function __invoke(TestEvent $event)
    {
        dump($event->id);

        throw new \Exception('Boom');
    }
}

我在rabbit上找到的Rabbit

现在我正在尝试这样的配置:

framework:
    messenger:
        failure_transport: failed
        default_bus: command.bus
        transports:
            async:
                dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
                options:
                    retry_strategy:
                        max_retries: 3
                        delay: 1000
                        multiplier: 2
                        max_delay: 0
                    exchange:
                        name: olimp
                        type: topic
                    queues:
                        create_miniature_v1:
                            binding_keys:
                                - first
            async1:
                dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
                options:
                    retry_strategy:
                        max_retries: 3
                        delay: 1000
                        multiplier: 2
                        max_delay: 0
                    exchange:
                        name: olimp
                        type: topic
                    queues:
                        create_miniature_v2:
                            binding_keys:
                                - first
            failed:
                dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
                options:
                    exchange:
                        name: olimp_dead
                        type: topic
                    queues:
                        create_miniature_v1_dlq:
                            binding_keys:
                                - first
                        create_miniature_v2_dlq:
                            binding_keys:
                                - first

        routing:
            'Olimp\Messenger\TestEvent': [async, async1]

用两个运行中的控制台命令

bin/console messenger:consume async
bin/console messenger:consume async1

但结果还是一样

php symfony rabbitmq amqp symfony-messenger
1个回答
1
投票

好吧,我自己找到了答案。

我创建了新的重试策略。我改变了 queue_name_pattern%routing_key%_%delay% 并创建了我自己的 SendFailedMessageForRetryListener. 为了重试信封,我加了邮票 new AmqpStamp($envelope->last(AmqpReceivedStamp::class)->getQueueName()) 用来为延迟队列创建适当的路由密钥。因此,我没有根据交换名称创建队列,而是根据队列名称来创建。

还有两件事。

队列中的绑定键是这样的:

queues:
    create_miniature_v1:
        binding_keys:
            - create_miniature_v1
            - first
    create_miniature_v2:
        binding_keys:
            - create_miniature_v2
            - first

和失败的队列

queues:
    create_miniature_v1_dlq:
        binding_keys:
            - create_miniature_v1
    create_miniature_v2_dlq:
        binding_keys:
            - create_miniature_v2
© www.soinside.com 2019 - 2024. All rights reserved.