在flink应用模式app中指定job id

问题描述 投票:0回答:1

Flink新手请教一个操作问题。

我有一个在应用程序模式下运行的简单 flink 应用程序:它从无界的 kafka 源消费,运行 2 个带有一些简单逻辑和状态的运算符,然后输出到 kafka 接收器。检查点配置为写入 S3 存储桶,并且我已将部署设置为仅作为简单的 kubernetes 服务运行(据我所知,每个应用程序实例都将运行 JobManager 和 TaskManager 逻辑)。这项工作是通过调用

main()
从普通的
StreamExecutionEnvironment.execute()
方法(即不通过 flink CLI)开始。

在我的机器上使用 S3 模拟器作为 sidecar 运行的本地集成测试中,我注意到检查点存储在像

s3://my-bucket/my-checkpoint-dir/<job-id>/...
这样的路径中,这是有意义的。由于我将把我的应用程序作为具有多个应用程序实例的 kubernetes 服务来运行,我想我将能够设置运行状况检查和监控的组合,以确保始终有一个 JobManager 来保持工作部署期间进行。

这是我的问题。我注意到,当我在开发过程中停止并启动本地计算机上的集群时,它最终会使用新的作业 ID 启动一个新作业,因此不会从任何先前存储的检查点恢复。在本地这很好,但我可以预见到我可能需要关闭生产中的应用程序实例并再次将它们重新启动(升级 flink、一些重大迁移、不可预见的重大问题等)。在这样的场景中,有没有一种方法可以编写应用程序代码,以便在备份实例时显式使用已知的稳定作业 ID? (理想情况下,我不想为 JobManager 额外依赖外部 Zookeeper 集群,但如果没有其他选择,我可以这样做。)

对此的一个答案是使用保存点而不是检查点来执行操作任务。有关详细信息,请参阅

升级应用程序和 Flink 版本
apache-flink flink-streaming
1个回答
0
投票

另一个需要考虑的资源是 Flink Kubernetes Operator,它被设置为使用保存点来处理作业生命周期管理。

© www.soinside.com 2019 - 2024. All rights reserved.