Flink Kafka Source Operator 重新连接问题

问题描述 投票:0回答:1

我是 flink 新手,我有一个 flink 管道,其中有 5 个运算符,其中 2 个是 kafka 源运算符,消耗来自 2 个不同主题的 kafka 消息。我在这两个来源上每分钟都启用检查点。我正在尝试测试一个用例,在该用例中,我关闭 kafka 并在 10 分钟后重新启动。
我的 flink 版本是

1.16.1
和相同的 kafka-connector 版本。任务失败配置设置默认不受影响,类似于kafka消费者配置是默认的。

为了降低 kafka pod,我将它们缩小到 0,并在 10 分钟后将它们缩小回原始值。

预期行为:一旦 kafka 启动,操作员应自动重新连接。

实际行为:操作员没有重新连接,任务也没有失败,作业状态也没有改变。它陷入了某个循环,只有在我删除 jm 和 tm pod 后它才会恢复。

kafka启动运行后的部分日志:

org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms.
2023-08-29 07:32:01.945 [Source Data Fetcher for Source: Kafka Data source (3/3)#0] INFO  org.apache.kafka.clients.NetworkClient  - [Consumer clientId=data-group-2, groupId=data-group] Disconnecting from node 1 due to socket connection setup timeout. The timeout value is 30096 ms.
2023-08-29 07:32:01.945 [Source Data Fetcher for Source: Kafka Data source (3/3)#0] INFO  org.apache.kafka.clients.FetchSessionHandler  - [Consumer clientId=data-group-2, groupId=data-group] Error sending fetch request (sessionId=1361712539, epoch=INITIAL) to node 1:
org.apache.kafka.common.errors.DisconnectException: null
apache-kafka apache-flink kafka-consumer-api
1个回答
0
投票

我进一步研究了这个问题,这个问题可能有很多可能的原因,但在我的例子中,问题是kafka客户端在开始时解析了kafka代理的IP并缓存了信息。由于所有代理重新启动,它们的 IP 都发生了变化,但 kafka 客户端仍然引用旧的 IP,从而陷入困境。
更多详情可以参考KAFKA-7931

© www.soinside.com 2019 - 2024. All rights reserved.