使用 Flink 的 测试工具类 来测试我的有状态运算符,我想编写单元测试来验证存储在运算符状态中的数据是否是我所期望的。然而,我似乎无法做到这一点,只能调用
getOuput
来查看操作员输出了什么,以及 numKeyedStateEntries
来查看状态中有多少个值。有没有办法真正获取状态中的值?
没有(合理的)方法可以做到这一点。
假设您可以编写一个采用保存点的测试,然后使用状态处理器 API 来验证状态。
我可能会认为测试存储在状态中的值会使测试与实现过于紧密地结合在一起。验证结果是否正确以及状态是否保留时间过长就足够了。但我同意,在单元测试期间对状态后端有更多的了解会很好。
首先,我同意 David 关于检查状态如何与实现紧密耦合的评论。尽管有时这很有用,但如果您有复杂的设置和/或更新状态行为。
无论如何,我相信还有另一种(不合理的)方法可以做到这一点......
MyStateBackend
的 HashMapStateBackend
类。在本课程中,您将覆盖
createKeyedStateBackend
,并保存结果(它是 HeapKeyedStateBackend
)。
添加一个
getStates()
方法,通过调用保存的后端的 List<Tuple2<K, V>>
和 getKeys()
方法,返回 getOrCreateKeyedState()
(键控状态值)。
harness.setStateBackend(your custom state backend)
,然后再调用 harness.setup()
和 harness.open()
。您现在应该能够获取/检查状态。
您可以通过您的运营商访问状态如下。
harness.getOperator.getKeyedStateStore.getMapState
harness.getOperator.getOperatorStateBackend.getBroadcastState