我正在使用 Flink 的 KeyedOneInputStreamOperatorTestHarness 并调用 processElement 两次。 processElement 将更新状态以计算所看到的元素数量。 在这种情况下,调用 processElement 两次后,我预计状态为 2。 第一个 processElement 将状态设置为 1,但是当我调用第二个 processElement 时,我看到状态重置为 null。您知道为什么会发生这种情况以及如何在调用 processElement 之间保留状态吗?
由于您的示例表明状态似乎在第二次操作后被重置,这让我认为您的元素具有不同的键,并且每个元素都被分配了新的状态/上下文。
使用
KeyedOneInputStreamOperatorTestHarness
时,您需要确保传递给运算符的两个元素共享相同的键(因为这强制两个元素由同一个实例求值并且可以访问相同的状态).
考虑以下示例函数,该函数将简单地计算流经运算符的记录数并将其输出:
private class StatefulLengthCountFunction: KeyedProcessFunction<Int, String, Int>(){
private lateinit var count: ValueState<Int>
override fun open(parameters: Configuration) {
count = runtimeContext.getState(ValueStateDescriptor("count", Int::class.java))
}
override fun processElement(value: String, ctx: Context, out: Collector<Int>) {
var currentCount = count.value() ?: 0
count.update(++currentCount)
out.collect(currentCount)
}
}
我们将使用以下测试来证明这一点:
@Test
fun `test demonstrating stateful counting based on string size`(){
// Arrange
val harness = ProcessFunctionTestHarnesses.forKeyedProcessFunction(
StatefulLengthCountFunction(),
// Note the size of the string will dictate the key
{ value -> value.length },
TypeInformation.of(Int::class.java)
)
// Act (since all three elements share the same size, they'll go
// through the same operator and will share state
harness.processElement(StreamRecord("abc", 0))
harness.processElement(StreamRecord("def", 0))
harness.processElement(StreamRecord("ghi", 0))
// Assert
val output = harness.extractOutputValues().last()
assert(output == 3)
}