在下面的代码中:
/**
 * Sets up a local Flink streaming environment (with web UI), configures
 * checkpointing, an embedded RocksDB state backend and a restart strategy,
 * then runs a handful of SQL statements through a [StreamTableEnvironment].
 *
 * Checkpoint and RocksDB directories are resolved below the JVM's `user.dir`.
 */
class EnrichmentStream {
    // Both locations are file:// URIs rooted at the current working directory.
    private val checkpointsDir = "file://${System.getProperty("user.dir")}/checkpoints/"
    private val rocksDBStateDir = "file://${System.getProperty("user.dir")}/state/rocksdb/"

    /**
     * Builds the execution environment, applies checkpoint / state backend /
     * restart settings, and executes the SQL statements in order, printing
     * the result of each one.
     */
    fun runStream() {
        val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(Configuration())
        env.parallelism = 3

        // Checkpointing: interval, minimum pause and storage location.
        env.enableCheckpointing(5000)
        val checkpointing = env.checkpointConfig
        checkpointing.minPauseBetweenCheckpoints = 100
        checkpointing.setCheckpointStorage(checkpointsDir)

        // Embedded RocksDB keeps its working files under rocksDBStateDir.
        val rocksDbBackend = EmbeddedRocksDBStateBackend()
        rocksDbBackend.setDbStoragePath(rocksDBStateDir)
        env.stateBackend = rocksDbBackend

        // Externalized checkpoints are retained when the job is cancelled.
        checkpointing.externalizedCheckpointCleanup =
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION

        // Fixed-delay restart: arguments are (5, Time.seconds(5)).
        env.restartStrategy = RestartStrategies.fixedDelayRestart(5, Time.seconds(5))

        val tableEnv = StreamTableEnvironment.create(env)

        // Inspect the existing catalogs, databases and tables, then declare Orders.
        // NOTE(review): the `WITH (...)` clause looks like a placeholder — confirm the
        // real connector options before running.
        val statements = listOf(
            "SHOW CATALOGS",
            "SHOW DATABASES",
            "SHOW TABLES",
            "CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)",
        )
        for (statement in statements) {
            tableEnv.executeSql(statement).print()
        }
    }

    companion object {
        /** JVM entry point: creates an [EnrichmentStream] and runs it. */
        @JvmStatic
        fun main(args: Array<String>) {
            EnrichmentStream().runStream()
        }
    }
}
Flink StateBackend 可以删除 RocksDB 中 `Orders` 表的旧记录(超过 2 小时的记录)吗?这样 `Orders` 表中的记录在任何时间点的生命周期都小于 2 小时。我们使用这些数据进行比较。
如果可以,在 `CREATE TABLE` 语句中指定该时间间隔的语法是什么?
您可以将 `table.exec.state.ttl` 设置为两个小时——具体信息请参阅文档。
这样做的效果是,该作业的所有状态都将在两小时后过期。(目前)无法针对单个表分别指定状态 TTL。