我正在尝试使用Akka Streams同时向服务器发送请求,然后尝试将每个请求与原始上下文相关联(本例中为Int
)。这是我汇总的流程:
val createRequestFlow: Flow[(String, String), (HttpRequest, Int), _] = Flow.fromFunction[(String, String), (HttpRequest, Int)]((mkRequest _).tupled)
val sendRequestFlow: Flow[(HttpRequest, Int), (HttpResponse, Int), _] = Flow[(HttpRequest, Int)].mapAsyncUnordered(32)((sendRequest _).tupled)
val handleResponseFlow: Flow[(HttpResponse, Int), String, _] = Flow[(HttpResponse, Int)].map[String]((getStatusString _).tupled)
val handler = createRequestFlow via sendRequestFlow via handleResponseFlow
特别是,我正试图找到返回Future[(HttpResponse, Int)]
的方法。目前,我正在这样做
def sendRequest(request: HttpRequest, ctx: Int): Future[(HttpResponse, Int)] = {
Http().singleRequest(request).map(r => (r,ctx))
}
但我明白这需要一个Executor的事实表明还有另一种(更好的)方法。
我不认为有更好的方法。 Akka使用标准Scala Future
s,他们设计需要ExecutionContext
来执行几乎任何操作。如果你真的真的不想为这个简单的map
使用另一个线程,你可以创建自己的sameThreadExecutionContext
,类似于Akka在里面使用的那个(参见akka.dispatch.ExecutionContexts.sameThreadExecutionContext),所以map
将在处理主要Http响应的同一个线程上执行,但不要将它用于任何更复杂的事情(另见GitHub #19043的讨论)。