I have a Java class that submits a SQL file to a Flink cluster. I have:
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
streamExecutionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
streamExecutionEnvironment.getCheckpointConfig().setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
streamExecutionEnvironment.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
streamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage(customParams.get("checkpoint_path"));
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
config.set(PipelineOptions.NAME, customParams.get("pipeline_name"));
if (restartFromSavepointPath != null) {
config.set(SAVEPOINT_PATH, restartFromSavepointPath);
}
streamExecutionEnvironment.setStateBackend(new EmbeddedRocksDBStateBackend(true));
streamExecutionEnvironment.configure(config);
...
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamExecutionEnvironment);
tableEnv.executeSql("create table ....");
// this is end of the main class
To obtain restartFromSavepointPath, I have some code that fetches the latest checkpoint location; I can see the value is file:///tmp/flink-checkpoint-directory-domain/a98c68e3139041bc32e6a931e1f701e1/chk-24/_metadata
When I package the code above as a fat jar and run it, the job does not start from that checkpoint. The launch command is
flink run -c com.some.Deployer /some/local/location/some.jar
--> How do I make it start from the savepoint (given that execution.savepoint.path is set via config.set(SAVEPOINT_PATH, restartFromSavepointPath);)?
But if I use the -s option instead, flink run -c com.some.Deployer -s file:///tmp/flink-checkpoint-directory-domain/a98c68e3139041bc32e6a931e1f701e1/chk-24/_metadata /some/local/location/some.jar
--> this one does start from the savepoint.
You can set "execution.savepoint.path" in flink-conf.yaml to the latest path, and that will work. You can also set "execution.savepoint.ignore-unclaimed-state": when it is false (the default), the job will fail to start if some state in the savepoint cannot be mapped to the new job; when it is true, unmatched state is skipped and the job starts anyway.
If you are using the Flink Kubernetes Operator, you can set "initialSavepointPath" in the job spec of the FlinkDeployment YAML, like this:
job:
  jarURI: {{ .Values.jarName }}
  parallelism: {{ .Values.parallelism }}
  entryClass: {{ .Values.entryClass }}
  initialSavepointPath: {{ .Values.restorePath }}
Replace .Values.restorePath with your savepoint location.
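For reference, the flink-conf.yaml approach mentioned above would look roughly like this (the path is just the example value from the question):

```yaml
# flink-conf.yaml on the cluster (example values)
execution.savepoint.path: file:///tmp/flink-checkpoint-directory-domain/a98c68e3139041bc32e6a931e1f701e1/chk-24/_metadata
execution.savepoint.ignore-unclaimed-state: false
```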
May I know how you fetch the latest savepoint path programmatically?
I ended up using
config.set(SAVEPOINT_PATH, checkpointPath);
where config is the Configuration and checkpointPath is the path to the latest _metadata file in AWS S3.
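For anyone wondering how the "latest _metadata" lookup can be done: below is a minimal sketch for a job's checkpoint directory on a local filesystem (for S3 you would list keys with the AWS SDK instead; the class and method names here are made up for illustration, not from the original code):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

public class LatestCheckpointFinder {

    // Flink writes one chk-<N> directory per checkpoint, and the _metadata
    // file only exists once the checkpoint has completed. The latest
    // restorable checkpoint is therefore the chk-<N> directory with the
    // highest N that contains a _metadata file.
    public static Optional<Path> latestMetadata(Path jobCheckpointDir) throws IOException {
        try (Stream<Path> children = Files.list(jobCheckpointDir)) {
            return children
                    .filter(p -> p.getFileName().toString().startsWith("chk-"))
                    .filter(p -> Files.exists(p.resolve("_metadata")))
                    .max(Comparator.comparingLong(
                            (Path p) -> Long.parseLong(p.getFileName().toString().substring(4))))
                    .map(p -> p.resolve("_metadata"));
        }
    }
}
```

The returned path can then be passed to config.set(SAVEPOINT_PATH, ...) as a file:// URI.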
What you want to do is run a Flink job from a savepoint via a remote environment.
This is actually possible, i.e.:
SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath("/savepoints/savepoint-1234", false, RestoreMode.NO_CLAIM);
StreamExecutionEnvironment env = new RemoteStreamEnvironment(remoteHost, remotePort, Configuration.fromMap(Map.of()), new String[] {}, null, restoreSettings);
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
...
tableEnv.executeSql("create table ....")
But you need to understand the difference between the remote-environment approach and the CLI/REST API approach. With the CLI and REST API you submit a job with a savepoint path from the remote side, i.e.
./bin/flink run -s <savepointPath> ...
which means <savepointPath> is a path the job tries to read during execution (while building the job graph) on the remote Flink cluster.
The remote-environment approach is different, because you submit an already-built job graph to the remote cluster, so reading the savepoint files happens on the client.
In practice this means the remote-environment approach requires your setup to provide shared access to the savepoints for both the remote Flink cluster and the local client that submits the job. Something like a shared, pre-defined S3 folder should work, i.e.
Flink cluster configuration (flink-conf.yaml):
state.savepoints.dir: s3://savepoints/
state.checkpoints.dir: s3://checkpoints/
Local client:
SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath("s3://savepoints/savepoint-1234", false, RestoreMode.NO_CLAIM);
StreamExecutionEnvironment env = new RemoteStreamEnvironment(remoteHost, remotePort, Configuration.fromMap(Map.of()), new String[] {}, null, restoreSettings);
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
...
tableEnv.executeSql("create table ....")
If you want to use a savepoint, you have to take one when you stop or cancel the job; then you can run from that savepoint.
bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId
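Putting the two commands together, a typical stop-and-restore cycle looks like this (the job id and savepoint directory are placeholders, and the savepoint path printed by the stop command is what you pass to -s):

```shell
# Stop the job and take a savepoint; the command prints the savepoint path on success
bin/flink stop --savepointPath /tmp/savepoints 4712c9a0b1c6a6e3c97c1f1b1a2b3c4d

# Restart the job from the savepoint printed by the previous command
bin/flink run -c com.some.Deployer -s /tmp/savepoints/savepoint-4712c9-aaaaaaaaaaaa /some/local/location/some.jar
```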