我的任务是统计Kafka主题中的消息(有些有一个分区,有些有多个分区)。我尝试了两种技术:一种使用
subscribe()
,另一种使用 assign()
。
完整代码:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import kafka
from kafka.structs import TopicPartition
def count_messages(consumer):
i = 0
while True:
records = consumer.poll(50) # timeout in millis
if not records:
break
for _, consumer_records in records.items():
for _ in consumer_records:
i += 1
return i
def show_messageges_assign(kafka_server, topic):
cnt = 0
consumer = kafka.KafkaConsumer(bootstrap_servers=kafka_server, group_id=None, auto_offset_reset='earliest', enable_auto_commit=False)
try:
partitions = consumer.partitions_for_topic(topic)
if partitions:
for partition in partitions:
tp = TopicPartition(topic, partition)
consumer.assign([tp])
consumer.seek(partition=tp, offset=0)
cnt += count_messages(consumer)
finally:
consumer.close()
print('%s assign, partitions: %s, msg cnt=%d' % (topic, partitions, cnt))
def show_messageges_subscribe(kafka_server, topic):
cnt = 0
consumer = kafka.KafkaConsumer(bootstrap_servers=kafka_server, group_id=None, auto_offset_reset='earliest', enable_auto_commit=False)
try:
consumer.subscribe([topic])
cnt += count_messages(consumer)
finally:
consumer.close()
print('%s, subscribe, msg cnt=%d' % (topic, cnt))
def test_topic(kafka_server, topic):
show_messageges_assign(kafka_server, topic)
show_messageges_subscribe(kafka_server, topic)
print('')
def main():
kafka_server = '169.0.1.77:9092'
test_topic(kafka_server, 'gc.ifd.analyse.fdp')
test_topic(kafka_server, 'gc.ifd.result.fdp')
if __name__ == '__main__':
main()
问题:
subscribe()
不适用于某些机器。在同一台机器上assign()
可以工作,但不是很可靠。
assign()
适用于我拥有的所有测试机器,但是当有很多分区时,我认为它不会从所有分区读取
结果:
经测试,两个主题共1435条消息。我可以在网络浏览器“Apache Kafka UI”中看到它们。
我的 4 台测试机上的结果:
机器 1 和 2:
[root@test-kafka emulator]# python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=1435
gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=1435
[root@igg emulator]# python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=1435
gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=1435
3号机和4号机:
[mn: emulator] python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=0
gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=0
[root@test-gc emulator]# python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=0
gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=0
正如您所见,使用
assign()
我可以在所有测试机器上读取具有 4 个分区的主题,但并非所有消息都会被读取。
在两台机器上
subscribe()
不读取任何消息,而在另外两台机器上则读取所有消息。
我的代码或环境有问题吗?
我根据 StéphaneD 更改了
poll()
的使用。评论:
def count_messages(consumer):
i = 0
try_cnt = 0
while True:
records = consumer.poll(50) # timeout in millis
if not records:
try_cnt += 1
if try_cnt > 10:
break
for _, consumer_records in records.items():
for _ in consumer_records:
i += 1
return i
通过此更改,我的程序似乎可以读取带有
subscribe()
和 assign()
的所有消息。
当
consumer.poll()
没有记录时,先不要停止:添加重试策略,至少检查Xpoll()
是否连续为空,然后停止。
由于 Kafka 代理算法/优化,我认为 Kafka 不能保证
poll()
始终会返回某些内容(即使您知道有某些内容)。