从 Kafka Consumer 传递数据

问题描述 投票:0回答:1

我想从Kafka获取数据,该方法成功获取记录但无法传递给变量。这是我的代码

public void subscribeFromKafka() throws Exception {

    List<String> result = new ArrayList<>();
    
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS);
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.setProperty("auto.commit.interval.ms", "1000");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        System.out.println("Receiving from Kafka . . .");
        consumer.subscribe(Arrays.asList(KAFKA_TOPIC_URL));
        // Set a timeout for the loop, or you can use another mechanism to break out of the loop.
        long endTimeMillis = System.currentTimeMillis() + TIMEOUT_MILLIS;
    
        while (System.currentTimeMillis() < endTimeMillis) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received data: " + record.value());
                result.add(record.value());
            }
        }

        System.out.println(result + " this is the result from Kafka.");
    } catch (Exception e) {
        throw new Exception("Failed to subscribe from Kafka", e);
    }

}

如何解决?

我尝试将来自kafka消费者的记录添加到字符串的数组列表中,但记录无法添加到数组列表中

java spring-boot spring-kafka kafka-consumer-api
1个回答
0
投票

看起来您的代码正确订阅了 Kafka 主题、接收记录并将值添加到 result 列表中。如果您遇到无法在 try 块之外访问结果的问题,可能是由于 result 变量的范围所致。

在您的代码中,您已在 subscribeFromKafka 方法中声明了 result 列表。这意味着result列表只能在该方法的范围内访问。如果您想在方法之外使用 result 列表,您应该将其声明为类级别或实例变量。

© www.soinside.com 2019 - 2024. All rights reserved.