Kafka消费者--待取的记录永远不会被删除,投票一直返回0条记录。

问题描述 投票:1回答:1

我们写了一个Kafka消费者,它根据配置进行数据轮询,每次轮询返回大约400条avro记录,我们进行缓冲。缓冲后,我们对末端的偏移量进行搜索。当缓冲区大小达到2000时,我们使用执行服务线程将它们写入HDFS,等待全部完成后使用Future.get。在HDFS(staging文件夹)中不断追加相同的文件,直到达到10k的提交大小。达到提交大小后,我们将文件从暂存目录移到HDFS中的最终输出位置(这是为原子提交而做的)。下一次缓冲数据的刷新将在暂存目录中生成新文件。这样做的目的是为了避免HDFS中的文件过小,并限制打开的文件句柄。将文件提交到输出位置后,我们对偏移量进行异步提交。

5分钟后,当完成了4-5次成功的写入和偏移提交后,我们开始得到这个日志。

2020-05-24 06:29:19 TRACE Fetcher:1122 - [Consumer clientId=consumer-xxx.events-1, groupId=metadata.events] Skipping fetch for partition topic.metadata.events-1 because previous request to kafka3-ckafka1-perf3-nvan.globalrelay.net:9093 (id: 3 rack: null) has not been processed.

我相信这个节点被提出了一些取回请求,而这个节点的回调从未将其从待处理的取回中移除。

这就是创建请求未来的地方。https:/github.comapachekafkablobtrunkclientssrcmainjavaorgapachekafkaclientsconsumerinternalsFetcher.java#L261。

这是回调处理程序的一部分,我们将请求从挂起中移除。https:/github.comapachekafkablobtrunkclientssrcmainjavaorgapachekafkaclientsconsumerinternalsFetcher.java#L321。

它已经阻止了我对Kafka消费者的可用性测试,因为我们每次运行都会在5分钟内遇到这个问题。记录消耗率从1000左右开始,当poll返回零记录时,就会降到5rps。它永远不会恢复。线程数和CPU使用率也会下降。

apache-kafka kafka-consumer-api
1个回答
1
投票

这个问题是由于打开的文件句柄造成的。这是因为Avro的dataFileWriter api,在append模式下。它采用可寻的输入和输出流。当你关闭写入器时,它关闭了输出流,但没有关闭可寻找的输入。因此,在所有的数据节点上,打开文件句柄的数量超过了44k,记录消耗率急剧下降。关闭可寻输入明确地解决了这个问题。

© www.soinside.com 2019 - 2024. All rights reserved.