Akka流并行性

问题描述 投票:0回答:1

根据文档[1],我一直试图在Akka Stream中并行化流,但是由于某些原因,我没有得到预期的结果。

我遵循了文档中概述的步骤,并且我认为我没有错过任何东西。但是,我的流的计算都是依次进行的。

我在这里想念什么?

[1] https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html

import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Source}

object ScalaParallell extends App {

  implicit val system = ActorSystem("QuickStart")

  def longRunningComputation(x: Int): Int = {
    println(s"Computing 1 ${x}")
    Thread.sleep(10000)
    println(s"Computation 1 ${x} done")
    x
  }
  def longRunningComputation2(x: Int): Int = {
    println(s"Computing 2 ${x}")
    Thread.sleep(10000)
    println(s"Computation 2 ${x} done")
    x
  }

  val processor: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      // prepare graph elements
      val balance = b.add(Balance[Int](2))
      val merge = b.add(Merge[Int](2))
      val f = Flow[Int].map(longRunningComputation)
      val f2 = Flow[Int].map(longRunningComputation2)


      // connect the graph
      balance.out(0) ~> f.async ~> merge.in(0)
      balance.out(1) ~> f2.async ~> merge.in(1)

      // expose ports
      FlowShape(balance.in, merge.out)
    })


  // Wire it all up.
  val xs = List(1,2,3)
  val source: Source[Int, NotUsed] = Source(xs)
  source.via(processor).runForeach(println)


  Thread.sleep(5000)
}

示例输出

Computing 2 1
Computation 2 1 done
Computing 2 2
1
Computation 2 2 done
Computing 2 3
2
Computation 2 3 done
3

我希望看到两个计算同时发生。例如:

Computing 1 1
Computing 1 2
Computation 1 2 done
Computing 1 3
Computation 1 1 done
Computing 2 4
1
2
..
scala akka akka-stream
1个回答
0
投票

[尝试删除Thread.sleeplongRunningComputation内部的longRunningComputation2并将xs设置为更长的值,例如1 to 100,那么您将能够观察到并行处理。不知道为什么,但是阻塞Thread.sleep在akka中绝对被视为反模式

© www.soinside.com 2019 - 2024. All rights reserved.