如何从代码中的保存点重新启动 flink

问题描述 投票:0回答:4

我有一个java类正在向flink集群提交sql文件。

我有

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();


streamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
            streamExecutionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
            streamExecutionEnvironment.getCheckpointConfig().setExternalizedCheckpointCleanup(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

streamExecutionEnvironment.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
        streamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage(customParams.get("checkpoint_path"));

Configuration config = new Configuration();
config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
config.set(PipelineOptions.NAME, customParams.get("pipeline_name"));

if (restartFromSavepointPath != null) {
    config.set(SAVEPOINT_PATH, restartFromSavepointPath);
}

streamExecutionEnvironment.setStateBackend(new EmbeddedRocksDBStateBackend(true));
streamExecutionEnvironment.configure(config);
...
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamExecutionEnvironment);

tableEnv.executeSql("create table ....");

// this is end of the main class

为了获取

restartFromSavepointPath
,我有一些代码可以获取最新的检查点位置,我可以看到该值为
file:///tmp/flink-checkpoint-directory-domain/a98c68e3139041bc32e6a931e1f701e1/chk-24/_metadata

当我将上面的代码打包为 fat jar 并运行它时,作业不会从上面的检查点开始。 启动命令是

flink run -c com.some.Deployer /some/local/location/some.jar
--> 我如何让它从保存点启动(假设
execution.savepoint.path
是通过 config.set(SAVEPOINT_PATH, restartFromSavepointPath); 设置的)?

但是如果我使用

-s
选项
flink run -c com.some.Deployer -s file:///tmp/flink-checkpoint-directory-domain/a98c68e3139041bc32e6a931e1f701e1/chk-24/_metadata /some/local/location/some.jar
--> 这个将从保存点开始工作。

apache-flink flink-streaming flink-sql
4个回答
0
投票

您可以将 flink-conf.yaml 中的“execution.savepoint.path”设置为最新的路径,这样就可以了。您还可以将“execution.savepoint.ignore-unclaimed-state”设置为“false”,这样如果没有从指定的保存点恢复,它将不会启动,如果设置为true,它将尝试从保存点恢复,如果失败则正常启动。 如果您使用的是 flink kubernetes Operator,您可以在作业规范的 FlinkDeployment yaml 中设置“initialSavepointPath”,如下所示:

  job:
jarURI: {{ .Values.jarName }} 
parallelism: {{ .Values.parallelism }}
entryClass: {{ .Values.entryClass }}
initialSavepointPath: {{ .Values.restorePath }}

您可以将values.restorePath替换为您的保存点位置。

我可以知道你如何以编程方式获取最新的保存点路径吗


0
投票

我最终使用了

config.set(SAVEPOINT_PATH, checkpointPath);
,其中
config
是配置,
checkpointPath
是 aws s3 中最新
_metadata
文件的路径


0
投票

您想要做的是通过远程环境从保存点运行 flink 作业。

这实际上是可能的,即:

SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath("/savepoints/savepoint-1234", false, RestoreMode.NO_CLAIM);
env = new RemoteStreamEnvironment(remoteHost, remotePort, Configuration.fromMap(Map.of()), new String[] {}, null, restoreSettings);
StreamTableEnvironment.create(env, settings);
...
tableEnv.executeSql("create table ....")

但是您需要了解远程环境方法和 CLI/Rest API 方法之间的区别。 使用 CLI 和 Rest API 方法,您可以从远程提交带有保存点路径的作业,即

./bin/flink run -s <savepointPath> ...
意味着
<savepointPath>
是作业在执行(构建作业图)期间尝试读取的路径。远程 Flink 集群。

远程环境方法有所不同,因为您实际上是将已经构建的作业图提交到远程集群,因此读取保存点文件发生在客户端上。

实际上,这意味着远程环境方法要求您的设置提供对远程 Flink 集群和向集群提交作业的本地客户端的保存点的共享访问。就像使用共享的预定义 s3 文件夹应该可以工作,即

flink 集群配置(flink-conf.yaml):

state.savepoints.dir: s3://savepoints/
state.checkpoints.dir: s3://checkpoints/

本地客户:

SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath("s3://savepoints/savepoint-1234", false, RestoreMode.NO_CLAIM);
env = new RemoteStreamEnvironment(remoteHost, remotePort, Configuration.fromMap(Map.of()), new String[] {}, null, restoreSettings);
StreamTableEnvironment.create(env, settings);
...
tableEnv.executeSql("create table ....")

-1
投票

如果您想使用保存点,当您停止或取消作业时,您必须设置保存点; 然后你可以使用保存点运行。

bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId
© www.soinside.com 2019 - 2024. All rights reserved.