我目前正在开发一个使用 flink 的 keyed state 的操作符(sink)。状态后端是基于堆的。状态 ttl 设置为 24 小时。运算符用例是这样的:首先我们捕获请求并将某些内容存储在 valueState 中,然后我们捕获响应并对请求和响应执行一些逻辑。
我担心的是,状态在生产环境中的增长速度可能比我预期的要快得多。所以我考虑添加一些 featureFlag 属性,以便在出现问题时能够快速禁用运算符。
据我了解,flink 似乎不喜欢从检查点到检查点从作业图中添加和删除有状态运算符的情况,因此我修改了我的运算符,如果属性为 true - 运算符按预期工作,如果为 false - 它将立即返回并跳过它的功能。
这应该可行,但如果我决定禁用 featureFlag,我不知道如何清理已经创建的状态。
由于状态的键控性质,我不能只调用 state.clear(),因为没有键上下文,而且我不想在超大状态下等待 24 小时。我考虑过如果 featureFlag 为 false,则将 ttlConfig 更改为 1 分钟而不是 24 小时(例如),但我检查了源代码并意识到,如果 stateBackend 中已存在描述符状态,则会返回它,并且永远不会使用新的 ttl 配置.
那么
有什么方法可以通过 stateDescriptor 强制清理状态吗?或者我只能等到 ttl 清理状态?
附注问题似乎与如何清理非活动键的 Flink 流状态?
但我不能使用相同的方法,因为
您没有提到的一个选项如下:不要将键控状态保持在
KeyedProcessFunction
中,而是使用 KeyedBroadcastProcessFunction
。然后,当您想要清除某些状态时,将事件广播到该运算符中。为了响应传入的广播事件, processBroadcastElement
方法可以在传入的 applyToKeyedState
对象上调用 KeyedBroadcastProcessFunction.Context
。此 applyToKeyedState
方法可以迭代所有键的键控状态,并清除以下状态:它想要清理的任何键。
如果您可以容忍一些停机时间,另一种解决方案是使用保存点停止作业,使用状态处理器 API 清除某些键的状态,然后从修改的保存点重新启动。