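I have the following scenario in a Spring Boot application that uses JMS with ActiveMQ 5 as the broker.
A method annotated with @JmsListener processes a message and sends a response message to a different destination. That destination also has a @JmsListener, but it is not called when the response is sent to the broker. It is called only once the original listener has completely finished processing. If the original listener is additionally annotated with @Async, the response is received immediately after sending, as expected.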
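The original project is far too big, so I have prepared the minimal example below. It contains a Spring Boot application, TestApp, with a @JmsListener (1) that immediately forwards a message from destination in to destination out and afterwards sleeps for 3 seconds.
The application is started in a test that sends a message to in and waits 2 seconds for the response on out.
The test succeeds only if @Async is present at (1).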
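Further observations: if JmsTemplate is used to receive the response instead of a JmsListener, the behavior is the same.
Questions: Why is receiving a message I sent myself blocked in this scenario? How can I receive the outgoing message immediately without using @Async?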
Update/Solution: As Gary said, there is indeed a transaction, but it does not seem to be one from Spring Boot. It appears to be created by the included activemq-lib.
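To make the effect of such a transaction concrete, here is a toy model in plain Java. It is only a sketch and not the JMS or ActiveMQ API: ToyQueue and ToyTransactedSession are hypothetical names, and a real broker tracks the pending send itself rather than buffering it in the client. The model only captures the visibility rule: a message sent inside a transaction cannot be consumed until the transaction commits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Toy model of a locally transacted session; NOT the JMS or ActiveMQ API. */
public class TransactedSendDemo
{
    /** Hypothetical stand-in for a broker destination such as 'out'. */
    static final class ToyQueue
    {
        private final Queue<String> visible = new ConcurrentLinkedQueue<>();

        void deliver(String message)
        {
            visible.add(message);
        }

        /** Returns the next visible message, or null if there is none. */
        String poll()
        {
            return visible.poll();
        }
    }

    /** Sends are held back and only reach the queue on commit. */
    static final class ToyTransactedSession
    {
        private final ToyQueue target;
        private final List<String> pending = new ArrayList<>();

        ToyTransactedSession(ToyQueue target)
        {
            this.target = target;
        }

        void send(String message)
        {
            pending.add(message);
        }

        void commit()
        {
            pending.forEach(target::deliver);
            pending.clear();
        }
    }

    public static void main(String[] args)
    {
        ToyQueue out = new ToyQueue();
        ToyTransactedSession session = new ToyTransactedSession(out);

        // Corresponds to the send inside onIn: nothing is visible yet.
        session.send("response");
        System.out.println("before commit: " + out.poll());

        // Corresponds to the listener method returning.
        session.commit();
        System.out.println("after commit: " + out.poll());
    }
}
```

Running main prints "before commit: null" followed by "after commit: response", which mirrors the test: the consumer on out sees nothing during the 3 second sleep because the commit only happens after onIn returns.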
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
@SpringBootTest(classes = TestApp.class, webEnvironment = NONE)
@Testcontainers
public class JmsTest
{
    private static final Logger LOG = LoggerFactory.getLogger(JmsTest.class);

    @Container
    public static final GenericContainer<?> ACTIVEMQ =
        new GenericContainer<>(DockerImageName.parse("rmohr/activemq"))
            .withExposedPorts(8161, 61616)
            .waitingFor(new LogMessageWaitStrategy().withRegEx(".*Apache ActiveMQ .* started.*"))
            .withStartupTimeout(Duration.ofSeconds(60))
            .withLogConsumer(new Slf4jLogConsumer(LOG));

    @DynamicPropertySource
    private static void ports(DynamicPropertyRegistry registry)
    {
        registry.add("spring.activemq.broker-url", () -> "tcp://" + ACTIVEMQ.getHost() + ":" + ACTIVEMQ.getMappedPort(61616));
    }

    @Autowired
    private JmsTemplate jmsTemplate;

    private List<String> messages = new LinkedList<>();

    @Async
    @JmsListener(destination = "out")
    public void onOut(String message)
    {
        LOG.warn("Received message from out: {}", message);
        messages.add(message);
    }

    @Test
    public void foo() throws InterruptedException
    {
        LOG.warn("Sending request");
        // Sending some message on destination 'in' to be received and answered by the listener below
        jmsTemplate.convertAndSend("in", UUID.randomUUID().toString());
        LOG.warn("Waiting for response");
        // (2) // Try to receive response from 'out'
        // jmsTemplate.setReceiveTimeout(2_000);
        // Message response = jmsTemplate.receive("out");
        // assertThat(response, notNullValue());
        Thread.sleep(2_000);
        assertThat(messages, hasSize(1));
    }
}
@SpringBootApplication
@EnableJms
@EnableAsync
class TestApp
{
    private static final Logger LOG = LoggerFactory.getLogger(TestApp.class);

    public static void main(String[] args)
    {
        SpringApplication.run(TestApp.class, args);
    }

    @Autowired
    private JmsTemplate jmsTemplate;

    // (1)
    // @Async
    @JmsListener(destination = "in")
    public void onIn(String message) throws InterruptedException
    {
        LOG.warn("Received message from in: {}", message);
        jmsTemplate.convertAndSend("out", message);
        LOG.warn("Sent Response");
        LOG.warn("Sleeping ...");
        Thread.sleep(3_000);
        LOG.warn("Finished");
    }
}
Here is the pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>foo</groupId>
    <artifactId>jmstest</artifactId>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.5.3</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers</artifactId>
            <version>1.15.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>1.15.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
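It looks like you are using transactions. The transaction won't commit until the @JmsListener method exits, so other consumers can't see the message before then.
You can't use transactions with this use case.
That is why @Async works: the send is then performed in a different transaction.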
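If the transaction comes from the listener container running with a locally transacted session (an assumption here; as far as I know, Spring Boot's DefaultJmsListenerContainerFactoryConfigurer turns sessionTransacted on when no JTA transaction manager is present), one possible way to avoid it without @Async is to override the container factory and switch it off. This is only a sketch for Spring Boot 2.5.x with the javax.jms API; the class name NonTransactedJmsConfig is made up for illustration.

```java
import javax.jms.ConnectionFactory;

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

// Hypothetical configuration class; only the bean name matters.
@Configuration
class NonTransactedJmsConfig
{
    // Naming the bean "jmsListenerContainerFactory" replaces the auto-configured
    // factory that @JmsListener uses by default.
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
        DefaultJmsListenerContainerFactoryConfigurer configurer,
        ConnectionFactory connectionFactory)
    {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        // Apply the usual Spring Boot defaults first ...
        configurer.configure(factory, connectionFactory);
        // ... then disable the local transaction (assumed to be the cause here),
        // so a send made inside the listener is not held until the method returns.
        factory.setSessionTransacted(false);
        return factory;
    }
}
```

The trade-off is that a failure inside the listener no longer rolls back the received message or the response that was already sent, so this only fits cases where that is acceptable.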