改变Akka流的未来

问题描述 投票:0回答:1

我正在尝试使用Akka Streams同时向服务器发送请求,然后尝试将每个请求与原始上下文相关联(本例中为Int)。这是我汇总的流程:

val createRequestFlow: Flow[(String, String), (HttpRequest, Int), _] = Flow.fromFunction[(String, String), (HttpRequest, Int)]((mkRequest _).tupled)
val sendRequestFlow: Flow[(HttpRequest, Int), (HttpResponse, Int), _] = Flow[(HttpRequest, Int)].mapAsyncUnordered(32)((sendRequest _).tupled)
val handleResponseFlow: Flow[(HttpResponse, Int), String, _] = Flow[(HttpResponse, Int)].map[String]((getStatusString _).tupled)

val handler =  createRequestFlow via sendRequestFlow via handleResponseFlow

特别是,我正试图找到返回Future[(HttpResponse, Int)]的方法。目前,我正在这样做

def sendRequest(request: HttpRequest, ctx: Int): Future[(HttpResponse, Int)] = {
    Http().singleRequest(request).map(r => (r,ctx))
  }

但我明白这需要一个Executor的事实表明还有另一种(更好的)方法。

scala akka akka-stream
1个回答
1
投票

我不认为有更好的方法。 Akka使用标准Scala Futures,他们设计需要ExecutionContext来执行几乎任何操作。如果你真的真的不想为这个简单的map使用另一个线程,你可以创建自己的sameThreadExecutionContext,类似于Akka在里面使用的那个(参见akka.dispatch.ExecutionContexts.sameThreadExecutionContext),所以map将在处理主要Http响应的同一个线程上执行,但不要将它用于任何更复杂的事情(另见GitHub #19043的讨论)。

© www.soinside.com 2019 - 2024. All rights reserved.