Flink - Can the state backend purge old data from a RocksDB table?


Consider the following code:

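import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

class EnrichmentStream {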
    private val checkpointsDir  = "file://${System.getProperty("user.dir")}/checkpoints/"
    private val rocksDBStateDir = "file://${System.getProperty("user.dir")}/state/rocksdb/"

    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            EnrichmentStream().runStream()
        }
    }

    fun runStream() {
        val environment = StreamExecutionEnvironment
            .createLocalEnvironmentWithWebUI(Configuration())

        environment.setParallelism(3)

        // Checkpoint Configurations
        environment.enableCheckpointing(5000)
        environment.checkpointConfig.minPauseBetweenCheckpoints = 100
        environment.checkpointConfig.setCheckpointStorage(checkpointsDir)

        // Keep operator state in RocksDB, with its local files under rocksDBStateDir
        val stateBackend = EmbeddedRocksDBStateBackend()
        stateBackend.setDbStoragePath(rocksDBStateDir)
        environment.setStateBackend(stateBackend)

        environment.checkpointConfig.externalizedCheckpointCleanup =
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION

        // Configure Restart Strategy
        environment.restartStrategy = RestartStrategies.fixedDelayRestart(5, Time.seconds(5))

        val tableEnvironment = StreamTableEnvironment.create(environment)

        // Run some SQL queries to check the existing Catalogs, Databases and Tables
        tableEnvironment
            .executeSql("SHOW CATALOGS")
            .print()

        tableEnvironment
            .executeSql("SHOW DATABASES")
            .print()

        tableEnvironment
            .executeSql("SHOW TABLES")
            .print()

        tableEnvironment
            .executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
            .print()

    }
}

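Can the Flink state backend delete old records (older than 2 hours) from the RocksDB table (Orders)? The goal is that, at any point in time, every record in the table (Orders) is less than 2 hours old. We use this data for comparisons.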

If so, what is the CREATE TABLE syntax for specifying that interval?

java stream apache-flink flink-streaming
1 Answer (0 votes)

What you can do is set table.exec.state.ttl to two hours; see the documentation for details.

The effect is that all of the job's state will expire after two hours. There is (currently) no way to specify a state TTL on a per-table basis.
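A minimal sketch of setting that option from Kotlin, assuming Flink 1.13 or later (the version that introduced EmbeddedRocksDBStateBackend, which the question already uses). It sets ExecutionConfigOptions.IDLE_STATE_RETENTION, the typed constant whose key is table.exec.state.ttl, on the StreamTableEnvironment's configuration. The helper name applyStateTtl is hypothetical. Note that this option is documented as a minimum retention for idle state (state that has not been updated), so an entry is eligible for cleanup roughly two hours after its last update rather than exactly two hours after insertion, and it applies to every stateful SQL/Table operator in the job, not to one table.

```kotlin
import java.time.Duration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.table.api.config.ExecutionConfigOptions

// Hypothetical helper: sets the job-wide state TTL ("table.exec.state.ttl")
// and returns the value now stored in the table configuration.
fun applyStateTtl(tableEnvironment: StreamTableEnvironment, ttl: Duration): Duration {
    val conf = tableEnvironment.config.configuration
    // Applies to ALL stateful operators created by this TableEnvironment;
    // there is no per-table variant of this option.
    conf.set(ExecutionConfigOptions.IDLE_STATE_RETENTION, ttl)
    return conf.get(ExecutionConfigOptions.IDLE_STATE_RETENTION)
}

fun main() {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment()
    val tableEnvironment = StreamTableEnvironment.create(environment)

    // Set the TTL before running the queries, so the stateful operators they create pick it up.
    val effective = applyStateTtl(tableEnvironment, Duration.ofHours(2))
    println("table.exec.state.ttl = $effective")
}
```

In the question's code, the equivalent call would go right after StreamTableEnvironment.create(environment) and before the executeSql(...) statements.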
