我们的应用程序是一个消息处理系统,具有与RabbitMQ队列连接的多个组件。因此消息处理是异步的。现在,我需要添加一个与系统通信的HTTP适配器。由于HTTP与请求/响应同步,因此我需要一种连接同步流和异步流的方法。当前的解决方案是:
CompletableFuture
阻止。CompletableFuture
匹配。HTTP适配器是使用Akka HTTP实现的。使用具有功能handleWithAsyncHandler()
的Function<HttpRequest, CompletionStage<HttpResponse>>
处理请求。
问题是HTTP适配器需要管理所有未决请求的映射(Map<String, CompletableFuture>
)。对于每个请求,都会创建一个新的CompletableFuture
对象并将其放入地图中。当队列中收到响应时,匹配的CompletableFuture
完成以完成请求。代码中似乎有难闻的气味,因为我需要仔细管理此地图。例如,如果未能为请求生成响应,则需要从映射中删除该请求。
我想知道是否还有其他方法可以使用地图来跟踪所有待处理的请求。
请勿使用默认调度程序。
更好地定义阻塞调度程序来处理CompletableFuture.supplyAsync
例如my-blocking-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 16
}
throughput = 1
}
import static akka.http.javadsl.server.Directives.completeWithFuture;
import static akka.http.javadsl.server.Directives.post;
// GOOD (the blocking is now isolated onto a dedicated dispatcher):
final Route routes = post(() -> {
final MessageDispatcher dispatcher = system.dispatchers().lookup("my-blocking-dispatcher");
return completeWithFuture(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
}
return HttpResponse.create()
.withEntity(Long.toString(System.currentTimeMillis()));
}, dispatcher // uses the good "blocking dispatcher" that we
// configured, instead of the default dispatcher to isolate the blocking.
));
});