使用Rocksdb到期后,Flink ValueState将从存储中删除吗?

问题描述 投票:0回答:1
  • 我正在使用带有rocksdb后端的Flink版本1.10.1。
  • 我知道rocksdb使用“托管内存”中的内存,并且我没有为托管内存设置任何特定值。它是由Flink完成的。
  • [当我监视我的应用程序时,任务管理器的可用内存总是减少的(我是指通过free -h测量的操作系统的可用内存)。我怀疑原因可能是Rocksdb。
  • Question_1 =>如果ValueState的值已过期,那么rocksdb是否将从其内存中删除并从localstorage目录中删除? (我的存储容量也有限)
  • Question_2 => stream.keyBy(ipAddress),如果rocksdb持有此ipAddress(我在说的是keyBy本身而不是状态),它是否始终放置在托管内存中?如果没有,那么flink堆内存会增加吗?

这是我的应用程序的一般结构:

streamA = source.filter(..);
streamA2 = source2.filter(..);
streamB = streamA.keyBy(ipAddr).window().process(); // contains value state
streamC = streamA.keyBy(ipAddr).flatMap(..); // contains value state
streamD = streamA2.keyBy(ipAddr).window.process(); // contains value state
streamE = streamA.union(streamA2).keyBy(ipAddr)....

这是我的应用程序中的状态示例:

 private transient ValueState<SampleObject> sampleState;
 StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(10))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ValueStateDescriptor<SampleObject> sampleValueStateDescriptor = new ValueStateDescriptor<>(
                "sampleState",
                TypeInformation.of(SampleObject.class)
        );
        sampleValueStateDescriptor.enableTimeToLive(ttlConfig);

Rocksdb配置:

state.backend: rocksdb
state.backend.rocksdb.checkpoint.transfer.thread.num: 6
state.backend.rocksdb.localdir: /pathTo/checkpoint_only_local

为什么我要使用Rocksdb

  • 我使用rocksdb是因为我有一个很大的密钥大小(认为它是IP地址),而HeapState后端或其他方法无法处理。
  • 我的应用程序使用rocksdb,因为我在用户定义的keyedprocess函数中有很多状态,以供将来决策。 (每个状态都有`StateTtlConfig)

注意

  • 我的应用程序不需要增量检查点或有关保存点的任何内容。我不在乎保存应用程序的所有快照。
apache-flink flink-streaming rocksdb
1个回答
0
投票

在您的状态ttl配置中,您尚未指定要如何进行状态清除。在这种情况下,过期的值会在读取时显式删除(例如ValueState#value),否则会定期在后台收集垃圾。对于RocksDB,此后台清理是在压缩期间完成的。换句话说,清理不是立即进行的。 docs提供了有关如何进行调整的更多详细信息-您可以配置清理速度更快,但会降低性能。

keyBy本身不使用任何状态。密钥选择器功能用于对流进行分区,但是密钥不会与keyBy关联存储。只有窗口和平面图操作保持状态,即每个键的状态,并且所有此键状态都将在RocksDB中(除非您将计时器配置为在堆中,这是一个选项,但是Flink 1.10计时器中)默认情况下是非堆存储在rocksdb中)。

您可以将flatmap更改为KeyedProcessFunction,并使用计时器来明确清除状态键的状态-这将使您可以直接控制清除状态的确切时间,而不是依靠状态TTL机制来最终清除状态。

但是,窗户更有可能建立起相当大的状态。如果您可以切换到进行预聚合(通过reduceaggregate),则可能会很有帮助。

© www.soinside.com 2019 - 2024. All rights reserved.