Kafka 消费者 API 无法正常工作

问题描述 投票:0回答:1

我是 Kafka 的新手。我开始在 Kafka 上做我面临以下问题请帮助我解决这个问题提前谢谢。 首先,我正在编写生产者 API,它工作正常,但在执行消费者 API 消息时不显示。

我的代码是这样的:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;


public class ConsumerGroup {
    public static void main(String[] args) throws Exception {

        String topic = "Hello-Kafka";
        String group = "myGroup";
        Properties props = new Properties();
        props.put("bootstrap.servers", "XXX.XX.XX.XX:9092");
        props.put("group.id", group);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try {

            consumer.subscribe(Arrays.asList(topic));
            System.out.println("Subscribed to topic " + topic);


            ConsumerRecords<String, String> records = consumer.poll(100);

            System.out.println("records ::" + records);
            System.out.println(records.toString());
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Record::" + record.offset());
                System.out.println(record.key());
                System.out.println(record.value());
            }
            consumer.commitSync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.commitSync();
            consumer.close();
        }
    }
}

回应::

订阅主题 Hello-Kafka 记录::org.apache.kafka.clients.consumer.ConsumerRecords@76b0bfab org.apache.kafka.clients.consumer.ConsumerRecords@76b0bfab

这里不打印 Offset,key,value 控件不会为(ConsumerRecord 记录:记录){ for loop it self 请帮助我。

java apache-kafka kafka-consumer-api
1个回答
1
投票

您正在尝试打印空记录,因此只有 records.toString() 在您的代码中打印,这实际上是类的名称。
我对您的代码进行了一些更改并使其正常运行。看看这是否有帮助。

public class ConsumerGroup {
    public static void main(String[] args) throws Exception {

        String topic = "Hello-Kafka";
        String group = "myGroup";
        Properties props = new Properties();
        props.put("bootstrap.servers", "xx.xx.xx.xx:9092");
        props.put("group.id", group);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try {

            consumer.subscribe(Arrays.asList(topic));
            System.out.println("Subscribed to topic " + topic);

            while(true){
                ConsumerRecords<String, String> records = consumer.poll(1000);
                if(records.isEmpty()){

                }
                else{
                System.out.println("records ::" + records);
                System.out.println(records.toString());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Record::" + record.offset());
                    System.out.println(record.key());
                    System.out.println(record.value());
                }
                consumer.commitSync();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.commitSync();
            consumer.close();
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.