Akka Streams recursive flow call

Question (votes: 0, answers: 2)

I am trying to implement pagination with Akka Streams. This is what I have so far:

case class SomeObject(id:Long, next_page:Option[Map[String,String]])
def chainRequests(uri: Uri): Future[Option[(Uri, T)]] = {
    if (uri.isEmpty) return Future.successful(None)
    val response: Future[Response[T]] = sendWithRetry(prepareRequest(HttpMethods.GET, uri)).flatMap(unmarshal)
    response.map { resp =>
      resp.next_page match {
        case Some(next_page) => Some(next_page("uri"), resp.data)
        case _ => Some(Uri.Empty, resp.data)
      }
    }
  }
Source.single(SomeObject).map(Uri(s"object/${_.id}")).map(uri => Source.unfoldAsync(uri)(chainRequests)).map(...some processing goes here)

The problem is that if I call source.take(1000) and the pagination has many elements (pages), downstream does not receive any new elements until Source.unfoldAsync has completed.

I also tried using a cycle in a Flow graph, for example:

val in = b.add(Flow[Uri])
val out = b.add(Flow[T])

// Route responses that have a next page to outlet 1 (the feedback loop), the rest to outlet 0.
val partition = b.add(Partition[Response[T]](2, r => r.next_page match { case Some(_) => 1; case None => 0 }))
val mergeUri = b.add(MergePreferred[Uri](1))
in ~> mergeUri ~> sendRequest ~> partition
      mergeUri.preferred <~ extractNextUri <~ partition.out(1)
      partition.out(0) ~> Flow[Response[T]].map(_.data) ~> out
FlowShape(in.in, out.out)

However, the code above does not work.

I have been considering writing my own GraphStage. unfoldAsync takes an initial element, but with the Flow-based solution I do not have a "first" element. Any suggestions?

Thanks

scala akka akka-stream akka-http
2 Answers
1 vote

I found a solution by writing my own GraphStage:

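import akka.http.scaladsl.model.Uri
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success, Try}

final class PaginationGraphStage[S <: Uri, E](f: S => Future[Option[(S, E)]])(implicit ec: ExecutionContextExecutor)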
  extends GraphStage[FlowShape[S, E]]{
  val in: Inlet[S] = Inlet[S]("PaginationGraphStage.in")
  val out: Outlet[E] = Outlet[E]("PaginationGraphStage.out")
  override val shape: FlowShape[S, E] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler with InHandler {
      private[this] var state: S = _
      private[this] var inFlight = 0
      private[this] var asyncFinished = false
      private[this] def todo: Int = inFlight

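      // Runs inside the stage (through the async callback below) once a page request has finished.
      def futureCompleted(result: Try[Option[(S, E)]]): Unit = {
        inFlight -= 1
        result match {
          case Failure(ex) => fail(out, ex)
          case Success(None) =>
            asyncFinished = true
            complete(out)
          // More pages follow: emit this page and remember where to continue.
          case Success(Some((newS, elem))) if !newS.isEmpty =>
            push(out, elem)
            state = newS
          // Last page of the current chain: emit it, then take the next input or finish.
          case Success(Some((_, elem))) =>
            push(out, elem)
            asyncFinished = true
            if (isAvailable(in)) getHandler(in).onPush()
            else completeStage()
        }
      }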

      private val futureCB = getAsyncCallback(futureCompleted)
      private val invokeFutureCB: Try[Option[(S, E)]] => Unit = futureCB.invoke

      private def pullIfNeeded(): Unit = {
        if (!hasBeenPulled(in)) tryPull(in)
      }
      override def onUpstreamFinish(): Unit = {
        if (todo == 0) completeStage()
      }

      def onPull(): Unit = {
        if (state != null) {
          asyncFinished = false
          inFlight += 1
          val future = f(state)
          future.value match {
            case None => future.onComplete(invokeFutureCB)
            case Some(v) => futureCompleted(v)
          }
        } else {
          pullIfNeeded()
        }
      }

      override def onPush(): Unit = {
        if (state == null) {
          inFlight += 1
          state = grab(in)
          pullIfNeeded()
          getHandler(out).onPull()
        }
        if (asyncFinished) {
          inFlight += 1
          state = grab(in)
          pullIfNeeded()
        }
      }
      setHandlers(in, out, this)
    }
}
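
For comparison, a similar "one start URI in, many pages out" flow can probably be built from stock operators by wrapping Source.unfoldAsync in flatMapConcat. The following is only a minimal sketch under assumptions: it uses a classic ActorSystem with an Akka 2.6-style API, and the Page model, the in-memory pages map, and the fetch helper are hypothetical stand-ins for the real HTTP calls, not part of the answer above.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PaginationFlowSketch {
  // Hypothetical page model: a payload plus the key of the next page, if any.
  final case class Page(data: String, next: Option[String])

  // Hypothetical in-memory stand-in for a paginated HTTP API.
  private val pages: Map[String, Page] = Map(
    "a/1" -> Page("a1", Some("a/2")),
    "a/2" -> Page("a2", None),
    "b/1" -> Page("b1", None)
  )

  private def fetch(key: String): Future[Page] = Future.successful(pages(key))

  // For every start key arriving from upstream, follow the `next` links page by page.
  def paginate(implicit ec: ExecutionContext): Flow[String, String, NotUsed] =
    Flow[String].flatMapConcat { start =>
      Source.unfoldAsync[Option[String], String](Some(start)) {
        case Some(key) => fetch(key).map(page => Some((page.next, page.data)))
        case None      => Future.successful(None) // no further page: end this sub-source
      }
    }

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("pagination-sketch")
    import system.dispatcher
    try Await.result(Source(List("a/1", "b/1")).via(paginate).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run()) // the payloads a1, a2, b1, in that order
}
```

Because flatMapConcat consumes one sub-source at a time, the pages of the first start key are emitted before the second key is touched, which matches the sequential behavior of the custom stage.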

0 votes

Source.unfoldAsync is what you are looking for.

I prepared a simple project that walks through all pages of a REST API and accumulates the results of every page, returning a Future of a Seq.

The complete, runnable source code and project can be found on GitHub.

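import akka.actor.typed.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, StatusCodes, Uri}
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.scaladsl.{Sink, Source}
import org.slf4j.{Logger, LoggerFactory}
import scala.concurrent.{ExecutionContext, Future}

// CatsHttpClient, Cat, CatsResponse, CatsHttpClientException and the JSON unmarshaller
// for CatsResponse are defined elsewhere in the project.
class CatsHttpClientImpl(implicit system: ActorSystem[_], ec: ExecutionContext) extends CatsHttpClient {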
  private val logger: Logger = LoggerFactory.getLogger(classOf[CatsHttpClientImpl])
  private val start: Option[String] = Some("https://catfact.ninja/breeds")

  override def getAllBreads: Future[Seq[Cat]] = {
    Source
      .unfoldAsync(start) {
        case Some(next) =>
          val nextChunkFuture: Future[CatsResponse] = sendRequest(next)

          // Emit this page's data; the next state is the next page URL, or None after the last page.
          nextChunkFuture.map(resp => Some((resp.nextPageUrl, resp.data)))
        case None => Future.successful(None)
      }
      .runWith(Sink.fold(Seq(): Seq[Cat])(_ ++ _))
  }

  private def sendRequest(url: String): Future[CatsResponse] = {
    logger.info(s"CatsHttpClientImpl: Sending request $url")

    val request = HttpRequest(
      uri = Uri(url),
      headers = List(
        RawHeader("Accept", "application/json")
      )
    )
    Http(system).singleRequest(request).flatMap { response =>
      response.status match {
        case StatusCodes.OK =>
          logger.info("CatsHttpClientImpl: Received success")
          Unmarshal(response.entity).to[CatsResponse]

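        case _ =>
          logger.error("CatsHttpClientImpl: Received error")
          // Drain the unread response body so the pooled connection is released.
          response.discardEntityBytes()
          throw new CatsHttpClientException()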
      }
    }
  }
}
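
Regarding the take(...) concern from the question: unfoldAsync only requests the next page when downstream signals demand, so limiting the stream should also limit the number of requests. The sketch below is an illustration under assumptions, not the code from the GitHub project: it uses a classic ActorSystem with an Akka 2.6-style API, and fetchPage with its request counter is a hypothetical, endless fake API.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object LazyPaginationSketch {
  // Counts how many (fake) page requests were actually issued.
  val requests = new AtomicInteger(0)

  // Hypothetical endless API: page n holds three items and always links to page n + 1.
  private def fetchPage(n: Int): Future[(Option[Int], List[Int])] = {
    requests.incrementAndGet()
    Future.successful((Some(n + 1), List(n * 3, n * 3 + 1, n * 3 + 2)))
  }

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("lazy-pagination-sketch")
    import system.dispatcher
    val firstFive = Source
      .unfoldAsync[Option[Int], List[Int]](Some(0)) {
        case Some(n) => fetchPage(n).map(Some(_))
        case None    => Future.successful(None)
      }
      .mapConcat(identity) // flatten each page into individual items
      .take(5)             // completing here cancels upstream, so fetching stops
      .runWith(Sink.seq)
    try Await.result(firstFive, 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    println(run())          // items 0 to 4
    println(requests.get()) // pages requested; stays small although the API never ends
  }
}
```

Flattening pages with mapConcat before take means the limit applies to individual items rather than whole pages; placing take before mapConcat would limit the number of pages instead.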