Akka Streams delay and backpressure

I'm running into a buffering and backpressure problem with Akka Streams.

I have the following code (simplified to isolate the problem):

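import akka.actor.ActorSystem
import akka.stream.{Attributes, ClosedShape, FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, MergeLatest, Partition, RunnableGraph, Sink, Source}

import scala.concurrent.duration._

object X extends App {
  // On Akka 2.6+ the implicit ActorSystem also provides the materializer.
  implicit val actorSystem = ActorSystem("Actory-System")
  implicit val executionContext = actorSystem.dispatcher

  // Routes even numbers to output 0 and odd numbers to output 1.
  def partition2(v: Int): Int = {
    v % 2
  }

  // Flow-shaped partial graph: Int in, List[Int] out.
  val partialGraphDSL = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partitioner = builder.add(Partition[Int](2, partition2))
    val mergeLatest = builder.add(MergeLatest[Int](2))

    partitioner.out(0) ~> mergeLatest
    partitioner.out(1) ~> mergeLatest

    FlowShape(partitioner.in, mergeLatest.out)
  }

  val source = Source(1 to 1000)
    .async

  val mainGraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partialGraph = builder.add(partialGraphDSL)

    // Delay placed after the partial graph; it is expected to backpressure the source.
    val delay = Flow[List[Int]]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { x => println("After delay."); x }
      .withAttributes(Attributes.inputBuffer(1, 1))

    source ~> Flow[Int].map { x => println(x); x } ~> partialGraph ~> delay ~> Sink.foreach(println)

    ClosedShape
  }

  val runnable = RunnableGraph.fromGraph(mainGraph)
  val materialized = runnable.run()
}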

This prints 1 through 1000 immediately, and then prints "After delay." once per second:

1
2
...
999
1000
After delay.
List(2, 1)
After delay.
List(2, 3)
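
The List values in this output can be reproduced without Akka. Below is a minimal pure-Scala sketch, assuming that MergeLatest emits a list holding the most recent element from every input, ordered by input port, each time any input delivers an element once all inputs have delivered at least one. The object MergeLatestModel and its simulate method are hypothetical names introduced for illustration. The sketch models only which values are emitted, not buffering, timing, or backpressure.

```scala
// Pure-Scala model of Partition(2, partition2) feeding MergeLatest(2).
// Hypothetical helper for illustration; it does not use Akka.
object MergeLatestModel {
  // Same routing function as in the question: even -> port 0, odd -> port 1.
  def partition2(v: Int): Int = v % 2

  def simulate(input: Seq[Int]): List[List[Int]] = {
    // Latest element seen on each of the two ports, if any.
    val latest = Array.fill[Option[Int]](2)(None)
    val out = List.newBuilder[List[Int]]
    input.foreach { v =>
      latest(partition2(v)) = Some(v)
      // Emit only once every port has delivered at least one element.
      if (latest.forall(_.isDefined)) out += latest.toList.flatten
    }
    out.result()
  }
}

// MergeLatestModel.simulate(1 to 4) == List(List(2, 1), List(2, 3), List(4, 3))
```

Under this model every element after the first produces one combined list, which matches List(2, 1) followed by List(2, 3) above.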

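I expected the delay flow to backpressure the source. However, if I change the code so that it no longer uses partialGraphDSL, the output is exactly what I expect: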

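import akka.actor.ActorSystem
import akka.stream.{Attributes, ClosedShape, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}

import scala.concurrent.duration._

object X extends App {
  // On Akka 2.6+ the implicit ActorSystem also provides the materializer.
  implicit val actorSystem = ActorSystem("Actory-System")
  implicit val executionContext = actorSystem.dispatcher

  val source = Source(1 to 1000)
    .async

  val mainGraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Same delay as before, but now operating on Int because partialGraphDSL is gone.
    val delay = Flow[Int]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { x => println("After delay."); x }
      .withAttributes(Attributes.inputBuffer(1, 1))

    source ~> Flow[Int].map { x => println(x); x } ~> delay ~> Sink.foreach(println)

    ClosedShape
  }

  val runnable = RunnableGraph.fromGraph(mainGraph)
  val materialized = runnable.run()
}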

Output:

1
After delay.
1
2
After delay.
...

It seems that backpressure is not propagating from the delay through partialGraphDSL to the source. Why is that?

scala akka akka-stream
1 Answer

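The code behaves this way because the graph stores elements in internal buffers along the way, so the backpressure from the delay never reaches the source.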

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, ClosedShape, FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, MergeLatest, Partition, RunnableGraph, Sink, Source}

import scala.concurrent.duration._

object GraphWithBackPressure extends App {
  implicit val actorSystem = ActorSystem("Actory-System")
  implicit val executionContext = actorSystem.dispatcher

  def partition2(v : Int): Int = {
    v % 2
  }

  val partialGraphDSL = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partitioner = builder.add(Partition[Int](2, partition2))
    val mergeLatest = builder.add(MergeLatest[Int](2))

    partitioner.out(0) ~> mergeLatest
    partitioner.out(1) ~> mergeLatest

    FlowShape(partitioner.in, mergeLatest.out)
  }

  val source = Source(1 to 1000000)
    .async

  val mainGraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partialGraph = builder.add(partialGraphDSL)

    val delay = Flow[List[Int]]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { x => println("After delay."); x }
      .withAttributes(Attributes.inputBuffer(1,1))

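    // Additional delay on Int elements, wired in directly after the source.
    val newDelay = Flow[Int]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { x => println("After delay."); x }
      .withAttributes(Attributes.inputBuffer(1, 1))

    // newDelay now sits before the printing map and the partition/merge stage.
    source ~> newDelay ~> Flow[Int].map { x => println(x); x } ~> partialGraph ~> delay ~> Sink.foreach(println)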

    ClosedShape
  }

  val runnable = RunnableGraph.fromGraph(mainGraph)
  val materialized = runnable.run()(ActorMaterializer())
}

If we apply the delay before the Flow, directly after the source, we can see the backpressure.

NOTE: see the newDelay flow in the code above.
