如果在聚合器的forceRelease之后服务中发生异常,如何死信RabbitMQ消息

问题描述 投票:0回答:1

我试图找出处理在聚合的组超时发生后调用的服务中可能发生的错误的最佳方法,该服务模仿相同的流,就像满足releaseExpression一样。

这是我的设置:

我有一个AmqpInboundChannelAdapter,它接收消息并将它们发送到我的聚合器。

当满足releaseExpression并且在groupTimeout到期之前,如果在我的ServiceActivator中抛出异常,则消息将被发送到我的死信队列中以获取该MessageGroup中的所有消息。 (以下示例中的10条消息,仅用于说明目的)这就是我所期望的。

如果我的releaseExpression没有得到满足但是groupTimeout已经满足并且组超时,如果在我的ServiceActivator中抛出异常,则消息不会被发送到我的死信队列并被激活。

在阅读了另一篇博文后,link1提到这是因为处理发生在MessageGroupStoreReaper的另一个线程中,而不是SimpleMessageListenerContainer所在的线程。一旦处理离开SimpleMessageListener的线程,消息将是auto ack。

我添加了上面链接中提到的配置,并查看发送到我的错误处理程序的错误消息。我的主要问题是,被认为是处理此方案以最大限度地减少消息丢失的最佳方法。

以下是我正在探索的选项:

  • 在我的自定义错误处理程序中使用BatchRabbitTemplate将失败的消息发布到与满足releaseExpression时相同的死信队列。 (这是我在下面概述的方法,但我担心如果在发布期间发生错误,邮件会丢失)
  • 调查是否存在我可以让SimpleMessageListener知道发生的错误并让它发送失败的一批消息到死信队列?我怀疑这是可能的,因为看起来消息已经被激活了。
  • 不要将SimpleMessageListenerContainer设置为AcknowledgeMode.AUTO,并在满足releaseExpression或groupTimeOut发生时通过服务处理消息时手动确认消息。 (这看起来有点混乱,因为MessageGroup中可能有1..N消息,但想看看其他人做了什么)

理想情况下,我希望有一个流程,当满足releaseExpression时,它会模仿相同的流程,这样消息就不会丢失。

有没有人有关于处理他们过去使用过的场景的最佳方法的建议?

感谢您的帮助和/或建议!

这是我使用Spring Integration DSL的当前配置

@Bean
    public SimpleMessageListenerContainer workListenerContainer() {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(rabbitConnectionFactory);
        container.setQueues(worksQueue());
        container.setConcurrentConsumers(4);
        container.setDefaultRequeueRejected(false);
        container.setTransactionManager(transactionManager);
        container.setChannelTransacted(true);
        container.setTxSize(10);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);          
        return container;
    }

  @Bean
    public AmqpInboundChannelAdapter inboundRabbitMessages() {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(workListenerContainer());       
        return adapter;
    }

我已经定义了一个错误通道,并定义了我自己的taskScheduler用于MessageStoreRepear

   @Bean 
    public ThreadPoolTaskScheduler taskScheduler(){
        ThreadPoolTaskScheduler  ts = new ThreadPoolTaskScheduler();
        MessagePublishingErrorHandler mpe = new MessagePublishingErrorHandler();
        mpe.setDefaultErrorChannel(myErrorChannel());
        ts.setErrorHandler(mpe);
        return ts;
    }


    @Bean
    public PollableChannel myErrorChannel() {
        return new QueueChannel();
    }
 public IntegrationFlow aggregationFlow() {
        return IntegrationFlows.from(inboundRabbitMessages())               
                .transform(Transformers.fromJson(SomeObject.class))             
                 .aggregate(a->{
                    a.sendPartialResultOnExpiry(true);                  
                    a.groupTimeout(3000);   
                    a.expireGroupsUponCompletion(true);
                    a.expireGroupsUponTimeout(true);                    
                    a.correlationExpression("T(Thread).currentThread().id");
                    a.releaseExpression("size() == 10");                            
                    a.transactional(true);
                 }
                )               
                .handle("someService", "processMessages")
                .get();
    }

这是我的自定义错误流程

@Bean
    public IntegrationFlow errorResponse() {
        return IntegrationFlows.from("myErrorChannel")
                    .<MessagingException, Message<?>>transform(MessagingException::getFailedMessage,
                            e -> e.poller(p -> p.fixedDelay(100)))
                    .channel("myErrorChannelHandler")
                    .handle("myErrorHandler","handleFailedMessage")
                    .log()
                    .get();
    }

这是自定义错误处理程序

@Component
public class MyErrorHandler {

    @Autowired
    BatchingRabbitTemplate batchingRabbitTemplate;

    @ServiceActivator(inputChannel = "myErrorChannelHandler")
    public void handleFailedMessage(Message<?> message) {       
        ArrayList<SomeObject> payload = (ArrayList<SomeObject>)message.getPayload();        
        payload.forEach(m->batchingRabbitTemplate.convertAndSend("some.dlq","#", m));
    }

}

这是BatchingRabbitTemplate bean

    @Bean   
    public BatchingRabbitTemplate batchingRabbitTemplate() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(5);
        scheduler.initialize();
        BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(10, Integer.MAX_VALUE, 30000);
        BatchingRabbitTemplate batchingRabbitTemplate = new BatchingRabbitTemplate(batchingStrategy, scheduler);    
        batchingRabbitTemplate.setConnectionFactory(rabbitConnectionFactory);
        return batchingRabbitTemplate;
    }

更新1)以显示自定义消息组处理器:

public class CustomAggregtingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {
    @Override
    protected final Object aggregatePayloads(MessageGroup group, Map<String, Object> headers) {
        return group;
    }
}

示例服务:

@Slf4j
public class SomeService  {
    @ServiceActivator
    public void processMessages(MessageGroup messageGroup) throws IOException {
        Collection<Message<?>> messages  = messageGroup.getMessages();
        //Do business logic 
        //ack messages in the group
        for (Message<?> m : messages) {
            com.rabbitmq.client.Channel channel = (com.rabbitmq.client.Channel) 
                    m.getHeaders().get("amqp_channel");
            long deliveryTag = (long) m.getHeaders().get("amqp_deliveryTag");
            log.debug(" deliveryTag = {}",deliveryTag);
            log.debug("Channel = {}",channel);
            channel.basicAck(deliveryTag, false);
        }
    }
}

更新了integrationFlow

public IntegrationFlow aggregationFlowWithCustomMessageProcessor() {
        return IntegrationFlows.from(inboundRabbitMessages()).transform(Transformers.fromJson(SomeObject.class))
                .aggregate(a -> {
                    a.sendPartialResultOnExpiry(true);
                    a.groupTimeout(3000);
                    a.expireGroupsUponCompletion(true);
                    a.expireGroupsUponTimeout(true);
                    a.correlationExpression("T(Thread).currentThread().id");
                    a.releaseExpression("size() == 10");
                    a.transactional(true);
                    a.outputProcessor(new CustomAggregtingMessageGroupProcessor());
                }).handle("someService", "processMessages").get();
    }

新的ErrorHandler做nack

public class MyErrorHandler {

    @ServiceActivator(inputChannel = "myErrorChannelHandler")
    public void handleFailedMessage(MessageGroup messageGroup) throws IOException {
        if(messageGroup!=null) {
            log.debug("Nack messages size = {}", messageGroup.getMessages().size());
            Collection<Message<?>> messages  = messageGroup.getMessages();
            for (Message<?> m : messages) {
                com.rabbitmq.client.Channel channel = (com.rabbitmq.client.Channel) 
                        m.getHeaders().get("amqp_channel");
                long deliveryTag = (long) m.getHeaders().get("amqp_deliveryTag");           
                log.debug("deliveryTag = {}",deliveryTag);
                log.debug("channel = {}",channel);
                channel.basicNack(deliveryTag, false, false);
            }       
        }
    }
}

更新2添加了自定义发布策略并更改为聚合器

public class CustomMeasureGroupReleaseStratgedy implements ReleaseStrategy {

    private static final int MAX_MESSAGE_COUNT = 10;

    public boolean canRelease(MessageGroup messageGroup) {
        return messageGroup.getMessages().size() >= MAX_MESSAGE_COUNT;
    }
}
   public IntegrationFlow aggregationFlowWithCustomMessageProcessorAndReleaseStratgedy() {
        return IntegrationFlows.from(inboundRabbitMessages()).transform(Transformers.fromJson(SomeObject.class))
                .aggregate(a -> {
                    a.sendPartialResultOnExpiry(true);
                    a.groupTimeout(3000);
                    a.expireGroupsUponCompletion(true);
                    a.expireGroupsUponTimeout(true);
                    a.correlationExpression("T(Thread).currentThread().id");                   
                    a.transactional(true);
                    a.releaseStrategy(new CustomMeasureGroupReleaseStratgedy());            
                    a.outputProcessor(new CustomAggregtingMessageGroupProcessor());
                }).handle("someService", "processMessages").get();
    }
spring-boot spring-integration spring-amqp spring-integration-dsl
1个回答
0
投票

您的理解存在一些缺陷。如果您使用AUTO,则只有最后一条消息在发生异常时才会被删除。在发布之前成功存入组中的邮件将立即被确认。

实现您想要的唯一方法是使用MANUAL acks。

没有办法“告诉侦听器容器向DLQ发送消息”。容器从不向DLQ发送消息,它拒绝消息并且代理将其发送到DLX / DLQ。

© www.soinside.com 2019 - 2024. All rights reserved.