一个简单的混合组合(下面)有时会在staartup上打印一条调试消息,说它因为零需求而丢弃消息。我希望混淆阶段能够提供无限的需求,所以上述情况绝不应该如此。我错过了什么?
val sourceRef = Source.actorRef[KeyedHighFreqEvent](0, OverflowStrategy.fail)
.conflateWithSeed(...into hash map...)
.throttle(8, per = 1.second, maxBurst=24, ThrottleMode.shaping)
.mapConcat(...back to individual KeyedHighFreqEvent...)
.groupedWithin(1024, 1.millisecond)
.to(Sink.actorRef(networkPublisher, Nil))
.run()
system.eventStream.subscribe(sourceRef, classOf[KeyedHighFreqEvent])
Source.actorRef
的文档很清楚:
可以使用0的
bufferSize
禁用缓冲区,如果下游没有需求,则会丢弃接收的消息。当bufferSize
为0时,overflowStrategy
无关紧要。在此Source之后添加异步边界;因此,假设下游始终产生需求是绝对不安全的。
问题是源和混淆阶段之间的异步边界。混淆阶段确实提供了无限的需求,但异步边界类型使得传播到源的速度变慢。
您可以在源中使用缓冲区(增加bufferSize),也可以使用其他源,例如Source.queue
,因为它不会引入异步边界