AWS SQS JMS SDK MessageListener onMessage() does not assign a different thread to each invocation


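I have a requirement to write an SQS consumer that consumes messages from AWS SQS asynchronously. My assumption was that JMS is multithreaded and that each invocation of the MessageListener's onMessage() is assigned a new thread.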

SQSConnectionManager.java

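import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;

import com.amazon.sqs.javamessaging.ProviderConfiguration;
import com.amazon.sqs.javamessaging.SQSConnection;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SQSConnectionManager {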
    private static final Logger LOGGER = LoggerFactory.getLogger(SQSConnectionManager.class);

    private SQSConnectionFactory sqsConnectionFactory;
    private SQSConnection sqsConnection;
    private Session sqsSession;

    public SQSConnectionManager() {
    }

    public void createSQSConnection(final String queueName) throws JMSException {

        LOGGER.info("Initializing sqs connection");
        sqsConnectionFactory = new SQSConnectionFactory(
            new ProviderConfiguration(),
            AmazonSQSClientBuilder.standard()
                                  .build()
        );

        sqsConnection = sqsConnectionFactory.createConnection();

        sqsSession = sqsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Queue queue = sqsSession.createQueue(queueName);

        MessageConsumer sqsConsumer = sqsSession.createConsumer(queue);

        sqsConsumer.setMessageListener(new MyCustomListener());

        sqsConnection.start();
        LOGGER.info("SQS Connection started");
    }
}

MyCustomListener.java

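import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

import com.amazon.sqs.javamessaging.message.SQSTextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyCustomListener implements MessageListener {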
    private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomListener.class);

    public MyCustomListener() {}

    @Override
    public void onMessage(Message message) {
        try {
            LOGGER.info("onMessage() Thread name : {}", Thread.currentThread().getName());
            LOGGER.info("onMessage() Thread id : {}", Thread.currentThread().getId());
            LOGGER.info("Reading incoming sqs message");
            final SQSTextMessage sqsTextMessage = (SQSTextMessage) message;
            final String receivedMessage = sqsTextMessage.getText();
            LOGGER.info("Received sqs message : {}", receivedMessage);
            helper(receivedMessage);
        } catch (JMSException e) {
            // Pass the exception itself so the stack trace is logged; getCause() may be null.
            LOGGER.error("Failed to read incoming sqs message", e);
        }
    }

    private void helper(final String sqsMessage) {
        LOGGER.info("helper() Thread name : {}", Thread.currentThread().getName());
        LOGGER.info("helper() Thread id : {}", Thread.currentThread().getId());
        LOGGER.info("sqs message : {}", sqsMessage);
    }
}

Application.java

public class Application {

    private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);
    
    public static void main(String[] args) throws Exception {
        SQSConnectionManager sqsConnectionManager = new SQSConnectionManager();
        sqsConnectionManager.createSQSConnection("test-queue");
    }
}

AWS Maven dependency

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-sqs-java-messaging-lib</artifactId>
    <version>1.1.0</version>
</dependency>

This Java application is deployed to AWS as an Elastic Beanstalk application. When I check the logs in CloudWatch, I see that

onMessage()
helper()

log the same thread ID.

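Can anyone help me understand how a JMS listener handles threading? Does it guarantee that messages are processed concurrently, as in a multithreaded program?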
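For context on the threading question: the JMS specification describes a Session as a single-threaded context that delivers messages to its listeners serially, so seeing the same thread across onMessage() calls on one session is expected, and helper() shares that thread simply because it is called directly. One common way to get parallel processing is to hand each message off to a worker pool from inside the listener. The sketch below illustrates only that hand-off, using the standard library alone; the class and method names (ListenerHandOffSketch, helperThreadName, deliverAndHandOff) are hypothetical, and the loop is a stand-in for the delivery thread, not the SQS library's actual dispatcher.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ListenerHandOffSketch {

    /** A method called directly always runs on its caller's thread. */
    public static String helperThreadName() {
        return Thread.currentThread().getName();
    }

    /**
     * Simulates one delivery thread invoking a listener once per message.
     * The listener body only submits the work to a pool and returns.
     * Returns a map of message body to the name of the thread that processed it.
     */
    public static Map<String, String> deliverAndHandOff(int messageCount) {
        // Assumption: one worker per message, to make the effect easy to see.
        ExecutorService workers = Executors.newFixedThreadPool(messageCount);
        Map<String, String> processedBy = new ConcurrentHashMap<>();
        CountDownLatch allDone = new CountDownLatch(messageCount);

        // Stand-in for the session's delivery loop: a single thread, serial calls.
        for (int i = 0; i < messageCount; i++) {
            final String body = "message-" + i;
            // What a hand-off inside onMessage() would look like.
            workers.submit(() -> {
                processedBy.put(body, Thread.currentThread().getName());
                allDone.countDown();
            });
        }

        try {
            // Wait for the workers so the caller sees a complete result.
            allDone.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            workers.shutdown();
        }
        return processedBy;
    }

    public static void main(String[] args) {
        System.out.println("direct call runs on: " + helperThreadName());
        System.out.println("processed by: " + deliverAndHandOff(3));
    }
}
```

One caveat with this design: with Session.AUTO_ACKNOWLEDGE, as in the code above, a message is most likely acknowledged as soon as onMessage() returns, so a hand-off like this could acknowledge a message before its processing has finished. Creating several sessions and consumers on the same connection is another option worth considering, since each session gets its own delivery thread.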

amazon-web-services multithreading jms amazon-sqs aws-sdk-java