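I have a requirement to write an SQS consumer that consumes messages from AWS SQS asynchronously. My assumption is that JMS is multithreaded, and that every invocation of a MessageListener's onMessage() is assigned a new thread.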
SQSConnectionManager.java
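import com.amazon.sqs.javamessaging.ProviderConfiguration;
import com.amazon.sqs.javamessaging.SQSConnection;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SQSConnectionManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(SQSConnectionManager.class);

    private SQSConnectionFactory sqsConnectionFactory;
    private SQSConnection sqsConnection;
    private Session sqsSession;

    public SQSConnectionManager() {
    }

    public void createSQSConnection(final String queueName) throws JMSException {
        LOGGER.info("Initializing sqs connection");
        // Uses the default AWS credentials and region provider chain.
        sqsConnectionFactory = new SQSConnectionFactory(
                new ProviderConfiguration(),
                AmazonSQSClientBuilder.standard()
                        .build()
        );
        sqsConnection = sqsConnectionFactory.createConnection();
        // Non-transacted session; messages are acknowledged automatically.
        sqsSession = sqsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = sqsSession.createQueue(queueName);
        MessageConsumer sqsConsumer = sqsSession.createConsumer(queue);
        sqsConsumer.setMessageListener(new MyCustomListener());
        // Delivery to the listener begins once the connection is started.
        sqsConnection.start();
        LOGGER.info("SQS Connection started");
    }
}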
MyCustomListener.java
import com.amazon.sqs.javamessaging.message.SQSTextMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyCustomListener implements MessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomListener.class);

    public MyCustomListener() {}

    @Override
    public void onMessage(Message message) {
        try {
            LOGGER.info("onMessage() Thread name : {}", Thread.currentThread().getName());
            LOGGER.info("onMessage() Thread id : {}", Thread.currentThread().getId());
            LOGGER.info("Reading incoming sqs message");
            final SQSTextMessage sqsTextMessage = (SQSTextMessage) message;
            final String receivedMessage = sqsTextMessage.getText();
            LOGGER.info("Received sqs message : {}", receivedMessage);
            // Plain method call: helper() runs on the same thread as onMessage().
            helper(receivedMessage);
        } catch (JMSException e) {
            // Pass the exception as the last argument so the stack trace is logged.
            LOGGER.error("Failed to read incoming sqs message", e);
        }
    }

    private void helper(final String sqsMessage) {
        LOGGER.info("helper() Thread name : {}", Thread.currentThread().getName());
        LOGGER.info("helper() Thread id : {}", Thread.currentThread().getId());
        LOGGER.info("sqs message : {}", sqsMessage);
    }
}
Application.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Application {
    private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) throws Exception {
        SQSConnectionManager sqsConnectionManager = new SQSConnectionManager();
        sqsConnectionManager.createSQSConnection("test-queue");
    }
}
AWS Maven dependency
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-sqs-java-messaging-lib</artifactId>
    <version>1.1.0</version>
</dependency>
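This Java application is deployed to AWS as an Elastic Beanstalk application. When I check the logs in CloudWatch, I see that the thread ID is the same for onMessage() and helper().
Can anyone help me understand how a JMS listener handles threading? Does it guarantee that messages are processed the way they would be in a multithreaded application?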
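To narrow the question down, the matching thread IDs can be reproduced without JMS at all. A plain method call such as helper(receivedMessage) always runs on the caller's thread, so onMessage() and helper() would log the same ID no matter how the listener itself is threaded. The following is a minimal JDK-only sketch of that observation. The class name ThreadHandOffDemo, the method names, and the pool size of 2 are assumptions made for illustration, not part of the application above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; not part of the SQS application above.
public class ThreadHandOffDemo {

    // Stand-in for helper(): reports the id of the thread that executes it.
    static long helper() {
        return Thread.currentThread().getId();
    }

    // Direct call, as onMessage() does above: runs on the caller's thread.
    static long runDirect() {
        return helper();
    }

    // Hand-off to a pool: helper() runs on one of the pool's worker threads.
    static long runOnPool() {
        ExecutorService pool = Executors.newFixedThreadPool(2); // pool size is arbitrary
        try {
            Callable<Long> task = ThreadHandOffDemo::helper;
            Future<Long> result = pool.submit(task);
            return result.get(); // block until the worker has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread id : " + Thread.currentThread().getId());
        System.out.println("direct call      : " + runDirect());
        System.out.println("pool hand-off    : " + runOnPool());
    }
}
```

The direct call reports the caller's ID, while the pool hand-off reports a worker thread's ID. So the identical IDs in my logs only show that helper() is called synchronously. They do not show whether successive onMessage() invocations share a thread. If the listener handed work to a pool like this, my understanding is that with Session.AUTO_ACKNOWLEDGE the message would be acknowledged when onMessage() returns, possibly before the pooled work has finished.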