Akka stream actor-conflation-ratelimit-actor丢弃了前几条消息(有时)

问题描述 投票:1回答:1

一个简单的混合组合(下面)有时会在staartup上打印一条调试消息,说它因为零需求而丢弃消息。我希望混淆阶段能够提供无限的需求,所以上述情况绝不应该如此。我错过了什么?

val sourceRef = Source.actorRef[KeyedHighFreqEvent](0, OverflowStrategy.fail)
.conflateWithSeed(...into hash map...)
.throttle(8, per = 1.second, maxBurst=24, ThrottleMode.shaping)
.mapConcat(...back to individual KeyedHighFreqEvent...)
.groupedWithin(1024, 1.millisecond)
.to(Sink.actorRef(networkPublisher, Nil))
.run()

system.eventStream.subscribe(sourceRef, classOf[KeyedHighFreqEvent])
scala akka akka-stream reactive-streams backpressure
1个回答
1
投票

Source.actorRef的文档很清楚:

可以使用0的bufferSize禁用缓冲区,如果下游没有需求,则会丢弃接收的消息。当bufferSize为0时,overflowStrategy无关紧要。在此Source之后添加异步边界;因此,假设下游始终产生需求是绝对不安全的。

问题是源和混淆阶段之间的异步边界。混淆阶段确实提供了无限的需求,但异步边界类型使得传播到源的速度变慢。

您可以在源中使用缓冲区(增加bufferSize),也可以使用其他源,例如Source.queue,因为它不会引入异步边界

© www.soinside.com 2019 - 2024. All rights reserved.