在 apache flink 中,我们应该更新每个收集还是每个输入的状态?

问题描述 投票:0回答:1

想象一个案例,输入是一个文件名,我们想使用 flink

RichFlatMapFunction
更新文件的状态和输出行(每个文件包含 10k 行)。我想知道我应该在哪里更新状态以确保恰好一次交付。这里有 2 个解决方案:

// solution 1
class MyOp extends RichFlatMapFunction {
 ...
 def flatMap(filename: String, out: Collector[String]): Unit = {
    val state = Option(flinkState.value()).getOrElse(defaultState)
    for (line <- read(filename)) {
      state.update(line)
      flinkState.update(state)
      out.collect(line)
    }
 }
}
// solution 2
class MyOp extends RichFlatMapFunction {
 ...
 def flatMap(filename: String, out: Collector[String]): Unit = {
    val state = Option(flinkState.value()).getOrElse(defaultState)
    for (line <- read(filename)) {
      flinkState.update(state)
      out.collect(line)
    }
    state.update(line)
 }
}
apache-flink flink-streaming
1个回答
0
投票

就正确性而言,它没有任何区别。检查点永远不会在调用用户函数(如您的

RichFlatMapFunction
)期间发生,因此检查点将反映处理传递给
flatMap
方法的事件之前或之后的状态。

就性能而言,解决方案2要好得多。

© www.soinside.com 2019 - 2024. All rights reserved.