Akka HTTP错误响应实体在1秒后未被订阅。

问题描述 投票:0回答:1

我正在使用Akka HTTP cachedHostConnectionPoolHttps池来发送请求,作为Akka Streams Flow的一部分。

  private val requestFlow: Flow[(HttpRequest, HelperClass), Either[Error, String], _] =
Http().cachedHostConnectionPoolHttps(BaseUrl).mapAsync(1) {
  case (Success(HttpResponse(_, _, entity, _)), _) =>
    Unmarshal(entity).to[String].map(response => {
      Right(response)
    })
  case (Failure(ex), _) =>
    Future(Left(Error(ex)))
}

由于某些原因,并非所有的请求响应都被处理。有些结果是错误的。

a.h.i.e.c.PoolGateway - [0 (WaitingForResponseEntitySubscription)] Response entity was not subscribed after 1 second. Make sure to read the response entity body or call `discardBytes()` on it.

如何在保持上述流程的同时,订阅我的响应?

http request akka subscribe
1个回答
0
投票

按照文档中的建议,用下面的方式实现实体处理就可以解决这个问题。

          private val requestFlow: Flow[(HttpRequest, HelperClass), Either[Error, String], _] =
Http().cachedHostConnectionPoolHttps(BaseUrl).mapAsync(1) {
  case (Success(HttpResponse(_, _, entity, _)),    _) =>
      entity.dataBytes
        .runReduce(_ ++ _)
        .map(r => Right(r.toString))
  case (Failure(ex), _) =>
    Future(Left(Error(ex)))
}
© www.soinside.com 2019 - 2024. All rights reserved.