Spring boot:Exchange 发生错误时的 RabbitMq 和数据库管理

问题描述 投票:0回答:1

我在管理 RabbitMQ 和数据库事务时遇到问题以防找不到交换。这是简单的顺序:

  1. 将消息发送至 Exchange
  2. 在数据库中将已发送消息标记为已发送

当未找到 Exchange 时,不会发送消息,但会在数据库中更新该行,这不尊重事务行为。 针对其他错误情况(数据库错误或 RabbitMQ 不可用)正确管理事务。

如何将此用例作为事务处理进行管理?

配置中启用交易:

@Bean
@ConditionalOnMissingClass("org.springframework.orm.jpa.JpaTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(jacksonMessageConverter());
    template.setChannelTransacted(true);
    return template;
}

我的服务:

@Override
@Transactional
public void push(Message message) {

    rabbitTemplate.convertAndSend(
                "MessageExchange",
                "binding.key",
                objectMapper.writeValueAsString(message));

    repository.markAsSent(message.getId());
}

离开方法后引发错误,而不是在

rabbitTemplate.convertAndSend
方法中:

[AMQP Connection] ERROR o.s.a.r.c.CachingConnectionFactory.log : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'MessageExchange' in vhost '/', class-id=60, method-id=40)
[ThreadPoolTaskScheduler1] ERROR o.s.t.s.TransactionSynchronizationUtils.invokeAfterCompletion : TransactionSynchronization.afterCompletion threw exception
java.lang.IllegalStateException: Channel closed during transaction
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1171)
    at com.sun.proxy.$Proxy143.txCommit(Unknown Source)
    at org.springframework.amqp.rabbit.connection.RabbitResourceHolder.commitAll(RabbitResourceHolder.java:153)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$RabbitResourceSynchronization.afterCompletion(ConnectionFactoryUtils.java:332)
database spring-boot transactions rabbitmq
1个回答
0
投票

尝试使用

Jarkata Transactional
注解代替 Spring 框架默认注解:

@Override
@jakarta.transaction.Transactional
public void push(Message message) {

    rabbitTemplate.convertAndSend(
            "MessageExchange",
            "binding.key",
            objectMapper.writeValueAsString(message));

    repository.markAsSent(message.getId());
}

这应该可以解决您的问题。

© www.soinside.com 2019 - 2024. All rights reserved.