Spring Boot中Kafka Consumer的动态主题订阅

问题描述 投票:0回答:1

在我的库类中,我正在创建一个Kafka消费者,我的配置类如下:

@Service
public class ConsumerFactory {
    private TopicNameProvider topicNameProvider;

    private final static boolean ENABLE_AUTO_COMMIT = false;
    private final static int MAX_POLL_RECORDS = 10;
    private final static int MAX_POLL_INTERVAL_MS = 600000;  // 10 min

    public ConsumerFactory(@Autowired(required = false) TopicNameProvider topicNameProvider) {
        this.topicNameProvider = topicNameProvider;
    }

    protected Properties fromKafkaProperties(KafkaConsumerProperties kafkaConsumerProperties){
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaConsumerProperties.getBootstrapServers());
        props.put("group.id", kafkaConsumerProperties.getGroupId());
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", String.valueOf(ENABLE_AUTO_COMMIT));
        props.put("max.poll.records", String.valueOf(MAX_POLL_RECORDS));
        props.put("max.poll.interval.ms", String.valueOf(MAX_POLL_INTERVAL_MS));
        return props;
    }

    @Bean
    public KafkaConsumer consumer(KafkaConsumerProperties kafkaConsumerProperties) {
        KafkaConsumer kafkaConsumer = new KafkaConsumer(fromKafkaProperties(kafkaConsumerProperties));
        kafkaConsumer.subscribe(topicNamesToSubscribe(kafkaConsumerProperties));
        return kafkaConsumer;
    }

    protected List<String> topicNamesToSubscribe(KafkaConsumerProperties kafkaConsumerProperties) {
        if (topicNameProvider != null) {
            return topicNameProvider.topicNames();
        } else if (kafkaConsumerProperties.getTopics() != null) {
            return Arrays.asList(kafkaConsumerProperties.getTopics());
        } else {
            throw new RuntimeException("No topic definition found to subscribe. Define topics either in application.yml " +
                    "in kafka.topics or implement a TopicsNameProvider bean.");
        }
    }

}

在我的项目中,我创建的主题名称如下:

@Service
@ConditionalOnProperty(name = "messaging.mode", havingValue = "behaviour")
public class UserBehaviourTopicNameProvider implements TopicNameProvider {

    @Override
    public List<String> topicNames() {
        return List.of("user_behaviour_EMAIL_" + new Date().getHours());
    }
}

我的目标是更改消费者每小时连接的主题名称。例如,上午 9 点,它应该连接到名为 user_behaviour_EMAIL_9 的主题,上午 10 点,它应该连接到 user_behaviour_EMAIL_10,依此类推。实现此功能的正确方法是什么?

java apache-kafka kafka-consumer-api
1个回答
0
投票

您将在此处创建一个包含一个字符串的列表,并且主题名称仅在侦听器启动时被评估一次。因此,您需要在每小时结束时使用一些逻辑来停止并重新创建您的侦听器。

否则,您也可以使用正则表达式模式进行消费,并立即获取一整天的数据。

请记住,Kafka 对主题数量有限制,因此,例如,具有 24 个分区的主题可能是更好的模式,其中生产者实际上可以根据时间戳定义分区

© www.soinside.com 2019 - 2024. All rights reserved.