在命令行中在Kafka中创建多个使用者

问题描述 投票:6回答:5

我是卡夫卡的新人。当我在命令行中运行快速启动示例时,我发现无法在命令行中创建多个使用者。

条件:

我用3个分区构建了一个名为test的主题,我还在这个主题上构建了一个生产者。

然后我想创建两个不同的消费者,在这个主题上共享一个名为test1的同一个消费者组。

我按下面两次运行命令:

   bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --group test1

第一个工作,但当我第二次运行时,第一个将断开连接,第二个工作。

那么如何在命令行中在同一个使用者组中创建两个或更多个使用者?

    WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1146)
apache-kafka
5个回答
17
投票
  1. 默认情况下,kafka-console-consumer.sh将创建一个随机组。
  2. 如果要指定组名,可以: 将group.id=group_name添加到本地文件filename 使用--consumer.config filenamekafka-console-consumer.sh选项来设置组
  3. 您可以在zookeeper的/consumers/目录中查看您的组。

参考:kafka/core/src/main/scala/kafka/tools/ConsoleConsumer.scala


17
投票

除了使用--consumer.config选项,如secfree的答案,你也可以使用

--consumer-property group.id=your_group

选项以指定组名而不编辑配置文件。


6
投票

您可以使用以下命令在“test-consumer-group”组中创建“test”主题的使用者:

bin/kafka-console-consumer.sh --bootstrap-server <brokerIP>:9092 --topic test --consumer-property group.id=test-consumer-group

下面的命令将列出使用者组配置:

bin/kafka-consumer-groups.sh --bootstrap-server <brokerIP>:9092 --describe --group test-consumer-group

例如:

GROUP || TOPIC || PARTITION || CURRENT-OFFSET || LOG-END-OFFSET || LAG      || OWNER
test-consumer-group || test || 0 || 10 || 10 || 0 || consumer-1_/10.210.223.170

1
投票

用这个:

--partition <Integer: partition>        The partition to consume from. 

0
投票

当您在没有groupid的情况下使用Topic时,Kafka会为您的会话创建随机groupid。您可以指定groupid --consumer-property group.id = test-consumer-group如果groupid存在,或者您可以在使用时添加到您的会话中新的groupid(名称)如果组不存在--topic second-topic --group my -first-group p和Kafka将创建新组

© www.soinside.com 2019 - 2024. All rights reserved.