java.lang.IllegalStateException: Connection pool shut down. How to handle connection pool exceptions in a multithreaded application?


My application creates multiple threads to read messages from SQS:


new Thread(() -> {
    while (true) {
        readMessages();
    }
});

The readMessages method looks like this:

public void readMessages() {
    ........

    Object messages = new ArrayList();
    try {
        slingshotMessage = (new ReceiveMessageRequest()).withQueueUrl(this.queueUrl)
                .withWaitTimeSeconds(this.subProps.getWaitTimeSeconds())
                .withVisibilityTimeout(this.subProps.getVisibilityTimeoutSeconds())
                .withMaxNumberOfMessages(this.subProps.getMaxNumberMessages());
        messages = this.sqs.receiveMessage(slingshotMessage).getMessages();
    } catch (Exception var6) {
        log.error("An error occurred while reading messages for subscriber: '" + this.subProps.getSubscriberName() + "' queueUrl: '" + this.queueUrl + "'", var6);
    }
}

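I can see that the exception is thrown at this.sqs.receiveMessage inside readMessages(). I catch the exception there, but once the error occurs the log fills up with the same exception over and over: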


java.lang.IllegalStateException: Connection pool shut down
    at org.apache.http.util.Asserts.check(Asserts.java:34) ~[httpcore-4.4.12.jar:4.4.12]
    at org.apache.http.pool.AbstractConnPool.lease(AbstractConnPool.java:196) ~[httpcore-4.4.12.jar:4.4.12]
    at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:268) ~[httpclient-4.5.10.jar:4.5.10]
    at jdk.internal.reflect.GeneratedMethodAccessor14.invoke(Unknown Source) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.http.conn.$Proxy37.requestConnection(Unknown Source) ~[na:na]
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176) ~[httpclient-4.5.10.jar:4.5.10]
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[httpclient-4.5.10.jar:4.5.10]
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[httpclient-4.5.10.jar:4.5.10]
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[httpclient-4.5.10.jar:4.5.10]
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) ~[httpclient-4.5.10.jar:4.5.10]
    at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1256) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1072) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:745) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:719) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:701) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:669) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:651) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:515) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:2147) ~[aws-java-sdk-sqs-1.11.415.jar:na]
    at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2116) ~[aws-java-sdk-sqs-1.11.415.jar:na]
    at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2105) ~[aws-java-sdk-sqs-1.11.415.jar:na]
    at com.amazonaws.services.sqs.AmazonSQSClient.executeReceiveMessage(AmazonSQSClient.java:1559) ~[aws-java-sdk-sqs-1.11.415.jar:na]
    at com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1530) ~[aws-java-sdk-sqs-1.11.415.jar:na]
    at com.lmig.global.reuse.slingshot.subscriber.aws.AwsSubscriber.readMessages(AwsSubscriber.java:63) ~[slingshot-1.3.37-B3-RELEASE.jar:1.3.37-B3-RELEASE]
    at com.lmig.global.reuse.slingshot.subscriber.Subscriber.lambda$newSubThread$0(Subscriber.java:61) ~[slingshot-1.3.37-B3-RELEASE.jar:1.3.37-B3-RELEASE]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

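So my questions are:

  1. Is there a way to resolve the connection pool exception?
  2. Since this runs inside the new Thread lambda, it keeps creating threads. Is there a way to exit quietly with the required stack trace, instead of piling up the same error in the logs?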
java multithreading connection-pooling amazon-sqs
1 Answer

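First of all, do not create a new thread for every message; that is a bad idea. Create a single thread that keeps polling your SQS queue, then hand the retrieved data to another thread for further processing, using a work-stealing thread pool or some other pool.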

Here is some pseudocode:

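import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public final class AsyncThreadUtils {

    private static final ExecutorService WORKER_SERVICE = Executors.newWorkStealingPool();

    private AsyncThreadUtils() {
    }

    // Hand a task to the shared worker pool.
    public static void addTask(Runnable task) {
        WORKER_SERVICE.submit(task);
    }

    // Stop accepting new tasks, give running tasks a moment to finish,
    // then interrupt whatever is left. The 5-second grace period is an arbitrary choice.
    public static void stopAllThreads() throws InterruptedException {
        WORKER_SERVICE.shutdown();
        if (!WORKER_SERVICE.awaitTermination(5, TimeUnit.SECONDS)) {
            WORKER_SERVICE.shutdownNow();
        }
    }
}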
class DataConsumer implements Runnable {
    @Override
    public void run() {
        while (true) {
            // Get the message from SQS
            // If data was received, give it to another thread for processing
            AsyncThreadUtils.addTask(() -> {
                // Processing goes here.
            });
            // Sleep for some time, e.g. 400 ms
        }
    }
}

// Start the thread above during the bootup of your program.
Thread sqsDataConsumer = new Thread(new DataConsumer());
sqsDataConsumer.start();
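
On the second question (exiting quietly instead of logging the same stack trace forever): "Connection pool shut down" usually means the HTTP client behind the SQS client has already been closed, for example because shutdown() was called on the client while the polling thread was still running. In that case retrying cannot succeed, so one option is to log once and leave the loop. The following is only a sketch under assumptions: QuietPoller is a hypothetical class, the Supplier stands in for the this.sqs.receiveMessage(...).getMessages() call, and treating every IllegalStateException as fatal is a simplification you may want to narrow.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical poller: the Supplier is a stand-in for the real SQS receive call.
public final class QuietPoller implements Runnable {

    private final Supplier<List<String>> receiver;
    private final Consumer<String> handler;
    private final long backoffMillis;
    private volatile boolean running = true;

    public QuietPoller(Supplier<List<String>> receiver, Consumer<String> handler, long backoffMillis) {
        this.receiver = receiver;
        this.handler = handler;
        this.backoffMillis = backoffMillis;
    }

    // Ask the loop to finish after the current iteration.
    public void stop() {
        running = false;
    }

    @Override
    public void run() {
        while (running && !Thread.currentThread().isInterrupted()) {
            try {
                for (String message : receiver.get()) {
                    handler.accept(message);
                }
            } catch (IllegalStateException e) {
                // Assumption: the client is unusable (e.g. "Connection pool shut down").
                // Log once and leave the loop instead of retrying forever.
                System.err.println("Stopping poller: " + e.getMessage());
                return;
            } catch (RuntimeException e) {
                // Other errors are treated as transient: log, wait, try again.
                System.err.println("Error while reading messages: " + e);
                try {
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
```

With the real client, the Supplier would wrap the receiveMessage call from the question, and the Consumer could pass each message to AsyncThreadUtils.addTask.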

Call the thread pool's shutdown method before the Java process exits.
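
One way to arrange that is a JVM shutdown hook that stops the polling thread first and only then drains the worker pool. This is a minimal sketch, not a definitive implementation: GracefulShutdown, its method names, and the 5-second timeout are all hypothetical, and it assumes the polling loop reacts to interruption.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: stops the poller thread, then drains the worker pool.
public final class GracefulShutdown {

    private GracefulShutdown() {
    }

    // Returns true if the poller exited and the pool terminated within the timeouts.
    public static boolean stop(Thread poller, ExecutorService workers, long timeoutMillis) {
        try {
            poller.interrupt();            // ask the polling loop to exit
            poller.join(timeoutMillis);    // wait for it to finish
            workers.shutdown();            // no new tasks; queued tasks still run
            if (!workers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                workers.shutdownNow();     // interrupt tasks that are still running
                return false;
            }
            return !poller.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            workers.shutdownNow();
            return false;
        }
    }

    // Registers the stop sequence to run when the JVM begins shutting down.
    public static void installHook(Thread poller, ExecutorService workers) {
        Runtime.getRuntime().addShutdownHook(
                new Thread(() -> stop(poller, workers, 5_000L)));
    }
}
```

If the SQS client itself is also shut down at exit, doing so only after the poller has stopped avoids further receiveMessage calls against a closed connection pool.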
