这是问题的背景:
这是我能想到的最好的:
Source.tick(5.seconds, 3.seconds, 1)
.scan(0)((a, b) => a + b) // have a counter
.wireTap(num => logger.warn(s"up ${num.formatted("%02d")}"))
.buffer(1, OverflowStrategy.dropHead)
.throttle(1, 7.seconds)
.wireTap(num => logger.warn(s"down ${num.formatted("%02d")}"))
.runWith(Sink.ignore)(materializer)
这几乎按照我希望的方式工作:有一个节流阀,每 7 秒不会让超过一个项目,在节流阀之前有一个缓冲区,它将保留单个元素,并将其替换为新的元素一个到了。
但是,当我检查日志时,我可以看到该行为不是最优的:
up 01
down 01
up 02
up 03
down 02
up 04
up 05
up 06
down 03
up 07
up 08
down 06
up 09
up 10
down 08
节流阀不是从缓冲区获取最新的元素,而是使用最后一个节流元素被释放时缓冲区中的元素。 IE。它看起来不是暂停然后检查新元素,而是获取一个元素并等待它直到计时器完成。
有更好的方法吗?或者我应该实现自己的流程?
在
GraphStage
中实现您自己的,您可以精确控制何时以及如何推/拉元素。
这是一个例子
class LastElementWithin[A](duration: FiniteDuration) extends GraphStage[FlowShape[A, A]] {
private val in = Inlet[A]("LastElementWithin.in")
private val out = Outlet[A]("LastElementWithin.out")
override val shape: FlowShape[A, A] = FlowShape(in, out)
private sealed trait CallbackEvent
private case object Pull extends CallbackEvent
private case object Push extends CallbackEvent
private case object Flush extends CallbackEvent
private case object Finish extends CallbackEvent
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with StageLogging {
setHandlers(
in = in,
out = out,
handler = new AbstractInOutHandler {
override def onPush(): Unit = asyncCallback.invoke(Push)
override def onPull(): Unit = asyncCallback.invoke(Pull)
override def onUpstreamFinish(): Unit = asyncCallback.invoke(Finish)
}
)
private val FlushTimerKey = "Flush"
protected override def onTimer(timerKey: Any): Unit = {
if (timerKey == FlushTimerKey) {
asyncCallback.invoke(Flush)
}
}
private val asyncCallback = createAsyncCallback(new Procedure[CallbackEvent] {
private var last: Option[A] = None
override def apply(param: CallbackEvent): Unit = {
param match {
case Pull => onPull()
case Push => onPush()
case Finish => onFinish()
case Flush => flush()
}
}
private def onPull(): Unit = {
if (!isTimerActive(FlushTimerKey)) scheduleOnce(FlushTimerKey, duration)
if (!hasBeenPulled(in)) pull(in)
}
private def onPush(): Unit = {
last = Some(grab(in))
pull(in)
}
private def onFinish(): Unit = {
cancelTimer(FlushTimerKey)
last.foreach(emit(out, _))
completeStage()
}
private def flush(): Unit = {
if (isAvailable(out)) {
last.foreach(emit(out, _))
scheduleOnce(FlushTimerKey, duration)
}
}
})
}
}
按流程运行
implicit val as: ActorSystem = ActorSystem("test")
val done = Source
.tick(5.nanoseconds, 3.seconds, 1)
.scan(0)((a, b) => a + b)
.wireTap(num => println(s"up ${"%02d".format(num)}"))
.via(Flow.fromGraph(new LastElementWithin(7.seconds)))
.wireTap(num => println(s"down ${"%02d".format(num)}"))
.toMat(Sink.ignore)(Keep.right)
.run()
产品
up 00
up 01
up 02
up 03
down 03
up 04
up 05
down 05
up 06
up 07
down 07
up 08
up 09
up 10
down 10