IllegalStateException订阅主题、分区和模式是相互排斥的。

问题描述 投票:0回答:1

需要从一个Kafka主题中获取消息,从一个特定的偏移量开始。

IllegalStateException异常的卡住原因是在 assign()

如果我不使用 assign() 那么消费者就不会执行seek,因为那是一个Lazy操作

实际目的: 需要从一个预先决定的偏移量开始迭代主题的信息,直到结束。这个预设的偏移量是在 markOffset()

static void fetchMessagesFromMarkedOffset() {
    Consumer<Long, String> consumer = ConsumerCreator.createConsumer();
    consumer.assign(set); // <---- Exception at this place
    map.forEach((k,v) -> {
        consumer.seek(k, v-3);
    });
    ConsumerRecords<Long, String> consumerRecords = consumer.poll(100);
    consumerRecords.forEach(record -> {
        System.out.println("Record Key " + record.key());
        System.out.println("Record value " + record.value());
        System.out.println("Record partition " + record.partition());
        System.out.println("Record offset " + record.offset());
    });
    consumer.close();
}

其余涉及的代码

public static Set<TopicPartition> set;
public static Map<TopicPartition, Long> map;

static void markOffset() {
    Consumer<Long, String> consumer = ConsumerCreator.createConsumer();
    consumer.poll(100);
    set = consumer.assignment();
    map = consumer.endOffsets(set);
    System.out.println("Topic  Partitions: " + set);
    System.out.println("End Offsets: " + map);
}

消费者创造

private Consumer createConsumer(String topicName) {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "capacity-service-application");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    final Consumer consumer = new KafkaConsumer(props);
    consumer.subscribe(Collections.singletonList(topicName));
    return consumer;
}   

异常情况

Exception in thread "main" java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive
at org.apache.kafka.clients.consumer.internals.SubscriptionState.setSubscriptionType(SubscriptionState.java:104)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromUser(SubscriptionState.java:157)
at org.apache.kafka.clients.consumer.KafkaConsumer.assign(KafkaConsumer.java:1064)
at com.gaurav.kafka.App.fetchMessagesFromMarkedOffset(App.java:44)
at com.gaurav.kafka.App.main(App.java:30)
java apache-kafka microservices kafka-consumer-api
1个回答
1
投票

你不能混合使用 manualautomatic 分区分配.您应该使用 KafkaConsumer::subscribeKafkaConsumer::assign 但不能兼而有之。

如果在调用 KafkaConsumer::subscribe 你想转到 manual 办法你应该先打电话 KafkaConsumer::unsubscribe.

根据 https:/kafka.apache.org10javadocorgapachekafkaclientsconsumerKafkaConsumer.html。

请注意,不能将手动分区分配(即使用 assign)和通过主题订阅(即使用 subscribe)的动态分区分配混合使用。

© www.soinside.com 2019 - 2024. All rights reserved.