限制 Akka 流中的源滴答频率并始终对最新消息采取行动

问题描述 投票:0回答:1

这是问题的背景:

  1. 有一个源,它不断滴答,无法保证滴答频率
  2. 我们想要限制源的最大滴答率(例如,我们在数据库中启动消息,并且我们不希望存储频率超过每 7 秒一次)
  3. 我们只对最新事件感兴趣,因此如果在 5 秒等待时间内有新的事件发出,我们只对此感兴趣。

这是我能想到的最好的:

Source.tick(5.seconds, 3.seconds, 1)
    .scan(0)((a, b) => a + b) // have a counter
    .wireTap(num => logger.warn(s"up   ${num.formatted("%02d")}"))
    .buffer(1, OverflowStrategy.dropHead)
    .throttle(1, 7.seconds)
    .wireTap(num => logger.warn(s"down     ${num.formatted("%02d")}"))
    .runWith(Sink.ignore)(materializer)

这几乎按照我希望的方式工作:有一个节流阀,每 7 秒不会让超过一个项目,在节流阀之前有一个缓冲区,它将保留单个元素,并将其替换为新的元素一个到了。

但是,当我检查日志时,我可以看到该行为不是最优的:

up   01
down     01
up   02
up   03
down     02
up   04
up   05
up   06
down     03
up   07
up   08
down     06
up   09
up   10
down     08

节流阀不是从缓冲区获取最新的元素,而是使用最后一个节流元素被释放时缓冲区中的元素。 IE。它看起来不是暂停然后检查新元素,而是获取一个元素并等待它直到计时器完成。

有更好的方法吗?或者我应该实现自己的流程?

scala akka akka-stream
1个回答
0
投票

GraphStage
中实现您自己的,您可以精确控制何时以及如何推/拉元素。

这是一个例子

  class LastElementWithin[A](duration: FiniteDuration) extends GraphStage[FlowShape[A, A]] {

    private val in = Inlet[A]("LastElementWithin.in")
    private val out = Outlet[A]("LastElementWithin.out")

    override val shape: FlowShape[A, A] = FlowShape(in, out)

    private sealed trait CallbackEvent

    private case object Pull extends CallbackEvent

    private case object Push extends CallbackEvent

    private case object Flush extends CallbackEvent

    private case object Finish extends CallbackEvent

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new TimerGraphStageLogic(shape) with StageLogging {

        setHandlers(
          in = in,
          out = out,
          handler = new AbstractInOutHandler {

            override def onPush(): Unit = asyncCallback.invoke(Push)

            override def onPull(): Unit = asyncCallback.invoke(Pull)

            override def onUpstreamFinish(): Unit = asyncCallback.invoke(Finish)
          }
        )

        private val FlushTimerKey = "Flush"

        protected override def onTimer(timerKey: Any): Unit = {
          if (timerKey == FlushTimerKey) {
            asyncCallback.invoke(Flush)
          }
        }

        private val asyncCallback = createAsyncCallback(new Procedure[CallbackEvent] {

          private var last: Option[A] = None

          override def apply(param: CallbackEvent): Unit = {
            param match {
              case Pull   => onPull()
              case Push   => onPush()
              case Finish => onFinish()
              case Flush  => flush()
            }
          }

          private def onPull(): Unit = {
            if (!isTimerActive(FlushTimerKey)) scheduleOnce(FlushTimerKey, duration)
            if (!hasBeenPulled(in)) pull(in)
          }

          private def onPush(): Unit = {
            last = Some(grab(in))
            pull(in)
          }

          private def onFinish(): Unit = {
            cancelTimer(FlushTimerKey)
            last.foreach(emit(out, _))
            completeStage()
          }

          private def flush(): Unit = {
            if (isAvailable(out)) {
              last.foreach(emit(out, _))
              scheduleOnce(FlushTimerKey, duration)
            }
          }
        })
      }
  }

按流程运行

    implicit val as: ActorSystem = ActorSystem("test")
    val done = Source
      .tick(5.nanoseconds, 3.seconds, 1)
      .scan(0)((a, b) => a + b)
      .wireTap(num => println(s"up   ${"%02d".format(num)}"))
      .via(Flow.fromGraph(new LastElementWithin(7.seconds)))
      .wireTap(num => println(s"down ${"%02d".format(num)}"))
      .toMat(Sink.ignore)(Keep.right)
      .run()

产品

up   00
up   01
up   02
up   03
down 03
up   04
up   05
down 05
up   06
up   07
down 07
up   08
up   09
up   10
down 10
© www.soinside.com 2019 - 2024. All rights reserved.