如何将错误消息移动到rabbitmq死信队列

问题描述 投票:1回答:1

我读了很多文档/ stackoverflow,但是当发生异常以将消息移动到死信队列时,我仍有问题。我正在使用spring-boot这是我的配置:

@Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    RetryOperationsInterceptor interceptor() {
        RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(rabbitTemplate, "error_exchange ", "error_key");
        return RetryInterceptorBuilder
            .stateless()
            .recoverer(recoverer)
            .build();
    }

死信队列:

Features    
x-dead-letter-routing-key:  error_key
x-dead-letter-exchange: error_exchange
durable:    true
Policy  DLX

队列名称:错误

我的交换:name:error_exchange binding:to:error,routing_key:error_key

这是我的消费者:

@RabbitListener(queues = "${rss_reader_chat_queue}")
    public void consumeMessage(Message message) {
        try {
            List<ChatMessage> chatMessages = messageTransformer.transformMessage(message);
            List<ChatMessage> save = chatMessageRepository.save(chatMessages);
            sendMessagesToChat(save);
        }
        catch(Exception ex) {
            throw new AmqpRejectAndDontRequeueException(ex);
        }
    }

因此,当我发送无效消息并发生一些异常时,它会发生一次(这很好,因为以前的消息是一遍又一遍地发送的)但是消息没有进入我的死信队列。你能帮帮我吗?

java rabbitmq spring-amqp spring-rabbit spring-rabbitmq
1个回答
3
投票

您需要显示其余的配置 - 启动属性,队列@Beans等。您似乎也在使用重新发布的恢复器与死信队列之间存在一些混淆;它们是实现类似结果的不同方式。您通常不会同时使用两者。

这是一个简单的启动应用程序,演示使用DLX / DLQ ...

@SpringBootApplication
public class So43694619Application implements CommandLineRunner {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(So43694619Application.class, args);
        context.close();
    }

    @Autowired
    RabbitTemplate template;

    @Autowired
    AmqpAdmin admin;

    private final CountDownLatch latch = new CountDownLatch(1);

    @Override
    public void run(String... arg0) throws Exception {
        this.template.convertAndSend("so43694619main", "foo");
        this.latch.await(10, TimeUnit.SECONDS);
        this.admin.deleteExchange("so43694619dlx");
        this.admin.deleteQueue("so43694619main");
        this.admin.deleteQueue("so43694619dlx");
    }


    @Bean
    public Queue main() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "so43694619dlx");
        args.put("x-dead-letter-routing-key", "so43694619dlxRK");
        return new Queue("so43694619main", true, false, false, args);
    }

    @Bean
    public Queue dlq() {
        return new Queue("so43694619dlq");
    }

    @Bean
    public DirectExchange dlx() {
        return new DirectExchange("so43694619dlx");
    }

    @Bean
    public Binding dlqBinding() {
        return BindingBuilder.bind(dlq()).to(dlx()).with("so43694619dlxRK");
    }

    @RabbitListener(queues = "so43694619main")
    public void listenMain(String in) {
        throw new AmqpRejectAndDontRequeueException("failed");
    }

    @RabbitListener(queues = "so43694619dlq")
    public void listenDlq(String in) {
        System.out.println("ReceivedFromDLQ: " + in);
        this.latch.countDown();
    }

}

结果:

ReceivedFromDLQ: foo
© www.soinside.com 2019 - 2024. All rights reserved.