同步异步后端

问题描述 投票:0回答:1

我需要创建一个REST端点,它将 "同步 "通过JMS工作的后端服务的请求和响应。换句话说,我的端点应该向JMS输入队列发送消息,在JMS输出队列中等待响应。如果在超时时间内没有响应,那么错误返回给消费者。而对于消费者来说,这个端点应该看起来像一个正常的同步请求响应。

目前我已经用java.util.concurrent.Exchanger实现了。我的代码(简化)。

REST端点

@RestController
public class Endpoint {

   private ConcurrentMap<String, Exchanger> exchangers = new ConcurrentHashMap<>();

   @GetMapping("/data/{requestId}")
   public ResponseEntity<String> getData(@Parameter(in = ParameterIn.PATH, required = true) String requestId) {
      Exchanger<String> syncExchanger = createAndPutIfNotExists(requestId);
      sendToJMS(requestId);
      int timeout = 30;
      // wait for JMS response and return it
      return waitForResponse(syncExchanger, requestId, timeout);
   }

   private synchronized Exchanger<String> createAndPutIfNotExists(String requestId) {
        if (exchangers.get(requestId) != null) {
            throw new BadHeaderException("Duplicate requestId");
        }
        Exchanger<String> exchanger = new Exchanger<>();
        exchangers.put(requestId, exchanger);
        return exchanger;
   }

   private String waitForResponse(Exchanger<String> exchanger, String requestId, int timeout) {
        try {
            return exchanger.exchange(null, timeout, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (TimeoutException e) {
            throw new TimeoutException("timeout on waiting JMS response.", e);
        } finally {
            exchangers.remove(requestId);
        }
   }

   @JmsListener(destination = "${jms.outputTopic}")
   public void onMessage(Message m) throws JMSException {
      String requestId = m.getStringProperty("RequestId");
      String payload = m.getBody();
      Exchanger<String> exchanger = exchangers.get(requestId );

      if (exchanger != null) {
            try {
                exchanger.exchange(payload);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                exchangers.remove(requestId );
            }
      }
   }
}

这个解决方案是可行的 但它 在等待响应的同时请求线程,然后webserver线程池在高负载时耗尽。

有什么办法可以做到不阻塞?

就像这样。

@GetMapping("/data/{requestId}")
   public CompletableFuture<String> getData() {
      return CompletableFuture.supplyAsync(() -> {
        sendToJMS(requestId);

        // How to wait for JMS response with some timeout ?

      });
   }

@JmsListener(destination = "${jms.outputTopic}")
   public void onMessage(Message m) throws JMSException {
      String requestId = m.getStringProperty("RequestId");
      String payload = m.getBody();

      // How to "complete" CompletableFuture ?

   }

java synchronization jms nonblocking completable-future
1个回答
0
投票

春天接受 CompletableFuture 作为控制器中的返回类型,所以你可以在 createAndPutIfNotExists() 并在 onMessage().

更换您的 exchangers 绘有 futures 地图。

private ConcurrentMap<String, CompletableFuture<String>> futures = new ConcurrentHashMap<>();

然后调整发送部分。

@GetMapping("/data/{requestId}")
public CompletableFuture<String> getData(@PathParam("requestId") String requestId) {
    CompletableFuture<String> future = createAndPutIfNotExists(requestId);
    sendToJMS(requestId);
    int timeout = 30;
    CompletableFuture<String> result = future.orTimeout(timeout, TimeUnit.SECONDS);
    result.thenRun(() -> futures.remove(requestId, future));
    return result;
}

private synchronized CompletableFuture<String> createAndPutIfNotExists(String requestId) {
    if (futures.get(requestId) != null) {
        throw new BadHeaderException("Duplicate requestId");
    }
    CompletableFuture<String> future = new CompletableFuture<>();
    futures.put(requestId, future);
    return future;
}

请注意,超时处理是使用Java 9的 orTimeout() 方法。如果你是在Java 8上,你将需要使用 自定义超时手环.

你可能还想做一些 thenApplyAsync(s -> s, executor) 技巧,将响应提交从JMS超时处理线程中移出。

最后,只要 complete() 在收到回复时,未来。

@JmsListener(destination = "${jms.outputTopic}")
public void onMessage(Message m) throws JMSException {
    String requestId = m.getStringProperty("RequestId");
    String payload = m.getBody();
    CompletableFuture<String> future = futures.get(requestId);

    if (future != null) {
        try {
            future.complete(payload);
        } finally {
            futures.remove(requestId, future);
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.