为什么 Flink 在每次调用测试工具中的 processElement 后都会重置我的状态?

问题描述 投票:0回答:1

我正在使用 Flink 的 KeyedOneInputStreamOperatorTestHarness 并调用 processElement 两次。 processElement 将更新状态以计算所看到的元素数量。 在这种情况下,调用 processElement 两次后,我预计状态为 2。 第一个 processElement 将状态设置为 1,但是当我调用第二个 processElement 时,我看到状态重置为 null。您知道为什么会发生这种情况以及如何在调用 processElement 之间保留状态吗?

java testing apache-flink flink-streaming
1个回答
0
投票

由于您的示例表明状态似乎在第二次操作后被重置,这让我认为您的元素具有不同的键,并且每个元素都被分配了新的状态/上下文。

使用

KeyedOneInputStreamOperatorTestHarness
时,您需要确保传递给运算符的两个元素共享相同的键(因为这强制两个元素由同一个实例求值并且可以访问相同的状态).

考虑以下示例函数,该函数将简单地计算流经运算符的记录数并将其输出:

private class StatefulLengthCountFunction: KeyedProcessFunction<Int, String, Int>(){
    private lateinit var count: ValueState<Int>

    override fun open(parameters: Configuration) {
        count = runtimeContext.getState(ValueStateDescriptor("count", Int::class.java))
    }

    override fun processElement(value: String, ctx: Context, out: Collector<Int>) {
        var currentCount = count.value() ?: 0
        count.update(++currentCount)

        out.collect(currentCount)
    }
}

我们将使用以下测试来证明这一点:

@Test
fun `test demonstrating stateful counting based on string size`(){
    // Arrange
    val harness = ProcessFunctionTestHarnesses.forKeyedProcessFunction(
        StatefulLengthCountFunction(),
        // Note the size of the string will dictate the key
        { value -> value.length },
        TypeInformation.of(Int::class.java)
    )

    // Act (since all three elements share the same size, they'll go 
    // through the same operator and will share state
    harness.processElement(StreamRecord("abc", 0))
    harness.processElement(StreamRecord("def", 0))
    harness.processElement(StreamRecord("ghi", 0))

    // Assert
    val output = harness.extractOutputValues().last()
    assert(output == 3)
}
© www.soinside.com 2019 - 2024. All rights reserved.