无法与Kafka群集上的特定使用者组一起使用

问题描述 投票:0回答:1

[当我尝试使用特定的消费者组来消费主题时,它会失败。我可以在新的消费群体中使用同一主题。在主题上使用describe命令时,没有任何使用者附加到任何分区:

GROUP           TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
dracg          clog                    1          105288          105588          300             -                                            -               -
dracg          clog                    2          104232          104532          300             -                                            -               -
dracg          clog                    3          104525          104820          295             -                                            -               -
dracg          clog                    0          104941          105243          302             -                                            -               -

即使控制台使用者代码也无法与此使用者组一起使用我在下面提供控制台使用者日志的相关-group join-部分,组长

DEBUG [Consumer clientId=consumer-1, groupId=dracg] Joining group with current subscription: [clog] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
DEBUG [Consumer clientId=consumer-1, groupId=dracg] Sending JoinGroup (JoinGroupRequestData(groupId='dracg', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId='consumer-1-0d44d911-c975-4dfe-83d9-4b96b5fc9638', groupInstanceId='null', protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 0, 0, 0, 0, 1, 0, 23, 112, 114, 101, 112, 114, 111, 100, 45, 70, 66, 77, 66, 45, 99, 104, 97, 110, 110, 101, 108, 108, 111, 103, 0, 0, 0, 0])])) to coordinator kafkanode3.example.com:9092 (id: 2147483644 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
DEBUG [Consumer clientId=consumer-1, groupId=dracg] Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=5117, protocolName='range', leader='rdkafka-b5a0e7ac-8311-410f-bf04-b2b2712bad7a', memberId='consumer-1-0d44d911-c975-4dfe-83d9-4b96b5fc9638', members=[]) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
DEBUG [Consumer clientId=consumer-1, groupId=dracg] Sending follower SyncGroup to coordinator kafkanode3.example.com:9092 (id: 2147483644 rack: null): SyncGroupRequestData(groupId='dracg', generationId=5117, memberId='consumer-1-0d44d911-c975-4dfe-83d9-4b96b5fc9638', groupInstanceId='null', assignments=[]) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
INFO [Consumer clientId=consumer-1, groupId=dracg] Successfully joined group with generation 5117 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
DEBUG [Consumer clientId=consumer-1, groupId=dracg] Enabling heartbeat thread (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
INFO [Consumer clientId=consumer-1, groupId=dracg] Setting newly assigned partitions:  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
DEBUG [Consumer clientId=consumer-1, groupId=dracg] Sending Heartbeat request to coordinator kafkanode3.example.com:9092 (id: 2147483644 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
DEBUG [Consumer clientId=consumer-1, groupId=dracg] Received successful Heartbeat response (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
DEBUG [Consumer clientId=consumer-1, groupId=dracg] Sending asynchronous auto-commit of offsets {} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
DEBUG [Consumer clientId=consumer-1, groupId=dracg] Completed asynchronous auto-commit of offsets {} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
DEBUG [Consumer clientId=consumer-1, groupId=dracg] Sending Heartbeat request to coordinator kafkanode3.example.com:9092 (id: 2147483644 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
DEBUG [Consumer clientId=consumer-1, groupId=dracg] Received successful Heartbeat response (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
DEBUG [Consumer clientId=consumer-1, groupId=dracg] Sending Heartbeat request to coordinator kafkanode3.example.com:9092 (id: 2147483644 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
DEBUG [Consumer clientId=consumer-1, groupId=dracg] Received successful Heartbeat response (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

Kafka版本2.3.0如果您对此有任何线索,请分享。

apache-kafka kafka-consumer-api
1个回答
0
投票

[Kafka使用Consumer Groups的概念来记住哪些客户端消耗了主题分区中的哪些偏移量。

因此,如果您已经在使用者组dracg中使用了一次消息,并且在可安慰使用者中将--group明确命名为dracg,则将忽略参数--from-beginning

背景是,Kafka已经知道该消费群,并将依靠其内部信息来决定接下来要消耗哪个偏移量。此信息存储在内部主题__consumer_offsets中。

当您指定其他--group时,第一次阅读消息时不会出现问题。但是,当您尝试再次读取具有相同组的消息--from-beginning时,将会看到相同的行为。

© www.soinside.com 2019 - 2024. All rights reserved.