用于处理死信队列的配置

问题描述 投票:0回答:1

我有一个使用Spring Cloud Streams-RabbitMQ在微服务中交换消息的项目。对我的项目至关重要的一件事是,我一定不会丢失任何消息。

为了最大程度地减少故障,我计划了以下工作:

  • 对队列中的邮件使用默认的重试方法
  • 配置死信队列以在一段时间后将消息再次放入队列中
  • 为了避免无限循环,仅允许几次(比如说5次),一条消息可以从死信队列重新发布到常规消息传递队列。

我相信我可以使用以下配置来完成前两项:

#dlx/dlq setup - retry dead letter 5 minutes later (300000ms later)
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.republish-to-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=300000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=

#input
spring.cloud.stream.bindings.myInput.destination=my-queue
spring.cloud.stream.bindings.myInput.group=my-group

但是,我找不到searching on this reference guide如何做我想做的(主要是,如何配置死信队列中的最大重新发布数)。我不确定自己是否走在正确的道路上-也许我应该手动创建第二个队列并编写所需的代码,并且只对完全失败的消息留空(我必须定期检查并手动处理,因为我的系统不应丢失任何消息)...

我是这些框架的新手,希望您能帮助配置我的...

java configuration rabbitmq spring-cloud-stream dead-letter
1个回答
0
投票

[This documentation for the rabbit binder显示了在重试次数失败后如何将死信发布到某些停车队列。

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
            this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}

第二个示例显示了如何使用延迟交换插件来延迟重试之间的时间。

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    private static final String DELAY_EXCHANGE = "dlqReRouter";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            headers.put("x-delay", 5000 * retriesHeader);
            this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
© www.soinside.com 2019 - 2024. All rights reserved.