通过akka流链接上下文

问题描述 投票:1回答:2

我正在将一些C#代码转换为scala和akka流。

我的C#代码看起来像这样:


Task<Result1> GetPartialResult1Async(Request request) ...
Task<Result2> GetPartialResult2Async(Request request) ...

async Task<Result> GetResultAsync(Request request) 
{
    var result1 = await GetPartialResult1Async(request);
    var result2 = await GetPartialResult2Async(request);
    return new Result(request, result1, result2);
}

现在为akka流。我没有从结果的RequestTask的功能,而是从请求到结果。

所以我已经有以下两个流程:

val partialResult1Flow: Flow[Request, Result1, NotUsed] = ...
val partialResult2Flow: Flow[Request, Result2, NotUsed] = ...

但是我看不到如何将它们组合成一个完整的流程,因为通过在第一个流程上调用,我们会丢失原始请求,而通过在第二个流程上调用,我们会失去第一个流程的结果。

所以我创建了一个WithState monad,它看起来像这样:

case class WithState[+TState, +TValue](value: TValue, state: TState) {
  def map[TResult](func: TValue => TResult): WithState[TState, TResult] = {
    WithState(func(value), state)
  }
  ... bunch more helper functions go here
}

然后,我将原始流程重写为如下形式:

def partialResult1Flow[TState]: Flow[WithState[TState, Request], WithState[TState, Result1]] = ...
def partialResult2Flow: Flow[WithState[TState, Request], WithState[TState, Result2]] = ...

并像这样使用它们:

val flow = Flow[Request]
    .map(x => WithState(x, x))
    .via(partialResult1Flow)
    .map(x => WithState(x.state, (x.state, x.value))
    .via(partialResult2Flow)
    .map(x => Result(x.state._1, x.state._2, x.value))

现在这有效,但是我当然不能保证将如何使用流量。所以我真的应该让它带有一个State参数:

def flow[TState] = Flow[WithState[TState, Request]]
    .map(x => WithState(x.value, (x.state, x.value)))
    .via(partialResult1Flow)
    .map(x => WithState(x.state._2, (x.state, x.value))
    .via(partialResult2Flow)
    .map(x => WithState(Result(x.state._1._2, x.state._2, x.value), x.state._1._1))

现在,我的代码变得非常难以阅读。我可以通过命名函数并使用案例类代替元组等来清理它。但是从根本上讲,这里有很多附带的复杂性,这是很难避免的。

我想念什么吗?这不是Akka流的好用例吗?有内置的方法吗?

scala akka akka-stream
2个回答
0
投票

免责声明,我对C#的async / await并不完全熟悉。

[根据我从C#文档的快速浏览中获得的信息,Task<T>是严格(即渴望,而不是懒惰)评估的计算,如果成功,最终将包含T。 Scala的等效项是Future[T],其中C#代码的等效项是:

import scala.concurrent.{ ExecutionContext, Future }

def getPartialResult1Async(req: Request): Future[Result1] = ???
def getPartialResult2Async(req: Request): Future[Result2] = ???

def getResultAsync(req: Request)(implicit ectx: ExecutionContext): Future[Result] = {
  val result1 = getPartialResult1Async(req)
  val result2 = getPartialResult2Async(req)
  result1.zipWith(result2) { tup => val (r1, r2) = tup
    new Result(req, r1, r2)
  }
  /* Could also:
   *   for {
   *     r1 <- result1
   *     r2 <- result2
   *    } yield { new Result(req, r1, r2) }
   *
   * Note that both the `result1.zipWith(result2)` and the above `for`
   * construction may compute the two partial results simultaneously.  If you
   * want to ensure that the second partial result is computed after the first 
   * partial result is successfully computed:
   *   for {
   *     r1 <- getPartialResult1Async(req)
   *     r2 <- getPartialResult2Async(req)
   *   } yield new Result(req, r1, r2)
   */
}

0
投票

我没有在问题中描述的任何根本不同的方式。

但是可以显着改善电流:

© www.soinside.com 2019 - 2024. All rights reserved.