在RabbitMQ中将HTTP请求/响应与异步消息集成在一起

问题描述 投票:0回答:1

我们的应用程序是一个消息处理系统,具有与RabbitMQ队列连接的多个组件。因此消息处理是异步的。现在,我需要添加一个与系统通信的HTTP适配器。由于HTTP与请求/响应同步,因此我需要一种连接同步流和异步流的方法。当前的解决方案是:

  1. HTTP请求被发送到一个队列。每个请求都有一个用于关联的唯一请求ID。
  2. HTTP请求被CompletableFuture阻止。
  3. 处理请求并将响应发送回另一个队列。
  4. 队列使用者使用响应来完成与请求ID的CompletableFuture匹配。

HTTP适配器是使用Akka HTTP实现的。使用具有功能handleWithAsyncHandler()Function<HttpRequest, CompletionStage<HttpResponse>>处理请求。

问题是HTTP适配器需要管理所有未决请求的映射(Map<String, CompletableFuture>)。对于每个请求,都会创建一个新的CompletableFuture对象并将其放入地图中。当队列中收到响应时,匹配的CompletableFuture完成以完成请求。代码中似乎有难闻的气味,因为我需要仔细管理此地图。例如,如果未能为请求生成响应,则需要从映射中删除该请求。

我想知道是否还有其他方法可以使用地图来跟踪所有待处理的请求。

java rabbitmq akka-http
1个回答
0
投票
需要考虑的事情是

请勿使用默认调度程序。

更好地定义阻塞调度程序来处理CompletableFuture.supplyAsync

例如

my-blocking-dispatcher { type = Dispatcher executor = "thread-pool-executor" thread-pool-executor { fixed-pool-size = 16 } throughput = 1 } import static akka.http.javadsl.server.Directives.completeWithFuture; import static akka.http.javadsl.server.Directives.post; // GOOD (the blocking is now isolated onto a dedicated dispatcher): final Route routes = post(() -> { final MessageDispatcher dispatcher = system.dispatchers().lookup("my-blocking-dispatcher"); return completeWithFuture(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000L); } catch (InterruptedException e) { } return HttpResponse.create() .withEntity(Long.toString(System.currentTimeMillis())); }, dispatcher // uses the good "blocking dispatcher" that we // configured, instead of the default dispatcher to isolate the blocking. )); });

© www.soinside.com 2019 - 2024. All rights reserved.