Buffering and backpressure problem when using Akka Streams.
I have the following code (simplified for the purposes of this question):
import akka.actor.ActorSystem
import akka.stream.{Attributes, ClosedShape, FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, MergeLatest, Partition, RunnableGraph, Sink, Source}
import scala.concurrent.duration._

object X extends App {
  implicit val actorSysten = ActorSystem("Actory-System")
  implicit val executionContext = actorSysten.dispatcher

  // Routes even numbers to output 0 and odd numbers to output 1.
  def partition2(v: Int): Int = {
    v % 2
  }

  // Partial graph: partition the input in two, then merge the latest values.
  val partialGraphDSL = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partitioner = builder.add(Partition[Int](2, partition2))
    val mergeLatest = builder.add(MergeLatest[Int](2))
    partitioner.out(0) ~> mergeLatest
    partitioner.out(1) ~> mergeLatest
    FlowShape(partitioner.in, mergeLatest.out)
  }

  val source = Source(1 to 1000)
    .async

  val mainGraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partialGraph = builder.add(partialGraphDSL)
    val delay = Flow[List[Int]]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { x => println("After delay."); x }
      .withAttributes(Attributes.inputBuffer(1, 1))
    source ~> Flow[Int].map { x => println(x); x } ~> partialGraph ~> delay ~> Sink.foreach(println)
    ClosedShape
  }

  val runnable = RunnableGraph.fromGraph(mainGraph)
  val materialized = runnable.run()
}
This immediately prints the numbers 1 through 1000, and then prints "After delay." once per second:
1
2
...
999
1000
After delay.
List(2, 1)
After delay.
List(2, 3)
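I expected the delay flow to backpressure the source. However, if I change the code so that it no longer uses partialGraphDSL, the output is exactly what I expect: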
import akka.actor.ActorSystem
import akka.stream.{Attributes, ClosedShape, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import scala.concurrent.duration._

object X extends App {
  implicit val actorSysten = ActorSystem("Actory-System")
  implicit val executionContext = actorSysten.dispatcher

  val source = Source(1 to 1000)
    .async

  val mainGraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val delay = Flow[Int]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { x => println("After delay."); x }
      .withAttributes(Attributes.inputBuffer(1, 1))
    // Same pipeline as before, but without the partial graph in the middle.
    source ~> Flow[Int].map { x => println(x); x } ~> delay ~> Sink.foreach(println)
    ClosedShape
  }

  val runnable = RunnableGraph.fromGraph(mainGraph)
  val materialized = runnable.run()
}
Output:
1
After delay.
1
2
After delay.
...
It seems that backpressure is not propagated from the delay through partialGraphDSL back to the source. Why is that?
The code behaves this way because the elements are stored in an internal buffer, so the source does not see any backpressure.
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, ClosedShape, FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, MergeLatest, Partition, RunnableGraph, Sink, Source}
import scala.concurrent.duration._

object GraphWithBackPressure extends App {
  implicit val actorSysten = ActorSystem("Actory-System")
  implicit val executionContext = actorSysten.dispatcher

  def partition2(v: Int): Int = {
    v % 2
  }

  val partialGraphDSL = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partitioner = builder.add(Partition[Int](2, partition2))
    val mergeLatest = builder.add(MergeLatest[Int](2))
    partitioner.out(0) ~> mergeLatest
    partitioner.out(1) ~> mergeLatest
    FlowShape(partitioner.in, mergeLatest.out)
  }

  val source = Source(1 to 1000000)
    .async

  val mainGraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partialGraph = builder.add(partialGraphDSL)
    val delay = Flow[List[Int]]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { x => println("After delay."); x }
      .withAttributes(Attributes.inputBuffer(1, 1))
    val newDelay = Flow[Int]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { x => println("After delay."); x }
      .withAttributes(Attributes.inputBuffer(1, 1))
    source ~> newDelay ~> Flow[Int].map { x => println(x); x } ~> partialGraph ~> delay ~> Sink.foreach(println)
    ClosedShape
  }

  val runnable = RunnableGraph.fromGraph(mainGraph)
  val materialized = runnable.run()(ActorMaterializer())
}
If we place a delay before the Flow, we can see the backpressure.
NOTE: look at the newDelay flow in the code above.
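To check whether a given stage lets the source run ahead of a slow consumer, one option is to count how many elements the source has emitted by the time the consumer has received only a few. The sketch below is an illustration under assumptions, not code from the question: `BackpressureProbe` and `emittedUpstream` are hypothetical names, a `throttle` stands in for the slow consumer, and it assumes Akka 2.6 or later, where the materializer is derived from the implicit `ActorSystem`.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper, not part of Akka; all names here are illustrative.
object BackpressureProbe {

  // Runs `stage` between a counting source and a slow consumer and returns
  // how many elements the source had emitted once the consumer took `taken`.
  def emittedUpstream[A](stage: Flow[Int, A, Any], taken: Int)(implicit system: ActorSystem): Int = {
    val emitted = new AtomicInteger(0)
    val done = Source(1 to 1000)
      .map { x => emitted.incrementAndGet(); x } // count what leaves the source
      .via(stage)                                // the stage under test
      .throttle(1, 200.millis)                   // slow consumer: one element per 200 ms
      .take(taken)                               // stop after a few elements
      .runWith(Sink.ignore)
    Await.result(done, 30.seconds)
    emitted.get()
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("probe")
    try {
      // A pass-through stage versus a stage with an explicit internal buffer.
      val direct = emittedUpstream(Flow[Int], taken = 3)
      val buffered = emittedUpstream(Flow[Int].buffer(256, OverflowStrategy.backpressure), taken = 3)
      println(s"pass-through: source emitted $direct elements while the consumer took 3")
      println(s"with buffer:  source emitted $buffered elements while the consumer took 3")
    } finally system.terminate()
  }
}
```

If a stage propagates backpressure without buffering, the first count would be expected to stay close to the number of elements taken, while a stage that buffers internally would be expected to let the source emit far more. Passing the partial graph from the question as `stage` (wrapped with `Flow.fromGraph`) should show which of the two cases it falls into.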