我是 flink 新手,我有一个 flink 管道,其中有 5 个运算符,其中 2 个是 kafka 源运算符,消耗来自 2 个不同主题的 kafka 消息。我在这两个来源上每分钟都启用检查点。我正在尝试测试一个用例,在该用例中,我关闭 kafka 并在 10 分钟后重新启动。
我的 flink 版本是
1.16.1
和相同的 kafka-connector 版本。任务失败配置设置默认不受影响,类似于kafka消费者配置是默认的。
为了降低 kafka pod,我将它们缩小到 0,并在 10 分钟后将它们缩小回原始值。
预期行为:一旦 kafka 启动,操作员应自动重新连接。
实际行为:操作员没有重新连接,任务也没有失败,作业状态也没有改变。它陷入了某个循环,只有在我删除 jm 和 tm pod 后它才会恢复。
kafka启动运行后的部分日志:
org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms.
2023-08-29 07:32:01.945 [Source Data Fetcher for Source: Kafka Data source (3/3)#0] INFO org.apache.kafka.clients.NetworkClient - [Consumer clientId=data-group-2, groupId=data-group] Disconnecting from node 1 due to socket connection setup timeout. The timeout value is 30096 ms.
2023-08-29 07:32:01.945 [Source Data Fetcher for Source: Kafka Data source (3/3)#0] INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=data-group-2, groupId=data-group] Error sending fetch request (sessionId=1361712539, epoch=INITIAL) to node 1:
org.apache.kafka.common.errors.DisconnectException: null
我进一步研究了这个问题,这个问题可能有很多可能的原因,但在我的例子中,问题是kafka客户端在开始时解析了kafka代理的IP并缓存了信息。由于所有代理重新启动,它们的 IP 都发生了变化,但 kafka 客户端仍然引用旧的 IP,从而陷入困境。
更多详情可以参考KAFKA-7931