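I have to implement a feature that (re)sets the listener for a given topic/partition to an arbitrary offset. So if events have been committed up to offset 5 and an admin decides to reset the offset to 2, events 3, 4 and 5 should be reprocessed.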
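To pin down the arithmetic in that example, here is a minimal sketch. It assumes that "committed up to offset 5" means offset 5 was the last event processed and that "reset to 2" means offset 2 is the last event to keep. Kafka positions point at the next record to read, so the position to seek to is 3. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Works out which offsets are replayed after an administrative reset. */
final class OffsetResetMath {

    /** Position to seek to so that the event after lastKeptOffset is read next. */
    static long seekPosition(long lastKeptOffset) {
        return lastKeptOffset + 1;
    }

    /** Offsets that get processed a second time after the reset. */
    static List<Long> replayedOffsets(long lastProcessedOffset, long lastKeptOffset) {
        List<Long> replayed = new ArrayList<>();
        for (long o = seekPosition(lastKeptOffset); o <= lastProcessedOffset; o++) {
            replayed.add(o);
        }
        return replayed;
    }

    public static void main(String[] args) {
        System.out.println(seekPosition(2));       // prints 3
        System.out.println(replayedOffsets(5, 2)); // prints [3, 4, 5]
    }
}
```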
We are using Spring for Kafka 2.3, and I tried to follow the documentation on ConsumerSeekAware, which seems to be exactly what I am looking for.
The problem is that we also use topics that are created at runtime. For those we build a KafkaMessageListenerContainer from a DefaultKafkaConsumerFactory, and I don't know where to put the registerSeekCallback.
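Is there any way to achieve this? I am having trouble understanding how a class annotated with @KafkaListener maps onto the way listeners are created in the factory.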
Any help would be appreciated, even if it is just an explanation of how these pieces work together.
This is basically how the KafkaMessageListenerContainer is created:
public KafkaMessageListenerContainer<String, Object> createKafkaMessageListenerContainer(String topicName,
        ContainerPropertiesStrategy containerPropertiesStrategy) {
    MessageListener<String, String> messageListener = getMessageListener(topicName);
    ConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(getConsumerFactoryConfiguration());
    KafkaMessageListenerContainer<String, Object> kafkaMessageListenerContainer = createKafkaMessageListenerContainer(
            topicName, messageListener, bootstrapServers, containerPropertiesStrategy, consumerFactory);
    return kafkaMessageListenerContainer;
}

public MessageListener<String, String> getMessageListener(String topic) {
    MessageListener<String, String> messageListener = new MessageListener<String, String>() {
        @Override
        public void onMessage(ConsumerRecord<String, String> message) {
            try {
                consumerService.consume(topic, message.value());
            } catch (IOException e) {
                log.log(Level.WARNING, "Message couldn't be consumed", e);
            }
        }
    };
    return messageListener;
}

public static KafkaMessageListenerContainer<String, Object> createKafkaMessageListenerContainer(
        String topicName, MessageListener<String, String> messageListener, String bootstrapServers,
        ContainerPropertiesStrategy containerPropertiesStrategy,
        ConsumerFactory<String, Object> consumerFactory) {
    ContainerProperties containerProperties = containerPropertiesStrategy.getContainerPropertiesForTopic(topicName);
    containerProperties.setMessageListener(messageListener);
    KafkaMessageListenerContainer<String, Object> kafkaMessageListenerContainer = new KafkaMessageListenerContainer<>(
            consumerFactory, containerProperties);
    kafkaMessageListenerContainer.setBeanName(topicName);
    return kafkaMessageListenerContainer;
}
I hope that helps.
I think you can use some of the Spring Kafka annotations, like this:
@KafkaListener(topicPartitions =
        @TopicPartition(topic = "${kafka.consumer.topic}", partitionOffsets = {
                @PartitionOffset(partition = "0", initialOffset = "2")}),
        containerFactory = "filterKafkaListenerContainerFactory", id = "${kafka.consumer.groupId}")
public void receive(ConsumedObject event) {
    log.info(String.format("Consumed message with correlationId: %s", event.getCorrelationId()));
    consumerHelper.start(event);
}
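Alternatively, here is some code I wrote to consume from a given offset. It simulates the consumer failing on a message, although it uses a plain KafkaConsumer rather than a KafkaMessageListenerContainer.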
private static void ConsumeFromOffset(KafkaConsumer<String, Customer> consumer, boolean flag, String topic) {
    Scanner scanner = new Scanner(System.in);
    System.out.print("Enter offset: ");
    long offsetInput = scanner.nextLong(); // offset to rewind to after the simulated failure
    while (true) {
        // poll(Duration) replaces the deprecated poll(long); needs java.time.Duration
        ConsumerRecords<String, Customer> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, Customer> record : records) {
            Customer customer = record.value();
            System.out.println(customer + " has offset ->" + record.offset());
            if (record.offset() == 7 && flag) {
                System.out.println("simulating consumer failing after offset 7..");
                break;
            }
        }
        consumer.commitSync();
        if (flag) {
            // consumer.seekToBeginning(Stream.of(new TopicPartition(topic, 0)).collect(Collectors.toList())); // consume from the beginning
            // Rewind partition 0 to the offset entered above; the partition must already be assigned to this consumer.
            consumer.seek(new TopicPartition(topic, 0), offsetInput);
            flag = false;
        }
    }
}
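For the KafkaMessageListenerContainer case from the question: as I read the Spring Kafka documentation, registerSeekCallback is not something you call yourself. The listener object you pass to setMessageListener implements ConsumerSeekAware, the container calls registerSeekCallback on it, and the listener keeps the callback so that a seek can be requested later. The sketch below models only that hand-off with plain Java so it runs without Kafka. SeekCallback, SeekableListener and ToyContainer are hypothetical stand-ins, not Spring Kafka classes, and the queue-then-apply behaviour is my assumption about how the real container handles seeks requested from another thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-in for the seek callback a container hands to its listener. */
interface SeekCallback {
    void seek(String topic, int partition, long offset);
}

/** Listener that keeps the callback so admin code can request a seek later. */
final class SeekableListener {
    private volatile SeekCallback callback;
    final List<Long> processed = new ArrayList<>();

    // Called by the container, not by application code.
    void registerSeekCallback(SeekCallback callback) {
        this.callback = callback;
    }

    void onMessage(long offset) {
        processed.add(offset);
    }

    // Entry point for the admin "reset offset" operation.
    void resetTo(String topic, int partition, long offset) {
        callback.seek(topic, partition, offset);
    }
}

/** Toy single-partition container: registers the callback and applies queued seeks before each poll. */
final class ToyContainer implements SeekCallback {
    private final SeekableListener listener;
    private final long logEndOffset; // exclusive
    private final Queue<Long> pendingSeeks = new ConcurrentLinkedQueue<>();
    private long position;

    ToyContainer(SeekableListener listener, long logEndOffset) {
        this.listener = listener;
        this.logEndOffset = logEndOffset;
        listener.registerSeekCallback(this);
    }

    @Override
    public void seek(String topic, int partition, long offset) {
        pendingSeeks.add(offset); // only queued here; applied on the polling side
    }

    /** One poll cycle: apply pending seeks, then deliver every record up to the log end. */
    void pollOnce() {
        Long target;
        while ((target = pendingSeeks.poll()) != null) {
            position = target;
        }
        while (position < logEndOffset) {
            listener.onMessage(position++);
        }
    }
}

final class SeekDemo {
    public static void main(String[] args) {
        SeekableListener listener = new SeekableListener();
        ToyContainer container = new ToyContainer(listener, 6); // offsets 0..5
        container.pollOnce();
        listener.resetTo("demo-topic", 0, 3); // admin decides to replay from offset 3
        container.pollOnce();
        System.out.println(listener.processed); // prints [0, 1, 2, 3, 4, 5, 3, 4, 5]
    }
}
```

In the real code, the equivalent of SeekableListener would be the object returned by getMessageListener: a class that implements both MessageListener<String, String> and ConsumerSeekAware, stores the callback it receives, and exposes a method the admin endpoint can call.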