我正在将一些C#代码转换为scala和akka流。
我的C#代码看起来像这样:
Task<Result1> GetPartialResult1Async(Request request) ...
Task<Result2> GetPartialResult2Async(Request request) ...
async Task<Result> GetResultAsync(Request request)
{
var result1 = await GetPartialResult1Async(request);
var result2 = await GetPartialResult2Async(request);
return new Result(request, result1, result2);
}
现在为akka流。我没有从结果的Request
到Task
的功能,而是从请求到结果。
所以我已经有以下两个流程:
val partialResult1Flow: Flow[Request, Result1, NotUsed] = ...
val partialResult2Flow: Flow[Request, Result2, NotUsed] = ...
但是我看不到如何将它们组合成一个完整的流程,因为通过在第一个流程上调用,我们会丢失原始请求,而通过在第二个流程上调用,我们会失去第一个流程的结果。
所以我创建了一个WithState monad,它看起来像这样:
case class WithState[+TState, +TValue](value: TValue, state: TState) {
def map[TResult](func: TValue => TResult): WithState[TState, TResult] = {
WithState(func(value), state)
}
... bunch more helper functions go here
}
然后,我将原始流程重写为如下形式:
def partialResult1Flow[TState]: Flow[WithState[TState, Request], WithState[TState, Result1]] = ...
def partialResult2Flow: Flow[WithState[TState, Request], WithState[TState, Result2]] = ...
并像这样使用它们:
val flow = Flow[Request]
.map(x => WithState(x, x))
.via(partialResult1Flow)
.map(x => WithState(x.state, (x.state, x.value))
.via(partialResult2Flow)
.map(x => Result(x.state._1, x.state._2, x.value))
现在这有效,但是我当然不能保证将如何使用流量。所以我真的应该让它带有一个State参数:
def flow[TState] = Flow[WithState[TState, Request]]
.map(x => WithState(x.value, (x.state, x.value)))
.via(partialResult1Flow)
.map(x => WithState(x.state._2, (x.state, x.value))
.via(partialResult2Flow)
.map(x => WithState(Result(x.state._1._2, x.state._2, x.value), x.state._1._1))
现在,我的代码变得非常难以阅读。我可以通过命名函数并使用案例类代替元组等来清理它。但是从根本上讲,这里有很多附带的复杂性,这是很难避免的。
我想念什么吗?这不是Akka流的好用例吗?有内置的方法吗?
免责声明,我对C#的async / await并不完全熟悉。
[根据我从C#文档的快速浏览中获得的信息,Task<T>
是严格(即渴望,而不是懒惰)评估的计算,如果成功,最终将包含T
。 Scala的等效项是Future[T]
,其中C#代码的等效项是:
import scala.concurrent.{ ExecutionContext, Future }
def getPartialResult1Async(req: Request): Future[Result1] = ???
def getPartialResult2Async(req: Request): Future[Result2] = ???
def getResultAsync(req: Request)(implicit ectx: ExecutionContext): Future[Result] = {
val result1 = getPartialResult1Async(req)
val result2 = getPartialResult2Async(req)
result1.zipWith(result2) { tup => val (r1, r2) = tup
new Result(req, r1, r2)
}
/* Could also:
* for {
* r1 <- result1
* r2 <- result2
* } yield { new Result(req, r1, r2) }
*
* Note that both the `result1.zipWith(result2)` and the above `for`
* construction may compute the two partial results simultaneously. If you
* want to ensure that the second partial result is computed after the first
* partial result is successfully computed:
* for {
* r1 <- getPartialResult1Async(req)
* r2 <- getPartialResult2Async(req)
* } yield new Result(req, r1, r2)
*/
}
我没有在问题中描述的任何根本不同的方式。
但是可以显着改善电流: