我试图了解我应该如何在Akka流中使用Source.queue和Sink.queue。在下面编写的小测试程序中,我发现我能够成功向Source.queue提供1000个项目。但是,当我等待未来时,应该会得到将所有这些项目从队列中拉出的结果,未来永远不会完成。具体来说,我们应该在最后看到消息“打印我们从队列中取出的内容”永远不会打印出来-而是看到错误“ TimeoutException:期货在[10秒]后超时”
任何指导,不胜感激!
import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes}
import org.scalatest.FunSuite
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
class StreamSpec extends FunSuite {
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val log: LoggingAdapter = Logging(actorSystem.eventStream, "basis-test")
implicit val ec: ExecutionContext = actorSystem.dispatcher
case class Req(name: String)
case class Response(
httpVersion: String = "",
method: String = "",
url: String = "",
headers: Map[String, String] = Map())
test("put items on queue then take them off") {
val source = Source.queue[String](128, akka.stream.OverflowStrategy.backpressure)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.queue[String]().withAttributes( Attributes.inputBuffer(128, 128))
val (sourceQueue, sinkQueue) = source.via(flow).toMat(sink)(Keep.both).run()
(1 to 1000).map( i =>
Future {
println("offerd" + i) // I see this print 1000 times as expected
sourceQueue.offer(s"batch-$i")
}
)
println("DONE OFFER FUTURE FIRING")
// Now use the Sink.queue to pull the items we added onto the Source.queue
val seqOfFutures: immutable.Seq[Future[Option[String]]] =
(1 to 1000).map{ i => sinkQueue.pull() }
val futureOfSeq: Future[immutable.Seq[Option[String]]] =
Future.sequence(seqOfFutures)
val seq: immutable.Seq[Option[String]] =
Await.result(futureOfSeq, 10.second)
// unfortunately our future times out here
println("print what we pulled off the queue:" + seq);
}
}
再次看这个,我意识到我最初设置并提出了错误的问题。
伴随我原始问题的测试掀起了一波浪潮1000个期货中的每个,它们每个都尝试提供1个项目到队列中。然后,该测试的第二步尝试创建一个包含1000个元素的序列(seqOfFutures)每个未来都试图从队列中提取值的地方。关于为什么我会收到超时错误的理论是,由于运行而导致某种死锁线程不足或由于一个线程在等待另一个线程,但是在线程上等待被阻塞的地方,或类似的东西。
由于我已更正了这一点,因此我不希望找出确切的原因以下代码中的所有内容(请参见更正的代码)。
在新代码中,使用队列的测试称为:“将项目放入队列中,然后将它们从异步并行机制中删除-(3)”。
在此测试中,我有一组10个任务并行运行以完成“入队”操作。然后我还有另外10个任务进行出队操作,这不仅涉及该项目不在列表中,还调用stringModifyFunc,它引入了1毫秒的处理延迟。
我还想证明我从中获得了一些性能上的好处并行启动任务并使任务步骤通过将结果传递给队列,因此测试3作为定时操作运行,我发现它需要1.9秒。
测试(1)和(2)做的工作量相同,但是顺序进行-第一个没有介入队列,第二个使用队列在步骤之间传递结果。这些测试分别在13.6和15.6秒内运行(这表明队列增加了一些开销,但是这被并行运行任务的效率所掩盖。)
正确的代码
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes, QueueOfferResult}
import org.scalatest.FunSuite
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
class Speco extends FunSuite {
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val log: LoggingAdapter = Logging(actorSystem.eventStream, "basis-test")
implicit val ec: ExecutionContext = actorSystem.dispatcher
val stringModifyFunc: String => String = element => {
Thread.sleep(1)
s"Modified $element"
}
def setup = {
val source = Source.queue[String](128, akka.stream.OverflowStrategy.backpressure)
val sink = Sink.queue[String]().withAttributes(Attributes.inputBuffer(128, 128))
val (sourceQueue, sinkQueue) = source.toMat(sink)(Keep.both).run()
val offers: Source[String, NotUsed] = Source(
(1 to iterations).map { i =>
s"item-$i"
}
)
(sourceQueue,sinkQueue,offers)
}
val outer = 10
val inner = 1000
val iterations = outer * inner
def timedOperation[T](block : => T) = {
val t0 = System.nanoTime()
val result: T = block // call-by-name
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) / (1000 * 1000) + " milliseconds")
result
}
test("20k iterations in single threaded loop no queue (1)") {
timedOperation{
(1 to iterations).foreach { i =>
val str = stringModifyFunc(s"tag-${i.toString}")
System.out.println("str:" + str);
}
}
}
test("20k iterations in single threaded loop with queue (2)") {
timedOperation{
val (sourceQueue, sinkQueue, offers) = setup
val resultFuture: Future[Done] = offers.runForeach{ str =>
val itemFuture = for {
_ <- sourceQueue.offer(str)
item <- sinkQueue.pull()
} yield (stringModifyFunc(item.getOrElse("failed")) )
val item = Await.result(itemFuture, 10.second)
System.out.println("item:" + item);
}
val result = Await.result(resultFuture, 20.second)
System.out.println("result:" + result);
}
}
test("put items on queue then take them off (with async parallelism) - (3)") {
timedOperation{
val (sourceQueue, sinkQueue, offers) = setup
def enqueue(str: String) = sourceQueue.offer(str)
def dequeue = {
sinkQueue.pull().map{
maybeStr =>
val str = stringModifyFunc( maybeStr.getOrElse("failed2"))
println(s"dequeud value is $str")
}
}
val offerResults: Source[QueueOfferResult, NotUsed] =
offers.mapAsyncUnordered(10){ string => enqueue(string)}
val dequeueResults: Source[Unit, NotUsed] = offerResults.mapAsyncUnordered(10){ _ => dequeue }
val runAll: Future[Done] = dequeueResults.runForeach(u => u)
Await.result(runAll, 20.second)
}
}
}