KafkaConsumer 从远程 kafka 服务器获取空记录,但在 kafka 服务器机器上运行相同程序时获取记录

问题描述 投票:0回答:0

我正在使用 Kafka 服务器 3.3.1 和 java 包 Kafka-Clients 3.3.1.

我有一个简单的 KafkaConsumer 程序如下:

        Properties consumerProperties= new Properties();
        // fake ip address
        consumerProperties.put("bootstrap.servers", "220.220.220.220:9092");

        consumerProperties.put("group.id", "TestConsumerGroup");
        consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        consumerProperties.put("auto.offset.reset", "earliest");
        consumerProperties.put("client.id", "remote1");

        KafkaConsumer consumer = new KafkaConsumer(consumerProperties);

        String topicName="quickstart-events";
        consumer.subscribe(Collections.singletonList(topicName));

        Duration duration = Duration.ofSeconds(10);
        int epoch=1;
        while(true){

            System.out.println(epoch++);

            ConsumerRecords<String, String> records=consumer.poll(5000);
            consumer.seekToBeginning(consumer.assignment());

            System.out.println("records count:"+records.count());
            for(var record: records){
                System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s\n", record.topic(),
                record.partition(), record.offset(), record.key(), record.value());
            }
        }

首先,我在机器 A 上运行这个程序(这可以通过 ssh 访问 kafka server machien),这是与 kafka 服务器机器不同的机器。 consumerProperties.put("bootstrap.servers", "220.220.220.220:9092");// public ip 220.220.220.220:9092 这是kafka服务器机器

然后无论超时时间长短,记录计数始终为空。顺便说一句,如果我调用 listTopics() 方法,那么我可以在机器 A 中成功获得结果。

其次,我在kafka服务器机器上部署这个程序。然后就可以成功读取消息记录了

你知道原因吗?谢谢 那么

预期的结果是我在kafka服务器机器上运行这个程序应该得到相同的结果。

apache-kafka kafka-consumer-api
© www.soinside.com 2019 - 2024. All rights reserved.