Apache Beam有状态的DoFn定期输出所有的KV对。

问题描述 投票:0回答:1

我试图在Apache Beam(通过Scio)中使用一个有状态的DoFn来聚合(按键)一个流数据源(使用 @ProcessElement@StateId ValueState 元素)。) 我想这是最适合我想解决的问题的。要求是

  • 对于一个给定的键,记录要在所有时间内进行汇总(本质上是总和) -- 我不关心以前计算过的合计数,只关心最近的合计数
  • 钥匙可 被驱逐 从国家(state.clear())基于我所控制的某些条件
  • 每5分钟一次 无论是否有新钥匙出现,所有钥匙 未被逐出国门者 应运而生

考虑到这是一个流媒体管道,并且将无限期地运行,使用一个 combinePerKey 在一个全局窗口上,累积烧毁的窗格,似乎会继续增加它的内存占用,以及随着时间的推移需要运行的数据量,所以我想避免这样做。此外,当测试这个问题时,(也许正如预期的那样),它只是将新计算的汇总与历史输入一起追加到输出中,而不是对每个键使用最新的值。

我的想法是,使用StatefulDoFn可以简单地让我输出到现在为止的所有全局状态(),但似乎这不是一个简单的解决方案。我已经看到了使用计时器来人为地执行回调的提示,以及可能使用缓慢增长的侧输入映射(当我创建PCollectionView<Map<String,String>&gt时,如何解决Duplicate values异常。),并以某种方式刷新这个异常,但这本质上需要遍历地图中的所有值,而不是加入它。

我觉得我可能忽略了一些简单的东西来实现这个工作。我对Beam中的窗口和定时器的许多概念都比较陌生,希望得到任何关于如何解决这个问题的建议。谢谢!我正在尝试汇总(每个窗口)和定时器。

google-cloud-dataflow apache-beam spotify-scio
1个回答
1
投票

你是对的,有状态的DoFn应该能帮助你。这是你可以做的一个基本草图。请注意,这只是输出不含键的和。这可能不是你想要的,但它应该能帮助你前进。

class CombiningEmittingFn extends DoFn<KV<Integer, Integer>, Integer> {

  @TimerId("emitter")
  private final TimerSpec emitterSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @StateId("done")
  private final StateSpec<ValueState<Boolean>> doneState = StateSpecs.value();

  @StateId("agg")
  private final StateSpec<CombiningState<Integer, int[], Integer>>
      aggSpec = StateSpecs.combining(
          Sum.ofIntegers().getAccumulatorCoder(null, VarIntCoder.of()), Sum.ofIntegers());

  @ProcessElement
  public void processElement(ProcessContext c,
      @StateId("agg") CombiningState<Integer, int[], Integer> aggState,
      @StateId("done") ValueState<Boolean> doneState,
      @TimerId("emitter") Timer emitterTimer) throws Exception {
        if (SOME CONDITION) {
          countValueState.clear();
          doneState.write(true);
        } else {
          countValueState.addAccum(c.element().getValue());
          emitterTimer.align(Duration.standardMinutes(5)).setRelative();
        }
      }
    }

  @OnTimer("emitter")
  public void onEmit(
      OnTimerContext context,
      @StateId("agg") CombiningState<Integer, int[], Integer> aggState,
      @StateId("done") ValueState<Boolean> doneState,
      @TimerId("emitter") Timer emitterTimer) {
      Boolean isDone = doneState.read();
      if (isDone != null && isDone) {
        return;
      } else {
        context.output(aggState.getAccum());
        // Set the timer to emit again
        emitterTimer.align(Duration.standardMinutes(5)).setRelative();
      }
    }
  }
  }

很高兴能和你一起迭代出一些能用的东西。


0
投票

@Pablo确实是正确的,StatefulDoFn和定时器在这种情况下是有用的。这是我能够让工作的代码。

有状态的Do Fn

// DomainState is a custom case class I'm using
type DoFnT = DoFn[KV[String, DomainState], KV[String, DomainState]]

class StatefulDoFn extends DoFnT {

  @StateId("key")
  private val keySpec = StateSpecs.value[String]()

  @StateId("domainState")
  private val domainStateSpec = StateSpecs.value[DomainState]()

  @TimerId("loopingTimer")
  private val loopingTimer: TimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME)

  @ProcessElement
  def process(
      context: DoFnT#ProcessContext,
      @StateId("key") stateKey: ValueState[String],
      @StateId("domainState") stateValue: ValueState[DomainState],
      @TimerId("loopingTimer") loopingTimer: Timer): Unit = {

    ... logic to create key/value from potentially null values

    if (keepState(value)) {
      loopingTimer.align(Duration.standardMinutes(5)).setRelative()

      stateKey.write(key)
      stateValue.write(value)

      if (flushState(value)) {
        context.output(KV.of(key, value))
      }
    } else {
      stateValue.clear()
    }
  }

  @OnTimer("loopingTimer")
  def onLoopingTimer(
      context: DoFnT#OnTimerContext,
      @StateId("key") stateKey: ValueState[String],
      @StateId("domainState") stateValue: ValueState[DomainState],
      @TimerId("loopingTimer") loopingTimer: Timer): Unit = {

    ... logic to create key/value checking for nulls

    if (keepState(value)) {

      loopingTimer.align(Duration.standardMinutes(5)).setRelative()

      if (flushState(value)) {
        context.output(KV.of(key, value))
      }
    }
  }
}

有管道

sc
  .pubsubSubscription(...)
  .keyBy(...)
  .withGlobalWindow()
  .applyPerKeyDoFn(new StatefulDoFn())
  .withFixedWindows(
    duration = Duration.standardMinutes(5),
    options = WindowOptions(
      accumulationMode = DISCARDING_FIRED_PANES,
      trigger = AfterWatermark.pastEndOfWindow(),
      allowedLateness = Duration.ZERO,
      // Only take the latest per key during a window
      timestampCombiner = TimestampCombiner.END_OF_WINDOW
    ))
  .reduceByKey(mostRecentEvent())
  .saveAsCustomOutput(TextIO.write()...)
© www.soinside.com 2019 - 2024. All rights reserved.