使用 subscribe() 和 allocate() 读取 kafka 主题的奇怪区别

问题描述 投票:0回答:1

我的任务是统计Kafka主题中的消息(有些有一个分区,有些有多个分区)。我尝试了两种技术:一种使用

subscribe()
,另一种使用
assign()

完整代码:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import kafka
from kafka.structs import TopicPartition


def count_messages(consumer):
    i = 0
    while True:
        records = consumer.poll(50)     # timeout in millis
        if not records:
            break
        for _, consumer_records in records.items():
            for _ in consumer_records:
                i += 1
    return i


def show_messageges_assign(kafka_server, topic):
    cnt = 0
    consumer = kafka.KafkaConsumer(bootstrap_servers=kafka_server, group_id=None, auto_offset_reset='earliest', enable_auto_commit=False)
    try:
        partitions = consumer.partitions_for_topic(topic)
        if partitions:
            for partition in partitions:
                tp = TopicPartition(topic, partition)
                consumer.assign([tp])
                consumer.seek(partition=tp, offset=0)
                cnt += count_messages(consumer)
    finally:
        consumer.close()
    print('%s assign, partitions: %s, msg cnt=%d' % (topic, partitions, cnt))


def show_messageges_subscribe(kafka_server, topic):
    cnt = 0
    consumer = kafka.KafkaConsumer(bootstrap_servers=kafka_server, group_id=None, auto_offset_reset='earliest', enable_auto_commit=False)
    try:
        consumer.subscribe([topic])
        cnt += count_messages(consumer)
    finally:
        consumer.close()
    print('%s, subscribe, msg cnt=%d' % (topic, cnt))


def test_topic(kafka_server, topic):
    show_messageges_assign(kafka_server, topic)
    show_messageges_subscribe(kafka_server, topic)
    print('')


def main():
    kafka_server = '169.0.1.77:9092'
    test_topic(kafka_server, 'gc.ifd.analyse.fdp')
    test_topic(kafka_server, 'gc.ifd.result.fdp')


if __name__ == '__main__':
    main()

问题:

  1. subscribe()
    不适用于某些机器。在同一台机器上
    assign()
    可以工作,但不是很可靠。

  2. assign()
    适用于我拥有的所有测试机器,但是当有很多分区时,我认为它不会从所有分区读取

结果:

经测试,两个主题共1435条消息。我可以在网络浏览器“Apache Kafka UI”中看到它们。

我的 4 台测试机上的结果:

机器 1 和 2:

[root@test-kafka emulator]# python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=1435

gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=1435


[root@igg emulator]# python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=1435

gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=1435

3号机和4号机:

[mn: emulator] python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=0

gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=0


[root@test-gc emulator]# python3 so_kafka_checker.py
gc.ifd.analyse.fdp assign, partitions: {0}, msg cnt=1435
gc.ifd.analyse.fdp, subscribe, msg cnt=0

gc.ifd.result.fdp assign, partitions: {0, 1, 2, 3}, msg cnt=366
gc.ifd.result.fdp, subscribe, msg cnt=0

正如您所见,使用

assign()
我可以在所有测试机器上读取具有 4 个分区的主题,但并非所有消息都会被读取。

在两台机器上

subscribe()
不读取任何消息,而在另外两台机器上则读取所有消息。

我的代码或环境有问题吗?


我根据 StéphaneD 更改了

poll()
的使用。评论:

def count_messages(consumer):
    i = 0
    try_cnt = 0
    while True:
        records = consumer.poll(50)     # timeout in millis
        if not records:
            try_cnt += 1
            if try_cnt > 10:
                break
        for _, consumer_records in records.items():
            for _ in consumer_records:
                i += 1
    return i

通过此更改,我的程序似乎可以读取带有

subscribe()
assign()
的所有消息。

apache-kafka kafka-consumer-api kafka-python
1个回答
0
投票

consumer.poll()
没有记录时,先不要停止:添加重试策略,至少检查X
poll()
是否连续为空,然后停止。

由于 Kafka 代理算法/优化,我认为 Kafka 不能保证

poll()
始终会返回某些内容(即使您知道有某些内容)。

© www.soinside.com 2019 - 2024. All rights reserved.