Spring Boot JmsListener 被不同目的地的其他 JmsListener 阻止

问题描述 投票:0回答:1

我在使用 ActiveMQ 5 上的 JMS 作为代理的 Spring Boot 应用程序中遇到了以下情况。

@JMSListener
带注释的方法处理消息并将响应消息发送到不同的目的地。该目的地还有一个
@JMSListener
,当响应发送到代理时不会调用它,而是仅在原始监听器的处理完全完成时调用。如果此侦听器另外带有
@Async
注释,则按预期发送后会立即收到响应。

原来的项目太大了,所以我准备了下面的最小示例。 它包含一个 Spring Boot 应用程序

TestApp
,其中有一个
@JmsListener
(1),它会立即将消息从目的地 in 转发到 out,并且 afterwards 休眠 3 秒。

应用程序在测试中启动,向 in 发送一条消息,并等待 2 秒以获取 out 上的响应。

仅当

@Async
出现在 (1) 时,测试才成功。

进一步观察:

  • 如果测试使用变体 (2) 并通过
    JmsTemplate
    而不是使用
    JmsListener
    接收响应,则行为相同。
  • 在任何情况下,人们都可以看到消息在发送后立即出现在代理中。

问题:为什么这种情况下接收自己发送的消息会被阻止?不使用

@Async
,如何能立即收到外发消息?

更新/解决方案:正如 Gary 所说,确实存在一个事务,但似乎不是 Spring Boot 的事务,而是由包含的 activemq-lib 创建的事务。

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;

import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@SpringBootTest(classes = TestApp.class, webEnvironment = NONE)
@Testcontainers
public class JmsTest
{
    private static final Logger LOG = LoggerFactory.getLogger(JmsTest.class);

    @Container
    public static final GenericContainer<?> ACTIVEMQ =
        new GenericContainer<>(DockerImageName.parse("rmohr/activemq"))
            .withExposedPorts(8161, 61616)
            .waitingFor(new LogMessageWaitStrategy().withRegEx(".*Apache ActiveMQ .* started.*"))
            .withStartupTimeout(Duration.ofSeconds(60))
            .withLogConsumer(new Slf4jLogConsumer(LOG));

    @DynamicPropertySource
    private static void ports(DynamicPropertyRegistry registry)
    {
        registry.add("spring.activemq.broker-url", () -> "tcp://" + ACTIVEMQ.getHost() + ":" + ACTIVEMQ.getMappedPort(61616));
    }

    @Autowired
    private JmsTemplate jmsTemplate;

    private List<String> messages = new LinkedList<>();

    @Async
    @JmsListener(destination = "out")
    public void onOut(String message)
    {
        LOG.warn("Received message from out: {}", message);
        messages.add(message);
    }

    @Test
    public void foo() throws InterruptedException
    {
        LOG.warn("Sending request");
        // Sending some message on destination 'in' to be received and answered by the listener below
        jmsTemplate.convertAndSend("in", UUID.randomUUID().toString());

        LOG.warn("Waiting for repsonse");

        // (2)    // Try to receive response from 'out'
        //        jmsTemplate.setReceiveTimeout(2_000);
        //        Message response = jmsTemplate.receive("out");
        //        assertThat(response, notNullValue());
        
        Thread.sleep(2_000);
        assertThat(messages, hasSize(1));

    }
}

@SpringBootApplication
@EnableJms
@EnableAsync
class TestApp
{
    private static final Logger LOG = LoggerFactory.getLogger(TestApp.class);

    public static void main(String[] args)
    {
        SpringApplication.run(TestApp.class, args);
    }

    @Autowired
    private JmsTemplate jmsTemplate;

    // (1)
    // @Async
    @JmsListener(destination = "in")
    public void onIn(String message) throws InterruptedException
    {
        LOG.warn("Received message from in: {}", message);

        jmsTemplate.convertAndSend("out", message);
        LOG.warn("Sent Response");

        LOG.warn("Sleeping ...");
        Thread.sleep(3_000);

        LOG.warn("Finished");
    }
}

这里

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>foo</groupId>
    <artifactId>jmstest</artifactId>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.5.3</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>


        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers</artifactId>
            <version>1.15.3</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>1.15.3</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>
</project>
java spring-boot asynchronous spring-jms
1个回答
1
投票

看起来您正在使用事务,事务不会提交,直到

@JmsListener
方法退出,这样其他消费者就看不到它。

您不能在此用例中使用事务。

因此

@Async
可以工作,因为发送将在不同的事务中执行。

© www.soinside.com 2019 - 2024. All rights reserved.