我做了Apache Kafka 0.10.1.0的全新安装。
我能够在命令提示符下发送/接收消息。
使用Producer / Consumer Java示例时,我无法知道Consumer Example上的group.id参数。
让我知道如何解决这个问题。
以下是我用过的消费者示例:
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-topic");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
consumer.subscribe(Arrays.asList("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(100);
System.err.println("records size=>"+records.count());
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
catch (Exception ex){
ex.printStackTrace();
}
finally {
consumer.close();
}
}
在为消费者运行命令之后,我可以看到生产者发布的消息(在控制台上)。但无法从java程序中看到消息
bin \ windows \ kafka-console-consumer.bat --bootstrap-server localhost:9092 - topic my-topic --from-beginning
消费者使用消费者组名称标记自己,并且发布到主题的每个记录被传递到每个订阅消费者组中的一个消费者实例。消费者实例可以在单独的进程中,也可以在不同的机器
如果所有使用者实例具有相同的使用者组,则记录将有效地在使用者实例上进行负载平衡。
如果所有消费者实例具有不同的消费者组,则每个记录将广播到所有消费者进程。
group.id是一个字符串,用于唯一标识此使用者所属的使用者进程组。
(Qazxswpoi)
将任意随机值提供给组ID。没关系。
Kafka intro
在您提供的代码中,您只需等待100ms的数据。您应该循环接收数据或等待更长的时间(在这种情况下,您只能获得一部分数据)。至于'group.id',你从控制台运行消费者的情况是随机的'group.id'。
由于没有提供偏移量,java客户端将等待新消息,但不会显示现有消息 - 这是预期的。如果打算阅读主题中已有的所有消息,可以使用以下代码:
props.put("group.id", "Any Random Value");
以下是分区和消费者属性group.id的一些测试结果
if (READ_FROM_BEGINNING) {
//consume all the messages from the topic from the beginning.
//this doesn't work reliably if it consumer.poll(..) is not called first
//probably because of lazy-loading issues
consumer.poll(10);
consumer.seekToBeginning(consumer.assignment()); //if intending to
//read from the beginning or call below to read from a predefined offset.
//consumer.seek(consumer.assignment().iterator().next(), READ_FROM_OFFSET);
}
consumer.group id用于对生成的数据进行负载均衡(如果group.id对于每个使用者都不同,则每个使用者将获得数据的副本)
如果partition = 1且消费者总数= 2,则两个活跃消费者中只有一个会获得数据
如果partition = 2且消费者总数= 2,则两个活跃消费者中的每一个均匀地获取数据
如果partition = 3且消费者总数= 2,则两个活跃消费者中的每一个都将获得数据。一个消费者从2个分区获取数据,其他消费者从1个分区获取数据。
如果partition = 3且消费者总数= 3,则三个活跃消费者中的每一个均匀地获取数据。
使用者组标识应在Kafka consumer.properties文件中定义的使用者组。
请将“my-topic”添加到使用者组,它应该如下所示:
Properties props = new Properties();
//set all other properties as required
props.put("group.id", "ConsumerGroup1");
props.put("max.poll.records", "1");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);