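I'm seeing an unexpected increase in observed latency whenever a checkpoint is taken while using the memory state backend.
Consider the following checkpoint: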
2019-02-27 15:35:46,322 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1551281746322 for job a80597b3312f0704beed75397c371bf5.
2019-02-27 15:35:46,326 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Heap backend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[KeyedProcess -> Map -> Sink: Unnamed (1/1),5,Flink Task Threads] took 0 ms.
2019-02-27 15:35:46,342 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Async calls on Source: Custom Source -> Map -> Timestamps/Watermarks (1/1),5,Flink Task Threads] took 2 ms.
2019-02-27 15:35:46,346 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous part) in thread Thread[pool-14-thread-2,5,Flink Task Threads] took 3 ms.
2019-02-27 15:35:46,351 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Heap backend snapshot (In-Memory Stream Factory, asynchronous part) in thread Thread[pool-11-thread-2,5,Flink Task Threads] took 14 ms.
2019-02-27 15:35:46,378 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job a80597b3312f0704beed75397c371bf5 (1157653 bytes in 54 ms).
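Although the checkpoint took only about 50 ms end to end, the response to an event injected at 15:35:46,385 did not arrive until 15:35:46,905 (520 ms later). No events were processed between these two timestamps. Without checkpoints, the 99.99th-percentile latency is ~15 ms.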
Setup: latency is measured from System.nanoTime differences.

Edit: this is a linear job, so I assume no checkpoint barrier alignment is involved.
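The question doesn't show the measurement harness. Below is a minimal sketch of measuring latency from System.nanoTime differences, assuming events are injected and their responses observed in the same JVM (System.nanoTime values are only meaningful relative to each other within one JVM). LatencyProbe and its methods are hypothetical names, and the nearest-rank percentile is just one possible definition:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical harness: LatencyProbe is an illustrative name, not part of Flink.
// System.nanoTime() values are only comparable within the same JVM, so the
// injector and the response observer must run in one process.
class LatencyProbe {
    private final ConcurrentHashMap<Long, Long> injectedAt = new ConcurrentHashMap<>();
    private final List<Long> latenciesNanos = new ArrayList<>();

    /** Call when an event is injected into the job. */
    void onInject(long eventId) {
        injectedAt.put(eventId, System.nanoTime());
    }

    /** Call when the response for that event is observed. */
    synchronized void onResponse(long eventId) {
        Long start = injectedAt.remove(eventId);
        if (start != null) {
            latenciesNanos.add(System.nanoTime() - start);
        }
    }

    /** Percentile (e.g. 99.99) over all recorded samples, in milliseconds. */
    synchronized double percentileMillis(double p) {
        long[] samples = latenciesNanos.stream().mapToLong(Long::longValue).toArray();
        return percentileMillis(samples, p);
    }

    /** Nearest-rank percentile of latencies given in nanoseconds. */
    static double percentileMillis(long[] nanos, double p) {
        if (nanos.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        long[] sorted = nanos.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.length / 100.0);
        int index = Math.max(0, Math.min(sorted.length - 1, rank - 1));
        return sorted[index] / 1_000_000.0;
    }
}
```

With a single stalled window like the 520 ms one above, the tail percentiles are where it shows up; the median barely moves.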
The time is spent synchronously acknowledging the messages to RabbitMQ:
MessageAcknowledgingSourceBase#notifyCheckpointComplete
> MultipleIdsMessageAcknowledgingSourceBase#acknowledgeIDs
> RMQSource#acknowledgeSessionIDs
This could be done asynchronously, as the Kafka connector does.
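One way to make the acknowledgement asynchronous is to hand the IDs collected for a completed checkpoint to a background thread, so the task thread returns from notifyCheckpointComplete immediately. This is a hypothetical sketch: AckSink and AsyncAcknowledger are illustrative names, not Flink or RabbitMQ APIs, and it only models the threading. A real change would also have to respect the RabbitMQ Java client's guidance against sharing a Channel between threads (delivery tags are scoped to the channel that received them) and decide what happens if the job fails before the acknowledgements finish, in which case the unacknowledged messages are redelivered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the call that acknowledges one message ID on the broker. */
@FunctionalInterface
interface AckSink {
    void ack(long id) throws Exception;
}

/**
 * Hypothetical sketch: acknowledgements run on one background thread instead of
 * the task thread. A single thread keeps them in checkpoint order.
 */
class AsyncAcknowledger implements AutoCloseable {
    private final ExecutorService ackThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "async-ack");
        t.setDaemon(true);
        return t;
    });
    private final AckSink sink;

    AsyncAcknowledger(AckSink sink) {
        this.sink = sink;
    }

    /** Called where the blocking acknowledgement used to happen; returns without waiting. */
    CompletableFuture<Void> acknowledgeAsync(List<Long> ids) {
        List<Long> snapshot = new ArrayList<>(ids); // caller may clear its own list afterwards
        return CompletableFuture.runAsync(() -> {
            for (long id : snapshot) {
                try {
                    sink.ack(id);
                } catch (Exception e) {
                    // Surface the failure through the returned future.
                    throw new RuntimeException("acknowledgement failed for id " + id, e);
                }
            }
        }, ackThread);
    }

    @Override
    public void close() {
        ackThread.shutdown();
        try {
            ackThread.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The copy into snapshot decouples the background work from the caller's list, and callers that need to know about failures can attach a handler to the returned CompletableFuture.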
Since my checkpoint interval is 3 minutes and I inject 200 ev/s, each checkpoint triggers the acknowledgement of 36k messages (200 * 60 * 3), which takes about 500 ms.
Using a shorter checkpoint interval may give more predictable latency, at the cost of a higher median latency.
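The arithmetic above can be turned into a rough model of the interval trade-off. This sketch assumes acknowledgement time grows linearly with the number of messages, using a per-message cost derived from the question's own numbers (about 500 ms for 36,000 messages); any fixed per-checkpoint cost would not shrink with the interval, so treat the output as an estimate. AckPauseEstimate is a hypothetical name:

```java
// Hypothetical back-of-the-envelope model; assumes ack time is linear in message count.
class AckPauseEstimate {
    /** Messages accumulated (and later acknowledged) per checkpoint. */
    static long messagesPerCheckpoint(long eventsPerSecond, long intervalSeconds) {
        return eventsPerSecond * intervalSeconds;
    }

    /** Estimated blocking time for acknowledging that many messages. */
    static double estimatedPauseMillis(long messages, double millisPerMessage) {
        return messages * millisPerMessage;
    }

    public static void main(String[] args) {
        long rate = 200;                                      // events per second
        long observed = messagesPerCheckpoint(rate, 3 * 60);  // 36,000 messages
        double msPerMessage = 500.0 / observed;               // ~0.0139 ms per ack (derived)
        for (long intervalSeconds : new long[] {180, 60, 10, 1}) {
            long n = messagesPerCheckpoint(rate, intervalSeconds);
            System.out.printf("interval %4d s -> %6d acks, ~%.0f ms pause%n",
                    intervalSeconds, n, estimatedPauseMillis(n, msPerMessage));
        }
    }
}
```

Under that assumption, a 60 s interval cuts each pause to roughly a third (12,000 acks, about 170 ms), but the pause then occurs three times as often.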