kafka scala 消费者不从主题中读取消息,控制台中没有错误

问题描述 投票:0回答:1

我有下面的示例代码来读取来自 Kafka 主题的消息

package com.krushna
package kafkademo

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}

import java.time.Duration
import java.util.{Collections, Properties}


object TestConsumer extends App {

  val consumer= new KafkaConsumer[String,String](getProperties())
  consumer.subscribe(Collections.singletonList("wm-cth-salesstreams"))

  while(true){
    val data = consumer.poll(Duration.ofSeconds(3))
    data.forEach(println(_))
  }

  def getProperties(): Properties = {
    val properties: Properties = new Properties
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "scala-c1-1234")
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    properties
  }
}

没有从主题中读取任何消息,具有以下配置的 kafka 控制台消费者正在读取消息。

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wm-cth-salesstream  --from-beginning --group c2

在 Scala/java 控制台中,我可以看到以下消息不断打印。

13:12:25.621 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Node 1001 sent an incremental fetch response for session 386731835 with 0 response partition(s), 1 implied partition(s)
13:12:25.621 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Added READ_UNCOMMITTED fetch request for partition wm-cth-salesstreams-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1001 rack: null), epoch=0}} to node localhost:9092 (id: 1001 rack: null)
13:12:25.621 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Built incremental fetch (sessionId=386731835, epoch=794) for node 1001. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
13:12:25.621 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(wm-cth-salesstreams-0)) to broker localhost:9092 (id: 1001 rack: null)
13:12:26.123 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Node 1001 sent an incremental fetch response for session 386731835 with 0 response partition(s), 1 implied partition(s)
13:12:26.123 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Added READ_UNCOMMITTED fetch request for partition wm-cth-salesstreams-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1001 rack: null), epoch=0}} to node localhost:9092 (id: 1001 rack: null)
13:12:26.123 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Built incremental fetch (sessionId=386731835, epoch=795) for node 1001. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
13:12:26.123 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(wm-cth-salesstreams-0)) to broker localhost:9092 (id: 1001 rack: null)
13:12:26.374 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Sending asynchronous auto-commit of offsets {wm-cth-salesstreams-0=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}}
13:12:26.378 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Committed offset 0 for partition wm-cth-salesstreams-0
13:12:26.378 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Completed asynchronous auto-commit of offsets {wm-cth-salesstreams-0=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}}
13:12:26.625 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Node 1001 sent an incremental fetch response for session 386731835 with 0 response partition(s), 1 implied partition(s)
13:12:26.626 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Added READ_UNCOMMITTED fetch request for partition wm-cth-salesstreams-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1001 rack: null), epoch=0}} to node localhost:9092 (id: 1001 rack: null)
13:12:26.626 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Built incremental fetch (sessionId=386731835, epoch=796) for node 1001. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)

可能的错误是什么?

编辑

/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups

的输出
GROUP           TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
c1              wm-cth-salesstream 0          360             360             0               console-consumer-3cba8e14-0835-48e6-9620-296aa32aa551 /127.0.0.1      console-consumer

GROUP           TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
c10             wm-cth-salesstream 0          360             360             0               console-consumer-614744ee-33ac-4dc9-87c7-66c6a1cdaa3a /127.0.0.1      console-consumer

Consumer group 'c2' has no active members.

GROUP           TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
c2              wm-cth-salesstream 0          360             360             0               -               -               -

Consumer group 'kafka-java-consumer' has no active members.

GROUP               TOPIC               PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
kafka-java-consumer wm-cth-salesstreams 0          0               0               0               -               -               -

Consumer group 'kafka-java-consumer-v1' has no active members.

GROUP                  TOPIC               PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
kafka-java-consumer-v1 wm-cth-salesstreams 0          0               0               0               -               -               -

Consumer group 'new-c1' has no active members.

GROUP           TOPIC               PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
new-c1          wm-cth-salesstreams 0          0               0               0               -               -               -

Consumer group 'scala-c1' has no active members.

GROUP           TOPIC               PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
scala-c1        wm-cth-salesstreams 0          0               0               0               -               -               -

GROUP           TOPIC               PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                   HOST            CLIENT-ID
scala-c1-1234   wm-cth-salesstreams 0          0               0               0               consumer-scala-c1-1234-1-bcc35a35-eb8b-495a-acd9-b98c0d528f93 /172.18.0.1     consumer-scala-c1-1234-1
scala apache-kafka kafka-consumer-api
1个回答
0
投票

如果您之前曾在

wm-cth-salesstream
组中就主题
scala-c1
运行过任何消费者,并且从未产生过更多数据,那么就没有更多可读的内容,您将不会获得任何输出。

auto.offset.reset=earliest
仅在没有提交给组的现有抵消时适用

© www.soinside.com 2019 - 2024. All rights reserved.