消费者没有收到消息,卡夫卡控制台,新的消费API,卡夫卡0.9

问题描述 投票:26回答:12

我做Kafka Quickstart卡夫卡0.9.0.0。

我已经听饲养员在localhost:2181因为我跑

bin/zookeeper-server-start.sh config/zookeeper.properties

我有一个经纪人在听localhost:9092因为我跑

bin/kafka-server-start.sh config/server.properties

我有一个生产商发布主题“测试”,因为我跑

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
yello
is this thing on?
let's try another
gimme more

当我运行旧的API的消费者,它的工作方式运行

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

然而,当我运行新的API的消费者,我没有当我运行得到什么

bin/kafka-console-consumer.sh --new-consumer --topic test --from-beginning \
    --bootstrap-server localhost:9092

是否可以订阅使用新的API控制台消费者的话题?我怎样才能解决这个问题?

apache-kafka kafka-consumer-api
12个回答
29
投票

我在我的MAC盒我面临的控制台,消费者不消耗任何消息同样的问题,使用的命令时,

kafka-console-consumer --bootstrap-server localhost:9095 --from-beginning --topic my-replicated-topic

但是,当我试着用

kafka-console-consumer --bootstrap-server localhost:9095 --from-beginning --topic my-replicated-topic --partition 0

它高兴地列出发送的消息。这是卡夫卡1.10.11的错误吗?


0
投票

在我的情况下,它没有使用任何方法再工作,我还增加了日志级别在config/log4j.properties调试,启动控制台消费者

./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic MY_TOPIC

然后拿到下面的日志

[2018-03-11 12:11:25,711] DEBUG [MetadataCache brokerId=10] Error while fetching metadata for MY_TOPIC-3: leader not available (kafka.server.MetadataCache)

这里的关键是,我有两个卡夫卡节点,但一个是下降,由于某种原因默认卡夫卡控制台消费者如果有一些分区不可用,因为节点已关闭(在这种情况下,分区3)不会消耗。它不会在我的应用程序发生。

可能的解决方案是

  • 启动时的向下经纪人
  • 删除主题,再次创造这种方式的所有分区将被放置在网上代理节点

0
投票

从运行bin中的以下命令:

./kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

“测试”是主题名称


-2
投票

在kafka_2.11-0.11.0.0动物园管理员服务器已被弃用,并且它正在使用引导服务器,这将需要经纪人的IP地址和端口。如果你给正确的经纪人参数,你就可以使用消息。

例如$斌/ kafka-console-consumer.sh --bootstrap服务器:9093 --topic测试--from-开始

我使用的端口9093,你也可能会有所不同。

问候。


5
投票

我只是碰到了这个问题,解决办法是删除/brokers在动物园管理员,然后重新启动卡夫卡节点。

bin/zookeeper-shell <zk-host>:2181

接着

rmr /brokers

不知道为什么,这解决了它。

当我启用调试日志记录,我在消费一遍一遍看到这个错误信息:

2017-07-07 01:20:12 DEBUG AbstractCoordinator:548 - Sending GroupCoordinator request for group test to broker xx.xx.xx.xx:9092 (id: 1007 rack: null) 2017-07-07 01:20:12 DEBUG AbstractCoordinator:559 - Received GroupCoordinator response ClientResponse(receivedTimeMs=1499390412231, latencyMs=84, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=13,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group test 2017-07-07 01:20:12 DEBUG AbstractCoordinator:581 - Group coordinator lookup for group test failed: The group coordinator is not available. 2017-07-07 01:20:12 DEBUG AbstractCoordinator:215 - Coordinator discovery failed for group test, refreshing metadata


4
投票

对我来说,在这个线程所描述的解决方案工作 - https://stackoverflow.com/a/51540528/7568227

检查

offsets.topic.replication.factor

(或可能与复制其他配置参数)不大于经纪人的数目越高。那是在我的情况的问题。

没有必要此修复程序后使用--partition 0了。

否则,我建议遵循中提到的线索所描述的调试过程。


3
投票

我得到了同样的问题,我现在已经想通了。

当您使用--zookeeper,它应该具备的动物园管理员地址作为参数。

当您使用--bootstrap服务器,它应该具备代理地址作为参数。


3
投票

本地主机是这里的FOO。如果更换本地主机字实际主机名,它应该工作。

像这样:

制片人

./bin/kafka-console-producer.sh --broker-list \
sandbox-hdp.hortonworks.com:9092 --topic test

消费者:

./bin/kafka-console-consumer.sh --topic test --from-beginning \    
--bootstrap-server bin/kafka-console-consumer.sh --new-consumer \
--topic test --from-beginning \
--bootstrap-server localhost:9092

3
投票

渐渐我的Mac同样的问题。我查了日志,发现下面的错误。

Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). 
This error can be ignored if the cluster is starting up and not all brokers are up yet.

这可以通过改变复制因子为1添加以下行中server.properties并重新启动卡夫卡/动物园管理员被固定。

offsets.topic.replication.factor=1

2
投票

这个问题也影响使用水槽和水槽的数据HDFS从卡夫卡摄取数据。

要解决上述问题:

  1. 停止卡夫卡经纪人
  2. 连接到动物园管理员簇和删除/中间商ž节点
  3. 重新启动卡夫卡经纪商

有关于卡夫卡客户端版本,而我们使用的是集群斯卡拉版本没有问题。动物园管理员可能有关代理主机的错误信息。

要验证的操作:

创建卡夫卡的话题。

$ kafka-console-consumer --bootstrap-server slavenode01.cdh.com:9092 --topic rkkrishnaa3210 --from-beginning

打开一个生产者通道和进料的一些消息给它。

$ kafka-console-producer --broker-list slavenode03.cdh.com:9092 --topic rkkrishnaa3210

打开一个消费者信道从特定主题使用该消息。

$ kafka-console-consumer --bootstrap-server slavenode01.cdh.com:9092 --topic rkkrishnaa3210 --from-beginning

为了从水槽测试:

水槽代理配置:

rk.sources  = source1
rk.channels = channel1
rk.sinks = sink1

rk.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
rk.sources.source1.zookeeperConnect = ip-20-0-21-161.ec2.internal:2181
rk.sources.source1.topic = rkkrishnaa321
rk.sources.source1.groupId = flume1
rk.sources.source1.channels = channel1
rk.sources.source1.interceptors = i1
rk.sources.source1.interceptors.i1.type = timestamp
rk.sources.source1.kafka.consumer.timeout.ms = 100
rk.channels.channel1.type = memory
rk.channels.channel1.capacity = 10000
rk.channels.channel1.transactionCapacity = 1000
rk.sinks.sink1.type = hdfs
rk.sinks.sink1.hdfs.path = /user/ce_rk/kafka/%{topic}/%y-%m-%d
rk.sinks.sink1.hdfs.rollInterval = 5
rk.sinks.sink1.hdfs.rollSize = 0
rk.sinks.sink1.hdfs.rollCount = 0
rk.sinks.sink1.hdfs.fileType = DataStream
rk.sinks.sink1.channel = channel1

运行水槽剂:

flume-ng agent --conf . -f flume.conf -Dflume.root.logger=DEBUG,console -n rk

观察从从主题消息被写入在HDFS消费者日志。

18/02/16 05:21:14 INFO internals.AbstractCoordinator: Successfully joined group flume1 with generation 1
18/02/16 05:21:14 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [rkkrishnaa3210-0] for group flume1
18/02/16 05:21:14 INFO kafka.SourceRebalanceListener: topic rkkrishnaa3210 - partition 0 assigned.
18/02/16 05:21:14 INFO kafka.KafkaSource: Kafka source source1 started.
18/02/16 05:21:14 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
18/02/16 05:21:14 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: source1 started
18/02/16 05:21:41 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/02/16 05:21:42 INFO hdfs.BucketWriter: Creating /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp
18/02/16 05:21:48 INFO hdfs.BucketWriter: Closing /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp
18/02/16 05:21:48 INFO hdfs.BucketWriter: Renaming /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp to /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920
18/02/16 05:21:48 INFO hdfs.HDFSEventSink: Writer callback called.

1
投票

能否请您尝试这样的:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

1
投票

使用此:斌/ kafka-console-consumer.sh --bootstrap-服务器localhost:9092 --topic测试--from-开始

请注意:你的命令删除“ - 新消费”

作为参考在这里看到:https://kafka.apache.org/quickstart

© www.soinside.com 2019 - 2024. All rights reserved.