运行时订阅多个kafka主题

问题描述 投票:0回答:1

我想用 Java Quarkus 创建一个微服务来在运行时订阅多个主题。该服务将定期(每天一次)从 API 读取主题并更新主题以使用。

我的问题是创建 Kafka 动态订阅者。我试过

quarkus.kafka-streams

@ApplicationScoped
public class MyTopology {

    List<String> topics = List.of("topicA", "topicB");
    @Produces
    public Topology buildTopology() {

        StreamsBuilder builder = new StreamsBuilder();

        for(String topic : topics){
            builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
                .mapValues(v -> {
                    System.out.println("Topic message elaboration" + v);
                    return v;
                });
        }
        return builder.build();

    }
}

org.apache.kafka.clients.consumer.KafkaConsumer

public class CustomKafkaConsumer implements ConsumerRebalanceListener {

    List<String> topics = List.of("topicA", "topicB");
    private Consumer<String, String> consumer;

    public void start() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "xx");
        props.put("sasl.jaas.config", "22");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        this.consumer = new KafkaConsumer<>(props);
        this.consumer.subscribe(topics, this);
    }

    public void stop() {
        this.consumer.close();
    }

    public void poll(Duration timeout) {
        this.consumer.poll(timeout).forEach(record -> {
            // Process the message
            System.out.println("TOPIC" + record.topic());
            System.out.println("VALUE" + record.value());
            System.out.println("KEY" + record.key());
        });
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Do nothing
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Do nothing
    }
}

找不到解决方案...我在哪里可以找到文档?或者我哪里错了?

java apache-kafka apache-kafka-streams
1个回答
0
投票

这不是真正的 Quarkus 问题。

SteamsBuilder.stream
不应在循环中使用,因为您实际上是在告诉它仅使用您在实际构建和运行拓扑时迭代过的最后一个主题。

直接将集合传递给函数

我在哪里可以找到文档?

Javadoc?


您还应该使用 peek 或 foreach 来打印数据,而不是没有变化的 mapValues


创建 Kafka 动态订阅者。

静态列表不是“动态的”。您将需要额外的代码来停止和修改拓扑使用的主题。更具体地说,从主题列表中添加/删除不会导致它们被自动消耗

© www.soinside.com 2019 - 2024. All rights reserved.