I am calling a REST URL and trying to measure how long it takes to get the response back. I am using DefaultHttpClient to fetch the response from the REST URL.
In the program below, each thread works on its own range of IDs: the first thread handles 1 - 100, the second thread handles 101 - 200, and so on.
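To make the range split concrete, here is a minimal sketch of how the per-thread blocks could be computed. RangePartitioner and partition are hypothetical names that do not appear in the original program; the sketch assumes each Task is handed the first ID of its block (id) and the block size (noOfTasks), which is what the loop in the question's code suggests.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits consecutive IDs into one contiguous block per thread. */
class RangePartitioner {

    /**
     * Returns threadCount blocks of idsPerThread IDs each, starting at firstId.
     * Each element is {start, end}, both inclusive.
     */
    static List<int[]> partition(int firstId, int idsPerThread, int threadCount) {
        List<int[]> ranges = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            int start = firstId + t * idsPerThread;
            ranges.add(new int[] {start, start + idsPerThread - 1});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Prints "1 - 100", "101 - 200", "201 - 300", one per line.
        for (int[] range : partition(1, 100, 3)) {
            System.out.println(range[0] + " - " + range[1]);
        }
    }
}
```

Each {start, end} pair would then be passed to one Task, so that no two threads ever request the same user ID.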
With the code below, the first request works fine. On the second request, however, the httpclient.execute line throws this exception:
java.lang.IllegalStateException: Invalid use of BasicClientConnManager: connection still allocated.
Make sure to release the connection before allocating another one.
Am I doing something wrong here?
Below is my code:
class Task implements Runnable {
    private DefaultHttpClient httpclient = new DefaultHttpClient();
    private HttpGet httpGet;
    private HttpResponse response;

    @Override
    public void run() {
        try {
            httpGet = new HttpGet(
                    "http://localhost:8080/service/BEService/v1/get/USERID=10000/profile.ACCOUNT.SERVICE");
            httpGet.getRequestLine();
            for (int userId = id; userId < id + noOfTasks; userId++) {
                long start = System.nanoTime();
                response = httpclient.execute(httpGet);
                long end = System.nanoTime() - start;
            }
        } catch (Exception e) {
            LOG.error("Threw a Exception in " + getClass().getSimpleName(), e);
        }
    }
}
Updated code:
If I do this instead:
class Task implements Runnable {
    private DefaultHttpClient httpclient = new DefaultHttpClient();
    private HttpGet httpGet;
    private HttpResponse response;

    @Override
    public void run() {
        try {
            for (int userId = id; userId < id + noOfTasks; userId++) {
                httpGet = new HttpGet("http://localhost:8080/service/BEService/v1/get/USERID=10000/profile.ACCOUNT.SERVICE");
                httpGet.getRequestLine();
                long start = System.nanoTime();
                response = httpclient.execute(httpGet);
                long end = System.nanoTime() - start;
                HttpEntity entity = response.getEntity();
                EntityUtils.consume(entity);
            }
        } catch (Exception e) {
            LOG.error("Threw a Exception in " + getClass().getSimpleName(), e);
        }
    }
}
Is that OK?
Am I doing something wrong here?
Yes. As stated in the documentation:
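BasicClientConnectionManager is a simple connection manager that maintains only one connection at a time. Even though this class is thread-safe it ought to be used by one execution thread only. BasicClientConnectionManager will make an effort to reuse the connection for subsequent requests with the same route. It will, however, close the existing connection and re-open it for the given route, if the route of the persistent connection does not match that of the connection request. If the connection has already been allocated, then java.lang.IllegalStateException is thrown.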
BasicClientConnectionManager is used by HttpClient by default.
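The rule in the last sentence of that quote can be pictured with a toy model. The class below is an illustration only, not HttpClient's actual implementation: SingleConnectionManager, lease and release are made-up names standing in for the manager handing out its single connection and getting it back.

```java
/** Toy model (NOT the HttpClient source) of a manager that owns exactly one connection. */
class SingleConnectionManager {
    private boolean allocated;

    /** Hands out the only connection; fails if the previous lease was never released. */
    synchronized void lease() {
        if (allocated) {
            throw new IllegalStateException("connection still allocated");
        }
        allocated = true;
    }

    /** Takes the connection back, which is the effect consuming the response has in HttpClient. */
    synchronized void release() {
        allocated = false;
    }

    public static void main(String[] args) {
        SingleConnectionManager manager = new SingleConnectionManager();
        manager.lease();                 // first request: fine
        try {
            manager.lease();             // second request without a release in between
        } catch (IllegalStateException e) {
            System.out.println("second lease failed: " + e.getMessage());
        }
        manager.release();               // analogous to consuming the first response
        manager.lease();                 // succeeds again
        System.out.println("lease after release succeeded");
    }
}
```

This mirrors the first version of the question's code: execute() is called in a loop and nothing ever returns the connection, so the second iteration fails.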
Assuming you are using the vanilla DefaultHttpClient (which uses BasicClientConnectionManager internally), you first need to consume the outstanding (previous) response:
EntityUtils.consumeQuietly(httpResponse.getEntity());
Otherwise, you can allocate a new DefaultHttpClient each time.
Also, you can wrap execute() plus EntityUtils.consumeQuietly() in a synchronized block to prevent multithreading errors.
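As a rough sketch of the synchronized approach, the example below uses a toy client rather than the real HttpClient API. SerializedClient and its execute and consume methods are hypothetical stand-ins for httpclient.execute() and EntityUtils.consumeQuietly(); the point is only that holding one lock across both steps keeps two threads from ever using the single connection at the same time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy stand-in for a client that can serve only one request at a time. */
class SerializedClient {
    private final AtomicBoolean busy = new AtomicBoolean();
    private final Object lock = new Object();

    /** Stand-in for execute(): fails if the previous call was never released. */
    private void execute() {
        if (!busy.compareAndSet(false, true)) {
            throw new IllegalStateException("connection still allocated");
        }
    }

    /** Stand-in for EntityUtils.consumeQuietly(): frees the connection. */
    private void consume() {
        busy.set(false);
    }

    /** Runs one request while holding the lock, so calls from different threads never overlap. */
    void request() {
        synchronized (lock) {
            execute();
            try {
                Thread.yield(); // placeholder for reading the response
            } finally {
                consume();
            }
        }
    }

    /** Fires threads * perThread requests at one shared client and returns the number of failures. */
    static int countFailures(int threads, int perThread) {
        SerializedClient client = new SerializedClient();
        AtomicInteger failures = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    try {
                        client.request();
                    } catch (IllegalStateException e) {
                        failures.incrementAndGet();
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("requests did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("failures: " + countFailures(8, 1000));
    }
}
```

Keep in mind that a shared lock serializes the requests, so threads spend time waiting for each other. If the goal is to measure response time, the timer should start after the lock is acquired, otherwise the wait is counted as latency.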
Here is my RestTemplate configuration using a pooling connection manager. It works fine with 5+ concurrent threads.
<!-- RestTemplate -->
<beans:bean id="restTemplateYT" class="org.springframework.web.client.RestTemplate">
    <beans:constructor-arg ref="httpRequestFactoryYT" />
</beans:bean>
<beans:bean id="httpRequestFactoryYT" class="org.springframework.http.client.HttpComponentsClientHttpRequestFactory">
    <beans:constructor-arg>
        <beans:bean class="org.apache.http.impl.client.DefaultHttpClient">
            <beans:constructor-arg>
                <beans:bean class="org.apache.http.impl.conn.PoolingClientConnectionManager"/>
            </beans:constructor-arg>
        </beans:bean>
    </beans:constructor-arg>
    <beans:property name="connectTimeout" value="5000" />
</beans:bean>
Spring version: 3.1.0
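For intuition on why a pooling manager copes with concurrent callers, here is a toy model written against the JDK only. It is not how PoolingClientConnectionManager is implemented; ToyConnectionPool, request and run are hypothetical names, and the integers merely stand in for pooled connections. The idea it illustrates is that a caller waits for a free connection instead of failing with IllegalStateException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Toy model of a pooling manager: callers wait for a free connection instead of failing. */
class ToyConnectionPool {
    private final BlockingQueue<Integer> idle;

    ToyConnectionPool(int size) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(i); // each integer stands in for one pooled connection
        }
    }

    /** Leases a connection, runs the simulated request, and always returns the connection. */
    String request() throws InterruptedException {
        Integer connection = idle.take(); // blocks while every connection is leased
        try {
            return "ok";
        } finally {
            idle.put(connection);
        }
    }

    /** Sends the given number of requests from a fixed thread pool; returns how many answered "ok". */
    static int run(int poolSize, int threads, int requests) {
        ToyConnectionPool pool = new ToyConnectionPool(poolSize);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            Callable<String> task = pool::request;
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                futures.add(executor.submit(task));
            }
            int ok = 0;
            for (Future<String> future : futures) {
                if ("ok".equals(future.get())) {
                    ok++;
                }
            }
            return ok;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // 10 worker threads share 2 connections; every request still completes.
        System.out.println(run(2, 10, 100) + " of 100 requests succeeded");
    }
}
```

The pool size bounds how many requests are in flight at once, which is the trade-off to tune when the real manager's connection limits are configured.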
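A RestTemplate whose HttpClientConnectionManager is a BasicHttpClientConnectionManager can fail under concurrent use with an IllegalStateException and the error message "Connection [xxx.xxx.xxx.xxx] is still allocated". Instead, use HttpComponentsClientHttpRequestFactory with a pooling connection manager (PoolingHttpClientConnectionManager).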
Below is a code example for Spring Boot 3.2 that reproduces the IllegalStateException with BasicHttpClientConnectionManager and fixes it with HttpComponentsClientHttpRequestFactory.
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.hc.core5.util.Timeout;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT, classes = RestTemplateConnectionManagerApplication.class)
class RestTemplateConnectionManagerApplicationTests {

    private static final Logger logger = LoggerFactory.getLogger(RestTemplateConnectionManagerApplication.class);
    private static final String localhost = "http://localhost:";

    @Value("${spring.server.port}")
    private Integer serverPort;

    // @Autowired
    // @Qualifier("restTemplateWithBasicPoolManager")
    private RestTemplate restTemplate;

    private static final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);

    @Test
    void restTemplateWithBasicHttpClientConnectionManagerIllegalStateExceptionTest() {
        initRestTemplateWithHttpClientConnectionManager();
        assertThatThrownBy(() -> {
            ScheduledFuture<List<Future<String>>> schedule = executorService.schedule(this::sendReq, 0, TimeUnit.SECONDS);
            assertResponse(schedule);
        }).hasCauseInstanceOf(IllegalStateException.class)
                .hasMessageContaining("Connection [Not bound] is still allocated");
    }

    @Test
    void restTemplateWithPoolingHttpClientConnectionManagerTest() throws ExecutionException, InterruptedException {
        initRestTemplateWithPoolingHttpClientConnectionManager();
        ScheduledFuture<List<Future<String>>> schedule = executorService.schedule(this::sendReq, 0, TimeUnit.SECONDS);
        assertResponse(schedule);
    }

    @Test
    void restTemplateWithDefaultPoolManagerTest() throws ExecutionException, InterruptedException {
        initRestTemplateWithDefaultPoolManager();
        ScheduledFuture<List<Future<String>>> schedule = executorService.schedule(this::sendReq, 0, TimeUnit.SECONDS);
        assertResponse(schedule);
    }

    private void initRestTemplateWithDefaultPoolManager() {
        restTemplate = new RestTemplateBuilder()
                .setConnectTimeout(Duration.ofSeconds(2))
                .setReadTimeout(Duration.ofSeconds(5))
                .build();
    }

    private void initRestTemplateWithPoolingHttpClientConnectionManager() {
        PoolingHttpClientConnectionManager connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
                .setDefaultConnectionConfig(createConnectionConfig())
                .setMaxConnTotal(100)
                .setMaxConnPerRoute(20)
                .build();
        HttpClient httpClient = buildHttpClient(connectionManager);
        initRestTemplateWithHttpClientConnectionManager(httpClient);
    }

    private void initRestTemplateWithHttpClientConnectionManager(HttpClient httpClient) {
        ClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(httpClient);
        restTemplate = new RestTemplate(requestFactory);
    }

    private void initRestTemplateWithHttpClientConnectionManager() {
        BasicHttpClientConnectionManager connectionManager = new BasicHttpClientConnectionManager();
        connectionManager.setConnectionConfig(createConnectionConfig());
        HttpClient httpClient = buildHttpClient(connectionManager);
        initRestTemplateWithHttpClientConnectionManager(httpClient);
    }

    private static HttpClient buildHttpClient(HttpClientConnectionManager connectionManager) {
        return HttpClientBuilder
                .create()
                .setConnectionManager(connectionManager)
                .build();
    }

    private ConnectionConfig createConnectionConfig() {
        return ConnectionConfig.custom()
                .setConnectTimeout(Timeout.of(Duration.ofMillis(0))) // time allowed to establish a connection; 0 means no limit
                .setSocketTimeout(Timeout.of(Duration.ofSeconds(5))) // socket timeout for I/O on an established connection
                .build();
    }

    // Submits 100 concurrent GET requests to the local controller through the shared RestTemplate.
    private List<Future<String>> sendReq() {
        return IntStream.range(0, 100).mapToObj(index -> executorService.submit(() -> {
            logger.info("sending request");
            return restTemplate.getForObject(localhost + serverPort + "/", String.class);
        })).collect(Collectors.toList());
    }

    // Waits for every request; a failed request surfaces as an ExecutionException from future.get().
    private static void assertResponse(ScheduledFuture<List<Future<String>>> schedule) throws InterruptedException, ExecutionException {
        for (Future<?> future : schedule.get()) {
            assertThat(future.get()).isEqualTo("ok");
        }
    }
}
Controller:
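import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.Duration;

@RestController
@RequestMapping("/")
public class ExampleController {

    private static final Log logger = LogFactory.getLog(ExampleController.class);

    // Responds with "ok" after a 200 ms delay so that concurrent requests overlap.
    @GetMapping()
    public String restTemplateTimeOut() {
        logger.info("ok");
        try {
            Thread.sleep(Duration.ofMillis(200).toMillis());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "ok";
    }
}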
pom.xml:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents.client5</groupId>
    <artifactId>httpclient5</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>