在流中链接 Akka-http-client 请求

问题描述 投票:0回答:2

我想使用 akka-http-client 作为 Stream 来链接 http 请求。链中的每个 http 请求都取决于先前请求的成功/响应,并使用它来构造新请求。如果请求不成功,Stream 应返回不成功请求的响应。

如何在akka-http中构建这样的流? 我应该使用哪个 akka-http 客户端级别 API?

scala akka-stream akka-http
2个回答
14
投票

如果您正在制作网络爬虫,请查看这篇文章。这个答案解决了一个更简单的情况,例如下载分页资源,其中下一页的链接位于当前页面响应的标题中。

您可以使用

Source.unfoldAsync
方法创建一个链式源 - 一个项目指向下一个项目。这需要一个函数,该函数接受一个元素
S
并返回
Future[Option[(S, E)]]
来确定流是否应该继续发出
E
类型的元素,并将状态传递给下一个调用。

就你而言,这有点像:

  1. 取得初始
    HttpRequest
  2. 产生一个
    Future[HttpResponse]
  3. 如果响应指向另一个 URL,则返回
    Some(request -> response)
    ,否则
    None

但是,有一个问题,如果流不包含指向下一个请求的指针,则不会从流中发出响应。

要解决此问题,您可以使传递给

unfoldAsync
的函数返回
Future[Option[(Option[HttpRequest], HttpResponse)]]
。这使您可以处理以下情况:

  • 当前响应有误
  • 当前响应指向另一个请求
  • 当前响应并不指向另一个请求

接下来是一些带注释的代码,概述了这种方法,但首先是初步的:

当将HTTP请求流式传输到Akka流中的响应时,您需要确保响应主体被消耗,否则会发生不好的事情(死锁等)。如果您不需要主体,您可以忽略它,但在这里我们使用函数将

HttpEntity
从(潜在的)流转换为严格实体:

import scala.concurrent.duration._

def convertToStrict(r: HttpResponse): Future[HttpResponse] =
  r.entity.toStrict(10.minutes).map(e => r.withEntity(e))

接下来,有几个函数可以从

Option[HttpRequest]
创建
HttpResponse
。此示例使用类似 Github 分页链接的方案,其中
Links
标头包含,例如:
<https://api.github.com/...> rel="next"
:

def nextUri(r: HttpResponse): Seq[Uri] = for {
  linkHeader <- r.header[Link].toSeq
  value <- linkHeader.values
  params <- value.params if params.key == "rel" && params.value() == "next"
} yield value.uri

def getNextRequest(r: HttpResponse): Option[HttpRequest] =
  nextUri(r).headOption.map(next => HttpRequest(HttpMethods.GET, next))

接下来,我们将传递给

unfoldAsync
的实际函数。它使用 Akka HTTP
Http().singleRequest()
API 获取
HttpRequest
并生成
Future[HttpResponse]
:

def chainRequests(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] =
  reqOption match {
    case Some(req) => Http().singleRequest(req).flatMap { response =>
      // handle the error case. Here we just return the errored response
      // with no next item.
      if (response.status.isFailure()) Future.successful(Some(None -> response))

      // Otherwise, convert the response to a strict response by
      // taking up the body and looking for a next request.
      else convertToStrict(response).map { strictResponse =>
        getNextRequest(strictResponse) match {
          // If we have no next request, return Some containing an
          // empty state, but the current value
          case None => Some(None -> strictResponse)

          // Otherwise, pass on the request...
          case next => Some(next -> strictResponse)
        }
      }
    }
    // Finally, there's no next request, end the stream by
    // returning none as the state.
    case None => Future.successful(None)
  }

请注意,如果我们收到错误响应,则流将不会继续,因为我们在下一个状态返回

None

您可以调用它来获取

HttpResponse
对象流,如下所示:

val initialRequest = HttpRequest(HttpMethods.GET, "http://www.my-url.com")
Source.unfoldAsync[Option[HttpRequest], HttpResponse](
    Some(initialRequest)(chainRequests)

至于返回最后一个(或错误的)响应的值,您只需使用

Sink.last
,因为流将在成功完成或第一个错误响应时结束。例如:

def getStatus: Future[StatusCode] = Source.unfoldAsync[Option[HttpRequest], HttpResponse](
      Some(initialRequest))(chainRequests)
    .map(_.status)
    .runWith(Sink.last)

0
投票

您可以使用

Source.unfoldAsync

class CatsHttpClientImpl(implicit system: ActorSystem[_], ec: ExecutionContext) extends CatsHttpClient {
  private val logger: Logger = LoggerFactory.getLogger(classOf[CatsHttpClientImpl])
  private val start: Option[String] = Some("https://catfact.ninja/breeds")

  override def getAllBreads: Future[Seq[Cat]] = {
    Source
      .unfoldAsync(start) {
        case Some(next) =>
          val nextChunkFuture: Future[CatsResponse] = sendRequest(next)

          nextChunkFuture.map { resp =>
            resp.nextPageUrl match {
              case Some(url) => Some((Some(url), resp.data))
              case None => Some((None, resp.data))
            }
          }
        case None => Future.successful(None)
      }
      .runWith(Sink.fold(Seq(): Seq[Cat])(_ ++ _))
  }

  private def sendRequest(url: String): Future[CatsResponse] = {
    logger.info(s"CatsHttpClientImpl: Sending request $url")

    val request = HttpRequest(
      uri = Uri(url),
      headers = List(
        RawHeader("Accept", "application/json")
      )
    )
    Http(system).singleRequest(request).flatMap { response =>
      response.status match {
        case StatusCodes.OK =>
          logger.info("CatsHttpClientImpl: Received success")
          Unmarshal(response.entity).to[CatsResponse]

        case _ =>
          logger.error("CatsHttpClientImpl: Received error")
          throw new CatsHttpClientException()
      }
    }
  }
} 

完整的源代码和可运行的项目可以在 GitHub 上找到

© www.soinside.com 2019 - 2024. All rights reserved.