我有一个 AWS 托管 Apache Flink 应用程序。
我们有 2 个输入数据流和 2 个输出数据流。
每次我进行较大的更改(例如,我们更改输入和输出流及其 Java 对象上的数据格式)时,重新部署都无法从最新的检查点恢复处理,我们需要在没有检查点的情况下重新启动应用程序。
我们收到这样的错误:
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint s3://90adba2e0a9002371abdf5ee58da7883e350fe4d/a4aa7fa6aabe9853f35082abc0fd1320-338785721659-1663851015791/savepoints/115/attempt-1/savepoint-a4aa7f-7f08cc16a519. Cannot map checkpoint/savepoint state for operator fb8f6c016d8c769a70a72e4189372e48 to the new program, because the operator is not available in the new program.
我的问题是:我们如何解决这个问题?我们确实希望避免完全重新启动,因为并非所有输入流都受到此更改的影响,而那些未受影响的输入流过去发送了我们希望恢复的关键数据。有没有办法将旧的检查点映射到新的状态,作为迁移?或者至少拯救未受影响的部分州?
用一个例子来说明我的意思:这里,假设我们有两个输入流(流“A”和流“B”),并且有两个输出流(称为输出“C”和输出“D”)。我修改了 Flink 代码,并更改了流 A 和输出 C 的数据格式 - 但我们没有更改流“B”和输出“D”。由于重大变化,我们失去了整个状态。有没有办法从旧的检查点映射或至少保存“流“B” - > proc 4 - > proc 5 - >输出“D”路径的状态?
我确实调查了这个问题,发现解决方案是 State Processor API,但我没有找到任何参考或使用指南如何将它与 AWS 托管的 Flink 一起使用。
通过查看错误,您没有指定算子的 ID,算子的当前 ID 是由 flink 生成的,即
fb8f6c016d8c769a70a72e4189372e48
。
我建议你明确地给出一个 ID,而不是让 flink 生成它,就像你现在的情况一样。
这里有一份官方文档供您参考: