我试图在Apache Beam(通过Scio)中使用一个有状态的DoFn来聚合(按键)一个流数据源(使用 @ProcessElement
与 @StateId
ValueState
元素)。) 我想这是最适合我想解决的问题的。要求是
state.clear()
)基于我所控制的某些条件考虑到这是一个流媒体管道,并且将无限期地运行,使用一个 combinePerKey
在一个全局窗口上,累积烧毁的窗格,似乎会继续增加它的内存占用,以及随着时间的推移需要运行的数据量,所以我想避免这样做。此外,当测试这个问题时,(也许正如预期的那样),它只是将新计算的汇总与历史输入一起追加到输出中,而不是对每个键使用最新的值。
我的想法是,使用StatefulDoFn可以简单地让我输出到现在为止的所有全局状态(),但似乎这不是一个简单的解决方案。我已经看到了使用计时器来人为地执行回调的提示,以及可能使用缓慢增长的侧输入映射(当我创建PCollectionView<Map<String,String>>时,如何解决Duplicate values异常。),并以某种方式刷新这个异常,但这本质上需要遍历地图中的所有值,而不是加入它。
我觉得我可能忽略了一些简单的东西来实现这个工作。我对Beam中的窗口和定时器的许多概念都比较陌生,希望得到任何关于如何解决这个问题的建议。谢谢!我正在尝试汇总(每个窗口)和定时器。
你是对的,有状态的DoFn应该能帮助你。这是你可以做的一个基本草图。请注意,这只是输出不含键的和。这可能不是你想要的,但它应该能帮助你前进。
class CombiningEmittingFn extends DoFn<KV<Integer, Integer>, Integer> {
@TimerId("emitter")
private final TimerSpec emitterSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
@StateId("done")
private final StateSpec<ValueState<Boolean>> doneState = StateSpecs.value();
@StateId("agg")
private final StateSpec<CombiningState<Integer, int[], Integer>>
aggSpec = StateSpecs.combining(
Sum.ofIntegers().getAccumulatorCoder(null, VarIntCoder.of()), Sum.ofIntegers());
@ProcessElement
public void processElement(ProcessContext c,
@StateId("agg") CombiningState<Integer, int[], Integer> aggState,
@StateId("done") ValueState<Boolean> doneState,
@TimerId("emitter") Timer emitterTimer) throws Exception {
if (SOME CONDITION) {
countValueState.clear();
doneState.write(true);
} else {
countValueState.addAccum(c.element().getValue());
emitterTimer.align(Duration.standardMinutes(5)).setRelative();
}
}
}
@OnTimer("emitter")
public void onEmit(
OnTimerContext context,
@StateId("agg") CombiningState<Integer, int[], Integer> aggState,
@StateId("done") ValueState<Boolean> doneState,
@TimerId("emitter") Timer emitterTimer) {
Boolean isDone = doneState.read();
if (isDone != null && isDone) {
return;
} else {
context.output(aggState.getAccum());
// Set the timer to emit again
emitterTimer.align(Duration.standardMinutes(5)).setRelative();
}
}
}
}
很高兴能和你一起迭代出一些能用的东西。
@Pablo确实是正确的,StatefulDoFn和定时器在这种情况下是有用的。这是我能够让工作的代码。
有状态的Do Fn
// DomainState is a custom case class I'm using
type DoFnT = DoFn[KV[String, DomainState], KV[String, DomainState]]
class StatefulDoFn extends DoFnT {
@StateId("key")
private val keySpec = StateSpecs.value[String]()
@StateId("domainState")
private val domainStateSpec = StateSpecs.value[DomainState]()
@TimerId("loopingTimer")
private val loopingTimer: TimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME)
@ProcessElement
def process(
context: DoFnT#ProcessContext,
@StateId("key") stateKey: ValueState[String],
@StateId("domainState") stateValue: ValueState[DomainState],
@TimerId("loopingTimer") loopingTimer: Timer): Unit = {
... logic to create key/value from potentially null values
if (keepState(value)) {
loopingTimer.align(Duration.standardMinutes(5)).setRelative()
stateKey.write(key)
stateValue.write(value)
if (flushState(value)) {
context.output(KV.of(key, value))
}
} else {
stateValue.clear()
}
}
@OnTimer("loopingTimer")
def onLoopingTimer(
context: DoFnT#OnTimerContext,
@StateId("key") stateKey: ValueState[String],
@StateId("domainState") stateValue: ValueState[DomainState],
@TimerId("loopingTimer") loopingTimer: Timer): Unit = {
... logic to create key/value checking for nulls
if (keepState(value)) {
loopingTimer.align(Duration.standardMinutes(5)).setRelative()
if (flushState(value)) {
context.output(KV.of(key, value))
}
}
}
}
有管道
sc
.pubsubSubscription(...)
.keyBy(...)
.withGlobalWindow()
.applyPerKeyDoFn(new StatefulDoFn())
.withFixedWindows(
duration = Duration.standardMinutes(5),
options = WindowOptions(
accumulationMode = DISCARDING_FIRED_PANES,
trigger = AfterWatermark.pastEndOfWindow(),
allowedLateness = Duration.ZERO,
// Only take the latest per key during a window
timestampCombiner = TimestampCombiner.END_OF_WINDOW
))
.reduceByKey(mostRecentEvent())
.saveAsCustomOutput(TextIO.write()...)