在我的库类中,我正在创建一个Kafka消费者,我的配置类如下:
@Service
public class ConsumerFactory {
private TopicNameProvider topicNameProvider;
private final static boolean ENABLE_AUTO_COMMIT = false;
private final static int MAX_POLL_RECORDS = 10;
private final static int MAX_POLL_INTERVAL_MS = 600000; // 10 min
public ConsumerFactory(@Autowired(required = false) TopicNameProvider topicNameProvider) {
this.topicNameProvider = topicNameProvider;
}
protected Properties fromKafkaProperties(KafkaConsumerProperties kafkaConsumerProperties){
Properties props = new Properties();
props.put("bootstrap.servers", kafkaConsumerProperties.getBootstrapServers());
props.put("group.id", kafkaConsumerProperties.getGroupId());
props.put("key.deserializer", ByteArrayDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", String.valueOf(ENABLE_AUTO_COMMIT));
props.put("max.poll.records", String.valueOf(MAX_POLL_RECORDS));
props.put("max.poll.interval.ms", String.valueOf(MAX_POLL_INTERVAL_MS));
return props;
}
@Bean
public KafkaConsumer consumer(KafkaConsumerProperties kafkaConsumerProperties) {
KafkaConsumer kafkaConsumer = new KafkaConsumer(fromKafkaProperties(kafkaConsumerProperties));
kafkaConsumer.subscribe(topicNamesToSubscribe(kafkaConsumerProperties));
return kafkaConsumer;
}
protected List<String> topicNamesToSubscribe(KafkaConsumerProperties kafkaConsumerProperties) {
if (topicNameProvider != null) {
return topicNameProvider.topicNames();
} else if (kafkaConsumerProperties.getTopics() != null) {
return Arrays.asList(kafkaConsumerProperties.getTopics());
} else {
throw new RuntimeException("No topic definition found to subscribe. Define topics either in application.yml " +
"in kafka.topics or implement a TopicsNameProvider bean.");
}
}
}
在我的项目中,我创建的主题名称如下:
@Service
@ConditionalOnProperty(name = "messaging.mode", havingValue = "behaviour")
public class UserBehaviourTopicNameProvider implements TopicNameProvider {
@Override
public List<String> topicNames() {
return List.of("user_behaviour_EMAIL_" + new Date().getHours());
}
}
我的目标是更改消费者每小时连接的主题名称。例如,上午 9 点,它应该连接到名为 user_behaviour_EMAIL_9 的主题,上午 10 点,它应该连接到 user_behaviour_EMAIL_10,依此类推。实现此功能的正确方法是什么?
您将在此处创建一个包含一个字符串的列表,并且主题名称仅在侦听器启动时被评估一次。因此,您需要在每小时结束时使用一些逻辑来停止并重新创建您的侦听器。
否则,您也可以使用正则表达式模式进行消费,并立即获取一整天的数据。
请记住,Kafka 对主题数量有限制,因此,例如,具有 24 个分区的主题可能是更好的模式,其中生产者实际上可以根据时间戳定义分区