在流媒体停滞时,跳过一部分流媒体

问题描述 投票:2回答:1

我有一种情况,消息通过一个可能延迟消息的组件。

在压力下,我想跳过这个组件,这样就不会有超过X个消息同时被延迟。溢出的消息将跳过这个阶段,进入下一个阶段的蒸汽。

消息在这个阶段会被停滞,直到它们的未来完成,或者最多一分钟,以先到者为准。

我也许可以实现一个类似的自定义GraphStage。到这个缓冲区的例子或者使用 divertTo 和一些计数器来使消息跳过停滞的组件,但感觉在 akka 流中可能有更简单的方法

akka-stream
1个回答
0
投票

我一直在研究你的用例,并想出了一个基于以下方面的解决方案 Akka Actor 代表一个计数器和一个异步映射阶段。

我们的想法是: 3 元素,并且基于一个计数器,其最大容量为 2 顶多 2 的这些元素同时被慢速组件处理。

这样一来,总有一个处理线程保留给上游元素,这些元素会从慢速组件中分支出来,直接到达下游。


我们先定义一个基本的 Counter 最大容量为 Akka Actor:

class Counter(max: Int) extends Actor {
  private var count: Int = 0

  override def receive: Receive = {
    case TryAndLock if count < max =>
      count += 1
      sender ! true
    case TryAndLock =>
      sender ! false
    case Release =>
      count -= 1
  }
}

sealed trait CounterAction
case object TryAndLock extends CounterAction
case object Release extends CounterAction

val counter = system.actorOf(Props(new Counter(max = 2)))

它拥有一个可变更性 count 变量,可以通过 TryAndLock 请求,但只有在计数还没有达到最大容量的情况下,才可以通过一个 Release 请求。

我们正在使用一个 Actor 这样,从下面的并发锁定和释放操作就可以了。mapAsync 阶段都能正确处理,没有比赛条件。


那么,就只需要用一个 mapAsyncUnordered 阶段,其并行度仅比计数器的最大容量高1个单位。

任何经过异步阶段的元素都会查询到的 Counter 来尝试锁定一个资源。如果一个资源已经被锁定,那么这个元素将被扔到慢速组件中。如果没有,它将跳过它。元素被传递到慢速组件中,直到我们达到计数器的最大容量,这时任何新的元素都会被跳过,直到一个元素退出慢速组件并从计数器中释放资源。

我们不能简单地使用 mapAsync 因为元素在存在阶段时,会保持其上游的顺序,使跳过的元素等待慢速组件处理的元素完成后再在下游生产。因此,有必要使用 mapAsyncUnordered 来代替。

让我们定义一个例子,慢速组件最多同时处理2个元素,异步映射的并行度为3。

Source(0 to 15)
  .throttle(1, 50.milliseconds)
  .mapAsyncUnordered(parallelism = 3) { i =>
    (counter ? TryAndLock).map {
      case locked: Boolean if locked =>
        val result = slowTask(i)
        counter ! Release
        result
      case _ =>
        skipTask(i)
    }
  }
  .runForeach(println)

例如,这两个函数将模拟慢速组件(slowTask)和跳过慢速部分时该怎么做(skipTask):

def slowTask(value: Int): String = {
  val start = Instant.now()
  Thread.sleep(250)
  s"$value - processed - $start - ${Instant.now()}"
}
def skipTask(value: Int): String =
  s"$value - skipped - ${Instant.now()}"

其结果是这样的:

2 - skipped - 2020-06-03T19:07:19.410Z
3 - skipped - 2020-06-03T19:07:19.468Z
4 - skipped - 2020-06-03T19:07:19.518Z
5 - skipped - 2020-06-03T19:07:19.569Z
1 - processed - 2020-06-03T19:07:19.356Z - 2020-06-03T19:07:19.611Z
0 - processed - 2020-06-03T19:07:19.356Z - 2020-06-03T19:07:19.611Z
8 - skipped - 2020-06-03T19:07:19.719Z
9 - skipped - 2020-06-03T19:07:19.769Z
10 - skipped - 2020-06-03T19:07:19.819Z
6 - processed - 2020-06-03T19:07:19.618Z - 2020-06-03T19:07:19.869Z
12 - skipped - 2020-06-03T19:07:19.919Z
7 - processed - 2020-06-03T19:07:19.669Z - 2020-06-03T19:07:19.921Z
14 - skipped - 2020-06-03T19:07:20.019Z
15 - skipped - 2020-06-03T19:07:20.070Z
11 - processed - 2020-06-03T19:07:19.869Z - 2020-06-03T19:07:20.122Z
13 - processed - 2020-06-03T19:07:19.968Z - 2020-06-03T19:07:20.219Z

其中第一部分是上游元素的索引,第二部分是该元素被应用的变换(可以是 processed 当进入慢速部分或 skipped),最后一部分是一个时间戳,这样我们就可以直观地看到事情发生的时间。

进入阶段的2个首发元素(0和1)由慢速组件处理,一堆后续元素(2、3、4和5)跳过慢速阶段,直到这2个首发元素完成,其他元素才能进入慢速阶段。以此类推。

Gilfoyle

© www.soinside.com 2019 - 2024. All rights reserved.