我正在使用 Kafka 服务器 3.3.1 和 java 包 Kafka-Clients 3.3.1.
我有一个简单的 KafkaConsumer 程序如下:
Properties consumerProperties= new Properties();
// fake ip address
consumerProperties.put("bootstrap.servers", "220.220.220.220:9092");
consumerProperties.put("group.id", "TestConsumerGroup");
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put("auto.offset.reset", "earliest");
consumerProperties.put("client.id", "remote1");
KafkaConsumer consumer = new KafkaConsumer(consumerProperties);
String topicName="quickstart-events";
consumer.subscribe(Collections.singletonList(topicName));
Duration duration = Duration.ofSeconds(10);
int epoch=1;
while(true){
System.out.println(epoch++);
ConsumerRecords<String, String> records=consumer.poll(5000);
consumer.seekToBeginning(consumer.assignment());
System.out.println("records count:"+records.count());
for(var record: records){
System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s\n", record.topic(),
record.partition(), record.offset(), record.key(), record.value());
}
}
首先,我在机器 A 上运行这个程序(这可以通过 ssh 访问 kafka server machien),这是与 kafka 服务器机器不同的机器。 consumerProperties.put("bootstrap.servers", "220.220.220.220:9092");// public ip 220.220.220.220:9092 这是kafka服务器机器
然后无论超时时间长短,记录计数始终为空。顺便说一句,如果我调用 listTopics() 方法,那么我可以在机器 A 中成功获得结果。
其次,我在kafka服务器机器上部署这个程序。然后就可以成功读取消息记录了
你知道原因吗?谢谢 那么
预期的结果是我在kafka服务器机器上运行这个程序应该得到相同的结果。