如何配置Spring AMQP在监听器抛出AmqpRejectAndDontRequeueException时不重新排队?

问题描述 投票:0回答:1

我目前有一个 Spring Boot 应用程序配置为使用 spring-boot-starter-amqp 2.1.5.RELEASE。我已在 yaml 中将其配置为重试:

  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # retrys enabled
          max-attempts: 3 # total number number attempts (includes the original one)
          multiplier: 1.5 # multiple of initial interval for subsequent retries
          initial-interval: 1000 # first interval between attempts

在我的监听器中,在某种情况下,我抛出一个

AmqpRejectAndDontRequeueException
但这并不能阻止重新排队。

如何配置一个与自动 spring 配置内联的配置 bean,以在抛出此异常时停止进一步重新排队消息?

队列 A 应尝试对队列 A 侦听器进行处理 3 次,并且日志支持这一点。 队列 B 应该只尝试一次,并在抛出

AmqpRejectAndDontRequeueException
时停止。

SpringBoot应用程序类:

import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;

import javax.annotation.PostConstruct;

import org.apache.commons.lang3.ObjectUtils;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;

import lombok.extern.slf4j.Slf4j;

@SpringBootApplication
@Slf4j
public class AmqpApplication
{

    protected static final String X_ATTEMPTS_HEADER = "x-attempts";
    protected static final String X_LAST_ATTEMPT_DATE_HEADER = "x-last-attempt-date";

    public static void main(String[] args) throws InterruptedException
    {
        ConfigurableApplicationContext context = SpringApplication.run(AmqpApplication.class, args);
        context.close();
        System.exit(0);
    }

    private String host = "localhost";

    private Integer port = 5672;

    private String vhost = "/";

    private String username = "guest";

    private String password = "guest";

    private String exchangeName = "common-exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Configures the connection factory using the configured values
     * 
     * @return Connection factory to use to connect to rabbitmq and send events
     **/
    private ConnectionFactory connectionFactory()
    {
        CachingConnectionFactory factory = new CachingConnectionFactory(host, port);

        factory.setRequestedHeartBeat(30);
        factory.setConnectionTimeout(30000);
        factory.setChannelCacheSize(10);

        factory.setVirtualHost(vhost);
        factory.setUsername(username);
        factory.setPassword(password);
        return factory;
    }

    @Bean
    public Queue queueA()
    {
        return QueueBuilder.durable("a").withArgument("x-dead-letter-exchange", "a")
                .withArgument("x-dead-letter-routing-key", "a-dead-letter").build();
    }

    @Bean
    public Queue queueB()
    {
        return QueueBuilder.durable("b").withArgument("x-dead-letter-exchange", "b")
                .withArgument("x-dead-letter-routing-key", "b-dead-letter").build();
    }

    @Bean
    Queue DeadLetterQueueA()
    {
        return QueueBuilder.durable("a-dead-letter").build();
    }

    @Bean
    Queue DeadLetterQueueB()
    {
        return QueueBuilder.durable("b-dead-letter").build();
    }

    /**
     * Required for executing adminstration functions against an AMQP Broker
     */
    @Bean
    public AmqpAdmin amqpAdmin(RabbitListenerEndpointRegistry registry)
    {
        //@// @formatter:off

        RabbitAdmin admin = new RabbitAdmin(connectionFactory());
        admin.declareQueue(queueA());
        admin.declareQueue(queueB());
        registry.start();
        return admin;
    }

    /**
     * The following is a complete declaration of an exchange, a queue and a
     * exchange-queue binding
     */
    @Bean
    public DirectExchange directExchange()
    {
        return new DirectExchange(exchangeName, true, false);
    }

    @Bean
    public List<Binding> exchangeBinding()
    {
        // Important part is the routing key -- this is just an example
        return Arrays.asList(
                BindingBuilder.bind(queueA()).to(directExchange()).with("a"),
                BindingBuilder.bind(DeadLetterQueueA()).to(directExchange())
                        .with("a"),
                        BindingBuilder.bind(queueB()).to(directExchange()).with("b"),
                        BindingBuilder.bind(DeadLetterQueueB()).to(directExchange())
                                .with("b"));
    }



    @Bean
    public RabbitTemplate rabbitTemplate()
    {
        // Add the object mapper to the converter
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JsonOrgModule());

        // Add the object mapper to the converter
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(new Jackson2JsonMessageConverter(objectMapper));

        template.setExchange(exchangeName);

        return template;
    }

    @PostConstruct
    public void sendMessages() throws InterruptedException
    {
        rabbitTemplate.convertAndSend(exchangeName, "a", new BeanObject().setName("a"));
        rabbitTemplate.convertAndSend(exchangeName, "b", new BeanObject().setName("b"));
    }
    
   @RabbitListener(queues = "a")
    public void aListener(@Payload BeanObject payload, Message message,
            @Header(required = false, name = X_ATTEMPTS_HEADER, defaultValue = "0") Integer attempts)
    {
        beforeProcessing(payload,message,attempts);
        throw new RuntimeException();
    }
    
    @RabbitListener(queues = "b")
    public void bListener(@Payload BeanObject payload, Message message,
            @Header(required = false, name = X_ATTEMPTS_HEADER, defaultValue = "0") Integer attempts)
    {
        beforeProcessing(payload,message,attempts);
        throw new AmqpRejectAndDontRequeueException("");
    }
    
    private void beforeProcessing(BeanObject payload, Message message,
            @Header(required = false, name = X_ATTEMPTS_HEADER, defaultValue = "0") Integer attemptNo)
    {

        //// @formatter:off
        attemptNo++;// Increment
        
        message.getMessageProperties().getHeaders().put(X_ATTEMPTS_HEADER, attemptNo);//update attempts header
     // @formatter:on
        log.info(
                "bean: {}, attemptNo: {}",
                payload, attemptNo);
    }

}

消息转换器类:

import java.io.IOException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

@Component
public class MessageConverter implements org.springframework.amqp.support.converter.MessageConverter
{
    private final ObjectMapper objectMapper = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException
    {

        return null;
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException
    {
        if (message.getMessageProperties() == null || message.getMessageProperties().getHeaders() == null
                || !message.getMessageProperties().getHeaders().containsKey("__TypeId__"))
        {
            throw new MessageConversionException(
                    "No header exists in the message for [__TypeId__]. This is required to hint the conversion type.");
        }
        String typeId = message.getMessageProperties().getHeaders().get("__TypeId__").toString();
        try
        {
            return objectMapper.readValue(message.getBody(), Class.forName(typeId));
        }
        catch (ClassNotFoundException | IOException e)
        {
            throw new MessageConversionException(
                    String.format("Unable to convert message payload to type [%s]", typeId));
        }
    }

}

龙目岛豆类:

package com.amqp;

import lombok.Data;
import lombok.experimental.Accessors;

@Data
@Accessors(chain = true)
public class BeanObject
{
    private String name;
}

POM:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.amqp</groupId>
    <artifactId>amqp</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>amqp</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jdk8</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310 -->
<dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
        <!-- https://mvnrepository.com/artifact/com.fasterxml/jackson-module-json-org -->
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-json-org -->
<dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-json-org</artifactId>
</dependency>

        <dependency>
            <groupId>javax.interceptor</groupId>
            <artifactId>javax.interceptor-api</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

日志:

bean: BeanObject(name=a), attemptNo: 1
bean: BeanObject(name=b), attemptNo: 1

bean: BeanObject(name=a), attemptNo: 2
bean: BeanObject(name=b), attemptNo: 2
bean: BeanObject(name=b), attemptNo: 3
bean: BeanObject(name=a), attemptNo: 3
spring-amqp spring-rabbit spring-retry
1个回答
5
投票

我找到了解决方案。 Amqp 将抛出的异常包装在 ListenerExecutionFailedException 中。我已经重写了 SimpleRabbitListenerContainerFactory 并指定了我自己的重试策略,该策略扩展了 SimpleRetryPolicy。然后,我确保将可抛出的原因传递给 retryForException 方法。我还确保在建议链中指定可重试的类映射:

现在是日志,您可以看到“a”尝试了 3 次,而“b”仅尝试了一次:

bean: BeanObject(name=b), attemptNo: 1
bean: BeanObject(name=a), attemptNo: 1
bean: BeanObject(name=a), attemptNo: 2
bean: BeanObject(name=a), attemptNo: 3

这是新的主 Spring 引导类:

package com.amqp;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.PostConstruct;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.util.ErrorHandler;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;

import lombok.extern.slf4j.Slf4j;

@SpringBootApplication
@Slf4j
public class AmqpApplication
{

    protected static final String X_ATTEMPTS_HEADER = "x-attempts";
    protected static final String X_LAST_ATTEMPT_DATE_HEADER = "x-last-attempt-date";

    public static void main(String[] args) throws InterruptedException
    {
        ConfigurableApplicationContext context = SpringApplication.run(AmqpApplication.class, args);
        context.close();
        System.exit(0);
    }

    private String host = "localhost";

    private Integer port = 5672;

    private String vhost = "/";

    private String username = "guest";

    private String password = "guest";

    private String exchangeName = "common-exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory()
    {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setAdviceChain(retryOperationsInterceptor().build());
        factory.setErrorHandler(new ConditionalRejectingErrorHandler());
        factory.setAutoStartup(true);
        factory.setMessageConverter(new MessageConverter());
        return factory;
    }

    /**
     * Configures the connection factory using the configured values
     * 
     * @return Connection factory to use to connect to rabbitmq and send events
     **/
    private ConnectionFactory connectionFactory()
    {
        CachingConnectionFactory factory = new CachingConnectionFactory(host, port);

        factory.setRequestedHeartBeat(30);
        factory.setConnectionTimeout(30000);
        factory.setChannelCacheSize(10);

        factory.setVirtualHost(vhost);
        factory.setUsername(username);
        factory.setPassword(password);
        return factory;
    }

    @Bean
    public RetryInterceptorBuilder<?> retryOperationsInterceptor()
    {
        RetryInterceptorBuilder<?> builder = RetryInterceptorBuilder.stateless();
        builder.retryPolicy(new MyRetryPolicy(3, retryableClassifier()));
        builder.backOffPolicy(backoffPolicy());

        MessageRecoverer recoverer = new RejectAndDontRequeueRecoverer();
        builder.recoverer(recoverer);
        return builder;
    }

    @Bean
    public BackOffPolicy backoffPolicy()
    {
        ExponentialBackOffPolicy backoffPolicy = new ExponentialBackOffPolicy();
        backoffPolicy.setInitialInterval(1000);
        backoffPolicy.setMaxInterval(10000);
        backoffPolicy.setMultiplier(1.5);
        return backoffPolicy;
    }

    @Bean
    public Map<Class<? extends Throwable>, Boolean> retryableClassifier()
    {
        Map<Class<? extends Throwable>, Boolean> retryableClassifier = new HashMap<>();
        retryableClassifier.put(AmqpRejectAndDontRequeueException.class, false);
        retryableClassifier.put(Exception.class, true);
        return retryableClassifier;
    }

    @Bean
    public Queue queueA()
    {
        return QueueBuilder.durable("a").withArgument("x-dead-letter-exchange", "a")
                .withArgument("x-dead-letter-routing-key", "a-dead-letter").build();
    }

    @Bean
    public Queue queueB()
    {
        return QueueBuilder.durable("b").withArgument("x-dead-letter-exchange", "b")
                .withArgument("x-dead-letter-routing-key", "b-dead-letter").build();
    }

    @Bean
    Queue DeadLetterQueueA()
    {
        return QueueBuilder.durable("a-dead-letter").build();
    }

    @Bean
    Queue DeadLetterQueueB()
    {
        return QueueBuilder.durable("b-dead-letter").build();
    }

    /**
     * Required for executing adminstration functions against an AMQP Broker
     */
    @Bean
    public AmqpAdmin amqpAdmin(RabbitListenerEndpointRegistry registry,
            SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory)
    {
        //@// @formatter:off
        RabbitAdmin admin = new RabbitAdmin(connectionFactory());
        admin.declareQueue(queueA());
        admin.declareQueue(queueB());
        registry.start();
        return admin;
    }

    /**
     * The following is a complete declaration of an exchange, a queue and a
     * exchange-queue binding
     */
    @Bean
    public DirectExchange directExchange()
    {
        return new DirectExchange(exchangeName, true, false);
    }

    @Bean
    public List<Binding> exchangeBinding()
    {
        // Important part is the routing key -- this is just an example
        return Arrays.asList(
                BindingBuilder.bind(queueA()).to(directExchange()).with("a"),
                BindingBuilder.bind(DeadLetterQueueA()).to(directExchange())
                        .with("a"),
                        BindingBuilder.bind(queueB()).to(directExchange()).with("b"),
                        BindingBuilder.bind(DeadLetterQueueB()).to(directExchange())
                                .with("b"));
    }



    @Bean
    public RabbitTemplate rabbitTemplate()
    {
        // Add the object mapper to the converter
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JsonOrgModule());

        // Add the object mapper to the converter
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(new Jackson2JsonMessageConverter(objectMapper));

        template.setExchange(exchangeName);

        return template;
    }

    @PostConstruct
    public void sendMessages() throws InterruptedException
    {
        rabbitTemplate.convertAndSend(exchangeName, "a", new BeanObject().setName("a"));
        rabbitTemplate.convertAndSend(exchangeName, "b", new BeanObject().setName("b"));
    }

    @RabbitListener(queues = "a")
    public void aListener(@Payload BeanObject payload, Message message,
            @Header(required = false, name = X_ATTEMPTS_HEADER, defaultValue = "0") Integer attempts)
    {
        beforeProcessing(payload,message,attempts);
        throw new RuntimeException();
    }

    @RabbitListener(queues = "b")
    public void bListener(@Payload BeanObject payload, Message message,
            @Header(required = false, name = X_ATTEMPTS_HEADER, defaultValue = "0") Integer attempts)
    {
        beforeProcessing(payload,message,attempts);
        throw new AmqpRejectAndDontRequeueException("");
    }

    private void beforeProcessing(BeanObject payload, Message message,
            @Header(required = false, name = X_ATTEMPTS_HEADER, defaultValue = "0") Integer attemptNo)
    {

        //// @formatter:off
        attemptNo++;// Increment

        message.getMessageProperties().getHeaders().put(X_ATTEMPTS_HEADER, attemptNo);//update attempts header
     // @formatter:on
        log.info(
                "bean: {}, attemptNo: {}",
                payload, attemptNo);
    }

    private static class MyRetryPolicy extends SimpleRetryPolicy
    {

        private BinaryExceptionClassifier retryableClassifier;
        private int maxAttempts;

        @Override
        public boolean canRetry(RetryContext context)
        {
            Throwable t = context.getLastThrowable();
            return (t == null || retryForException(t.getCause())) && context.getRetryCount() < maxAttempts;
        }

        public MyRetryPolicy(int maxAttempts, Map<Class<? extends Throwable>, Boolean> retryableExceptions)
        {
            this.maxAttempts = maxAttempts;
            this.retryableClassifier = new BinaryExceptionClassifier(retryableExceptions, false);
        }

        private boolean retryForException(Throwable ex)
        {
            return this.retryableClassifier.classify(ex);
        }
    }

    public static class MyErrorHandler implements ErrorHandler
    {

        @Override
        public void handleError(Throwable t)
        {
            if (!this.causeChainContainsARADRE(t))
            {
                throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t);
            }
        }

        private boolean causeChainContainsARADRE(Throwable t)
        {
            Throwable cause = t.getCause();
            while (cause != null)
            {
                if (cause instanceof AmqpRejectAndDontRequeueException)
                {
                    return true;
                }
                cause = cause.getCause();
            }
            return false;
        }

    }

}

© www.soinside.com 2019 - 2024. All rights reserved.