Akka Streams - 用于Source.unfoldAsync的Backpressure

问题描述 投票:3回答:2

我目前正在尝试读取分页的HTTP资源。每个页面都是一个多部分文档,如果页面中包含更多内容,则页面的响应在标题中包含next链接。然后,自动解析器可以从最旧的页面开始,然后使用标题逐页读取以构建对下一页的请求。

我正在使用Akka Streams和Akka Http来实现,因为我的目标是创建流式解决方案。我想出了这个(我将在这里仅包含代码的相关部分,随意查看整个代码的this gist):

def read(request: HttpRequest): Source[HttpResponse, _] =
  Source.unfoldAsync[Option[HttpRequest], HttpResponse](Some(request))(Crawl.crawl)

val parse: Flow[HttpResponse, General.BodyPart, _] = Flow[HttpResponse]
  .flatMapConcat(r => Source.fromFuture(Unmarshal(r).to[Multipart.General]))
  .flatMapConcat(_.parts)

....

def crawl(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] = reqOption match {
  case Some(req) =>
    Http().singleRequest(req).map { response =>
      if (response.status.isFailure()) Some((None, response))
      else nextRequest(response, HttpMethods.GET)
    }
  case None => Future.successful(None)
}

所以一般的想法是使用Source.unfoldAsync来浏览页面并执行HTTP请求(这个想法和实现非常接近this answer中描述的内容。这将创建一个可以被消耗的Source[HttpResponse, _](Unmarshal to Multipart,split up进入各个部分,...)。

我现在的问题是HttpResponses的消耗可能需要一段时间(如果页面很大,则解组需要一些时间,最后可能会有一些数据库请求来保留一些数据,......)。所以如果下游速度较慢,我希望Source.unfoldAsync能够反压。默认情况下,下一个HTTP请求将在上一个HTTP请求完成后立即启动。

所以我的问题是:是否有某种方法可以使Source.unfoldAsync在缓慢下游的背压?如果没有,是否有替代方案可以使加压成为可能?

我可以设想一个利用akka-http提供的主机级客户端API的解决方案,如here和循环图所描述的那样,第一个请求的响应将用作生成第二个请求的输入,但是我尚未尝试过,我不确定这是否可行。


编辑:经过几天的游戏和阅读文档和一些博客,我不确定我是否在正确的轨道上我假设Source.unfoldAsync的背压行为是根本原因。要添加更多观察结果:

  • 当流启动时,我看到几个请求出去了。首先,这是没问题的,只要及时消耗得到的HttpResponse(参见here的描述)
  • 如果我不更改默认的response-entity-subscription-timeout,我将遇到以下错误(我删除了URL): [WARN] [03/30/2019 13:44:58.984] [default-akka.actor.default-dispatcher-16] [default/Pool(shared->http://....)] [1 (WaitingForResponseEntitySubscription)] Response entity was not subscribed after 1 seconds. Make sure to read the response entity body or call discardBytes() on it. GET ... Empty -> 200 OK Chunked 这导致IllegalStateException终止流:java.lang.IllegalStateException: Substream Source cannot be materialized more than once
  • 我观察到响应的解组是流中最慢的部分,这可能是有意义的,因为响应主体是一个Multipart文档,因此相对较大。但是,我希望流的这一部分能够向上游发出较少的需求(在我的情况下是Source.unfoldAsync部分)。这应该导致提出的请求减少。
  • 一些谷歌搜索引导我到a discussion about an issue that seems to describe a similar problem。他们还讨论了响应处理速度不够快时出现的问题。 associated merge request将带来文档更改,建议在继续使用流之前完全使用HttpResponse。在讨论这个问题时,人们对whether or not it's a good idea to combine Akka Http with Akka Streams也有疑问。所以也许我必须改变实现来直接在unfoldAsync调用的函数内部进行解组。
scala akka akka-stream akka-http
2个回答
1
投票

根据implementationSource.unfoldAsync,传入函数仅在拉动源时调用:

def onPull(): Unit = f(state).onComplete(asyncHandler)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)

因此,如果下游没有拉动(背压),则不会调用传入源的函数。

在你的要点中,你使用runForeach(与runWith(Sink.foreach)相同),一旦println完成就会拉上游。所以这里很难注意到背压。

尝试将您的示例更改为runWith(Sink.queue),它将为您提供SinkQueueWithCancel作为具体化值。然后,除非您在队列中调用pull,否则该流将被反压并且不会发出请求。

请注意,在背压传播通过所有流之前,可能会有一个或多个初始请求。


0
投票

我想我明白了。正如我在编辑问题时已经提到的,我在Akka HTTP中找到了this comment的一个问题,作者说:

...将Akka http混合到更大的处理流中并不是最佳做法。相反,您需要围绕流的Akka http部分的边界,以确保它们在允许外部处理流继续之前始终消耗其响应。

所以我继续尝试了:我没有在流的不同阶段进行HTTP请求和解组,而是通过flatMaping Future[HttpResponse]直接解组响应到Future[Multipart.General]。这可以确保直接消耗HttpResponse并避免Response entity was not subscribed after 1 second错误。 crawl函数现在看起来略有不同,因为它必须返回unmarshalled Multipart.General对象(用于进一步处理)以及原始HttpResponse(以便能够构建标题中的下一个请求):

def crawl(reqOption: Option[HttpRequest])(implicit actorSystem: ActorSystem, materializer: Materializer, executionContext: ExecutionContext): Future[Option[(Option[HttpRequest], (HttpResponse, Multipart.General))]] = {
  reqOption match {
    case Some(request) =>
      Http().singleRequest(request)
        .flatMap(response => Unmarshal(response).to[Multipart.General].map(multipart => (response, multipart)))
        .map {
          case tuple@(response, multipart) =>
            if (response.status.isFailure()) Some((None, tuple))
            else nextRequest(response, HttpMethods.GET).map { case (req, res) => (req, (res, multipart)) }
        }
    case None => Future.successful(None)
  }
}

其余的代码必须因此而改变。我创建了another gist,其中包含与原始问题中的gist相同的代码。

我期待两个Akka项目更好地集成(文档目前没有提到这个限制,而HTTP API似乎鼓励用户一起使用Akka HTTP和Akka Streams),所以这感觉有点像解决方法,但它现在解决了我的问题。我仍然需要弄清楚在将这个部分集成到我的更大用例中时遇到的一些其他问题,但这不是这个问题的一部分。

© www.soinside.com 2019 - 2024. All rights reserved.