我们正在运行的ListState在300GB到400GB之间的作业,并且有时列表可以增长到几千个。在我们的用例中,每个项目都必须具有自己的TTL,因此我们为此列表状态的每个新项目创建一个新的Timer,并在S3上使用RocksDB后端。
当前当前大约有140+百万个计时器(将在event.timestamp + 40days]触发)。
我们的问题是,作业的检查点突然卡住,或者非常缓慢(例如几小时内达到1%),直到最终超时。它通常在一段代码上停止(flink仪表板显示0/12 (0%)
,而前几行显示12/12 (100%)
),这很简单:
[...] val myStream = env.addSource(someKafkaConsumer) .rebalance .map(new CounterMapFunction[ControlGroup]("source.kafkaconsumer")) .uid("src_kafka_stream") .name("some_name") myStream.process(new MonitoringProcessFunction()).uid("monitoring_uuid").name(monitoring_name) .getSideOutput(outputTag) .keyBy(_.name) .addSink(sink) [...]
很少有更多信息:
state.backend.rocksdb.thread.num = 4
这是我们第二次碰巧,拓扑运行得很好,每天有几百万个事件,并且突然停止检查点。我们不知道是什么原因造成的。
任何人都可以想到会突然导致检查点卡住的原因吗?
我们正在运行的ListState在300GB到400GB之间的作业,有时列表会增长到几千。在我们的用例中,每个项目都必须具有自己的TTL,因此我们创建了一个新的...
一些想法: