我正在尝试使用 Akka Streams 实现分页。目前我有
/**
 * Object whose pages are fetched by the stream below.
 *
 * @param id        identifier used to build the object's first page URI
 * @param next_page optional key/value data describing the following page
 *                  (presumably holds a "uri" entry — confirm against the API)
 */
case class SomeObject(
  id: Long,
  next_page: Option[Map[String, String]]
)
/**
 * One step of the pagination unfold: fetches the page at `uri` and returns the
 * URI of the following page together with the data of the current one.
 *
 * An empty `uri` is the termination signal and yields `None`, which ends
 * `Source.unfoldAsync`. When a response has no next page, `Uri.Empty` is
 * returned as the next state, so the current data is still emitted and the
 * unfold stops on the following step.
 *
 * @param uri page to request; `Uri.Empty` means "no more pages"
 * @return the next state and the element to emit, or `None` when finished
 */
def chainRequests(uri: Uri): Future[Option[(Uri, T)]] =
  if (uri.isEmpty) Future.successful(None)
  else {
    val response: Future[Response[T]] =
      sendWithRetry(prepareRequest(HttpMethods.GET, uri)).flatMap(unmarshal)
    response.map { resp =>
      // A missing "uri" entry is treated like a missing next page instead of
      // failing the stream with NoSuchElementException.
      // NOTE(review): assumes next_page is an Option[Map[String, String]] — confirm.
      val nextUri = resp.next_page.flatMap(_.get("uri")).fold(Uri.Empty)(Uri(_))
      Some((nextUri, resp.data))
    }
  }
// Turns the object into its first page URI and flattens every page produced by
// chainRequests into a single stream. flatMapConcat is used because map would
// emit the nested Source itself rather than its elements.
Source
  .single(someObject) // an instance of SomeObject, not the companion object
  .map(obj => Uri(s"object/${obj.id}"))
  .flatMapConcat(uri => Source.unfoldAsync(uri)(chainRequests))
  // .map(...) — some processing goes here
问题是,如果我执行 source.take(1000) 并且分页有很多元素(页面),那么下游在 Source.unfoldAsync 完成之前不会获取新元素。
我尝试在 Flows 中使用循环,例如
// Feedback-loop attempt: responses that announce a next page are routed back
// into the request stage, all other responses are emitted downstream.
// Assumes the enclosing GraphDSL block names its implicit builder `builder`
// and that `sendRequest` / `extractNextUri` are flows defined elsewhere.
import akka.stream.scaladsl.MergePreferred

val in = builder.add(Flow[Uri])
val out = builder.add(Flow[T])
// Port 1 = response has a next page, port 0 = last page.
val partition = builder.add(Partition[Response[T]](2, r => if (r.next_page.isDefined) 1 else 0))
// MergePreferred exposes the `preferred` inlet used by the feedback edge.
val mergeUri = builder.add(MergePreferred[Uri](1))
in ~> mergeUri ~> sendRequest ~> partition
mergeUri.preferred <~ extractNextUri <~ partition.out(1)
// NOTE(review): responses sent to out(1) never reach `out`, so the data of every
// page except the last one appears to be dropped — confirm whether a Broadcast is needed.
// NOTE(review): the cycle keeps the merge open, so completion of this flow likely
// needs explicit handling — verify.
partition.out(0) ~> Flow[Response[T]].map(_.data) ~> out
FlowShape(in.in, out.out)
但是上面的代码不起作用。
我也一直在尝试编写自己的 GraphStage。unfoldAsync 需要一个初始元素作为起点,但在使用 Flow 的方案中,我没有这样的“第一个”元素。有什么建议吗?
谢谢
通过编写自己的 GraphStage 找到了解决方案
/**
 * Flow stage that expands every incoming seed URI into the pages reachable from it.
 *
 * For each seed received on `in`, `f` is called repeatedly: every successful
 * result emits one element on `out` and supplies the URI of the following page.
 * An empty URI marks the last page of the current seed; the stage then asks
 * upstream for the next seed. Pages are requested only when downstream signals
 * demand, so at most one request is in flight at any time.
 *
 * Termination:
 *  - `f` fails: the stage fails with the same exception;
 *  - `f` returns `None`: the stage completes;
 *  - upstream finishes: the stage completes once the current seed is exhausted.
 *
 * @param f  fetches one page; returns the next page URI and the element to emit
 * @param ec execution context on which the future's completion is dispatched
 * @tparam S seed / page URI type
 * @tparam E emitted element type
 */
final class PaginationGraphStage[S <: Uri, E](f: S => Future[Option[(S, E)]])(implicit ec: ExecutionContextExecutor)
    extends GraphStage[FlowShape[S, E]] {

  val in: Inlet[S] = Inlet[S]("PaginationGraphStage.in")
  val out: Outlet[E] = Outlet[E]("PaginationGraphStage.out")
  override val shape: FlowShape[S, E] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {

      // Next page of the current seed; None while waiting for a new seed.
      private[this] var nextPage: Option[S] = None
      // True from the moment `f` is called until its result has been handled.
      private[this] var requestInFlight = false

      // Re-enters the stage's single-threaded context when a future completes.
      private[this] val pageFetchedCallback = getAsyncCallback[Try[Option[(S, E)]]](pageFetched)

      /** Starts the request for `page`; an already completed future is handled inline. */
      private[this] def fetch(page: S): Unit = {
        requestInFlight = true
        val future = f(page)
        future.value match {
          case Some(result) => pageFetched(result)
          case None         => future.onComplete(pageFetchedCallback.invoke)
        }
      }

      /** Handles the outcome of one page request. */
      private[this] def pageFetched(result: Try[Option[(S, E)]]): Unit = {
        requestInFlight = false
        result match {
          case Failure(ex) =>
            // failStage also cancels upstream, unlike fail(out, ex).
            failStage(ex)
          case Success(None) =>
            completeStage()
          case Success(Some((following, elem))) =>
            // A request is only started while downstream demand is pending,
            // so `out` is available here.
            push(out, elem)
            if (following.isEmpty) {
              nextPage = None
              // Last page of the last seed: nothing more can arrive.
              if (isClosed(in)) completeStage()
            } else {
              nextPage = Some(following)
            }
        }
      }

      override def onPull(): Unit =
        nextPage match {
          case Some(page) => fetch(page)
          case None       => if (!hasBeenPulled(in)) tryPull(in)
        }

      // `in` is pulled only while downstream demand is pending, so the new
      // seed's first page can be requested right away.
      override def onPush(): Unit = fetch(grab(in))

      // Keep running while the current seed still has pages to deliver.
      override def onUpstreamFinish(): Unit =
        if (nextPage.isEmpty && !requestInFlight) completeStage()

      setHandlers(in, out, this)
    }
}
Source.unfoldAsync 就是您正在寻找的。
我准备了一个简单的项目:它遍历 REST API 的所有页面,累积各页的结果,并以 Future[Seq[Cat]] 的形式返回。
可以在 GitHub 上找到可以运行的完整源代码和项目
/**
 * HTTP client that walks every page of the cat breeds endpoint and collects
 * the results of all pages into a single sequence.
 *
 * @param system actor system used for the HTTP client and stream materialization
 * @param ec     execution context for the future transformations
 */
class CatsHttpClientImpl(implicit system: ActorSystem[_], ec: ExecutionContext) extends CatsHttpClient {

  private val logger: Logger = LoggerFactory.getLogger(classOf[CatsHttpClientImpl])

  // URL of the first page; each response supplies the URL of the next one.
  private val start: Option[String] = Some("https://catfact.ninja/breeds")

  /**
   * Requests pages one after another until a response has no next page URL.
   *
   * @return all cats from all pages, in page order; the future fails if any request fails
   */
  override def getAllBreads: Future[Seq[Cat]] =
    Source
      .unfoldAsync(start) {
        case Some(next) =>
          // The response's optional next URL is the next unfold state as-is:
          // None lets the current data through and stops on the following step.
          sendRequest(next).map(resp => Some((resp.nextPageUrl, resp.data)))
        case None =>
          Future.successful(None)
      }
      .runWith(Sink.fold(Seq.empty[Cat])(_ ++ _))

  /**
   * Sends a GET request for one page and unmarshals the response body.
   *
   * @param url absolute URL of the page
   * @return the decoded page, or a future failed with [[CatsHttpClientException]]
   *         when the status is not 200 OK
   */
  private def sendRequest(url: String): Future[CatsResponse] = {
    logger.info(s"CatsHttpClientImpl: Sending request $url")
    val request = HttpRequest(
      uri = Uri(url),
      headers = List(RawHeader("Accept", "application/json"))
    )
    Http(system).singleRequest(request).flatMap { response =>
      response.status match {
        case StatusCodes.OK =>
          logger.info("CatsHttpClientImpl: Received success")
          Unmarshal(response.entity).to[CatsResponse]
        case _ =>
          logger.error("CatsHttpClientImpl: Received error")
          // The entity must be consumed or discarded, otherwise the pooled
          // connection stays blocked by the unread response body.
          response.discardEntityBytes()
          Future.failed(new CatsHttpClientException())
      }
    }
  }
}