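I need to create a REST endpoint that "synchronizes" the request and response of a backend service that works over JMS. In other words, my endpoint should send a message to a JMS input queue and wait for the response on a JMS output queue. If no response arrives within the timeout, an error should be returned to the consumer. To the consumer, the endpoint should look like a normal synchronous request-response.
I have currently implemented this with java.util.concurrent.Exchanger. My code (simplified):
REST endpoint: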
@RestController
public class Endpoint {
private final ConcurrentMap<String, Exchanger<String>> exchangers = new ConcurrentHashMap<>();
@GetMapping("/data/{requestId}")
public ResponseEntity<String> getData(@Parameter(in = ParameterIn.PATH, required = true) @PathVariable String requestId) {
Exchanger<String> syncExchanger = createAndPutIfNotExists(requestId);
sendToJMS(requestId);
int timeout = 30;
// wait for JMS response and return it
return ResponseEntity.ok(waitForResponse(syncExchanger, requestId, timeout));
}
private synchronized Exchanger<String> createAndPutIfNotExists(String requestId) {
if (exchangers.get(requestId) != null) {
throw new BadHeaderException("Duplicate requestId");
}
Exchanger<String> exchanger = new Exchanger<>();
exchangers.put(requestId, exchanger);
return exchanger;
}
private String waitForResponse(Exchanger<String> exchanger, String requestId, int timeout) {
try {
return exchanger.exchange(null, timeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "interrupted";
} catch (TimeoutException e) {
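// java.util.concurrent.TimeoutException is checked and has no (String, Throwable) constructor,
// so report the timeout to the consumer as an HTTP 504 instead.
throw new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, "Timed out waiting for JMS response.", e);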
} finally {
exchangers.remove(requestId);
}
}
@JmsListener(destination = "${jms.outputTopic}")
public void onMessage(Message m) throws JMSException {
String requestId = m.getStringProperty("RequestId");
String payload = m.getBody(String.class);
Exchanger<String> exchanger = exchangers.get(requestId );
if (exchanger != null) {
try {
exchanger.exchange(payload);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
exchangers.remove(requestId );
}
}
}
}
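This solution works, but it blocks the request thread while waiting for the response, so the web server's thread pool gets exhausted under high load.
Is there a way to do this without blocking?
Something like this: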
@GetMapping("/data/{requestId}")
public CompletableFuture<String> getData(@PathVariable String requestId) {
return CompletableFuture.supplyAsync(() -> {
sendToJMS(requestId);
// How to wait for JMS response with some timeout ?
});
}
@JmsListener(destination = "${jms.outputTopic}")
public void onMessage(Message m) throws JMSException {
String requestId = m.getStringProperty("RequestId");
String payload = m.getBody(String.class);
// How to "complete" CompletableFuture ?
}
Spring accepts CompletableFuture as a return type in controllers, so you can create one in createAndPutIfNotExists() and complete it in onMessage().
Replace your exchangers map with a futures map:
private ConcurrentMap<String, CompletableFuture<String>> futures = new ConcurrentHashMap<>();
Then adapt the sending part:
@GetMapping("/data/{requestId}")
public CompletableFuture<String> getData(@PathVariable("requestId") String requestId) {
CompletableFuture<String> future = createAndPutIfNotExists(requestId);
sendToJMS(requestId);
int timeout = 30;
CompletableFuture<String> result = future.orTimeout(timeout, TimeUnit.SECONDS);
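// Clean up however the future finishes (response or timeout); thenRun() would skip the timeout case.
result.whenComplete((response, error) -> futures.remove(requestId, future));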
return result;
}
private CompletableFuture<String> createAndPutIfNotExists(String requestId) {
CompletableFuture<String> future = new CompletableFuture<>();
// putIfAbsent is atomic on a ConcurrentMap, so no synchronized block is needed.
if (futures.putIfAbsent(requestId, future) != null) {
throw new BadHeaderException("Duplicate requestId");
}
return future;
}
Note that the timeout handling uses Java 9's orTimeout() method. If you are on Java 8, you will need custom timeout handling.
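For Java 8, one possible shape for such custom timeout handling is sketched below. The class name Timeouts, the thread name, and the exception message are illustrative assumptions, not part of the JDK or Spring; the idea is simply to schedule a task that fails the future unless it has already completed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper that mimics Java 9's orTimeout() on Java 8.
final class Timeouts {
    // A single daemon thread is enough: it only fires timeouts and does no real work.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "future-timeout");
                t.setDaemon(true);
                return t;
            });

    private Timeouts() {}

    // Fails the given future with a TimeoutException unless it completes first.
    static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        ScheduledFuture<?> timer = SCHEDULER.schedule(
                () -> future.completeExceptionally(new TimeoutException("No JMS response in time")),
                timeout, unit);
        // Cancel the pending timer as soon as the future completes for any reason.
        future.whenComplete((value, error) -> timer.cancel(false));
        return future;
    }
}
```

With this in place, the controller could call Timeouts.orTimeout(future, timeout, TimeUnit.SECONDS) instead of future.orTimeout(...).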
You may also want to use a thenApplyAsync(s -> s, executor) trick to move the response submission off the JMS and timeout-handling threads.
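A minimal sketch of that hand-off follows. ResponseHandoff and completeOn are made-up names, and the executor is whatever pool you choose for writing responses. As far as I can tell, the identity function only runs when the source completes normally, so the timeout path may still continue on the timer thread; whenCompleteAsync is one alternative if that matters.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical helper: hops to another executor before the response is written.
final class ResponseHandoff {
    private ResponseHandoff() {}

    // Returns a dependent future whose successful completion happens on the given
    // executor, so stages chained onto it no longer run on the JMS listener thread.
    static <T> CompletableFuture<T> completeOn(CompletableFuture<T> source, Executor executor) {
        return source.thenApplyAsync(value -> value, executor);
    }
}
```

In the controller this would amount to returning ResponseHandoff.completeOn(result, executor) instead of result.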
Finally, just complete() the future when the response is received.
@JmsListener(destination = "${jms.outputTopic}")
public void onMessage(Message m) throws JMSException {
String requestId = m.getStringProperty("RequestId");
String payload = m.getBody(String.class);
CompletableFuture<String> future = futures.get(requestId);
if (future != null) {
try {
future.complete(payload);
} finally {
futures.remove(requestId, future);
}
}
}
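To try the correlation logic without Spring or a JMS broker, the map handling can be pulled into a small plain-Java class. This is only a sketch under assumptions: PendingReplies, register, complete, and pendingCount are invented names, IllegalStateException stands in for BadHeaderException, and it relies on Java 9+ for orTimeout().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the controller's futures map.
final class PendingReplies {
    private final ConcurrentMap<String, CompletableFuture<String>> futures = new ConcurrentHashMap<>();

    // Request side: call before sending to JMS. The returned future completes with
    // the reply, or exceptionally with a TimeoutException once the timeout elapses.
    CompletableFuture<String> register(String requestId, long timeout, TimeUnit unit) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (futures.putIfAbsent(requestId, future) != null) {
            throw new IllegalStateException("Duplicate requestId: " + requestId);
        }
        // The map entry is removed before the returned stage completes,
        // whether the outcome is a reply or a timeout.
        return future.orTimeout(timeout, unit)
                .whenComplete((reply, error) -> futures.remove(requestId, future));
    }

    // Listener side: returns false if nobody is waiting for this id (any more).
    boolean complete(String requestId, String payload) {
        CompletableFuture<String> future = futures.remove(requestId);
        return future != null && future.complete(payload);
    }

    int pendingCount() {
        return futures.size();
    }
}
```

Here getData() would return register(requestId, 30, TimeUnit.SECONDS) after sending the message, and onMessage() would call complete(requestId, payload).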