获取相关 ID 为 92 的元数据时出错:{myTest=UNKNOWN_TOPIC_OR_PARTITION}

问题描述 投票:0回答:5

我创建了一个示例应用程序来检查我的制作人的代码。当我在没有分区键的情况下发送数据时,我的应用程序运行良好。但是,在指定数据分区的键时,我收到错误:

[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 37 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 38 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 39 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}

对于消费者和生产者来说。我在互联网上搜索了很多,他们建议验证 kafka.acl 设置。我在 HDInsight 上使用 kafka ,但我不知道如何验证它并解决此问题。

我的集群具有以下配置:

  1. 头节点:2
  2. 工作节点:4
  3. 动物园管理员:3

我的生产者代码:

public static void produce(String brokers, String topicName) throws IOException{

    // Set properties used to configure the producer
    Properties properties = new Properties();
      // Set the brokers (bootstrap servers)
    properties.setProperty("bootstrap.servers", brokers);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // specify the protocol for Domain Joined clusters

    //To create an Idempotent Producer
    properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
    properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
    properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id"); 
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    producer.initTransactions();
    // So we can generate random sentences
    Random random = new Random();
    String[] sentences = new String[] {
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "snow white and the seven dwarfs",
            "i am at two with nature",
         };


    for(String sentence: sentences){
        // Send the sentence to the test topic
        try
        {
            String key=sentence.substring(0,2);
            producer.beginTransaction();
            producer.send(new ProducerRecord<String, String>(topicName,key,sentence)).get();
        }
        catch (Exception ex)
        {
          System.out.print(ex.getMessage());
            throw new IOException(ex.toString());
        }
        producer.commitTransaction();
    }
}

另外,我的主题由 3 个分区组成,复制因子=3

java apache-kafka kafka-producer-api
5个回答
4
投票

我将复制因子设置为小于分区数量,这对我有用。这对我来说听起来很奇怪,但是是的,它在之后就开始工作了。


2
投票

该错误清楚地表明您正在生成的主题(或分区)不存在。

最终,您需要描述该主题(通过 CLI

kafka-topics --describe --topic <topicName>
或其他方式)来验证这是否属实

HDInsight 上的 Kafka,我不知道如何验证它并解决此问题。

仅当您安装了集群时才会设置 ACL,但我相信您仍然可以通过

zookeper-shell
或通过 SSH 连接到 Hadoop 主服务器之一来列出 ACL。


0
投票

我在创建新主题时也遇到了同样的问题。当我描述该主题时,我可以看到领导者没有分配到主题分区。

主题:xxxxxxxxx 分区:0 领导者:无 副本:3,2,1 Isr: 主题:xxxxxxxxx 分区:1 领导者:无 副本:1,3,2 Isr:

经过一番谷歌搜索后,发现当我们控制器代理出现问题时可能会发生这种情况,因此重新启动控制器代理。

一切都按预期进行......!


0
投票

如果该主题存在,但您仍然看到此错误,则可能意味着提供的经纪人列表不正确。检查

bootstrap.servers
值,它应该指向主题所在的正确 Kafka 集群。

我看到了同样的问题,并且我有多个 Kafka 集群,并且该主题显然存在。但是,我的经纪人名单不正确。


0
投票

当主题不存在或分区不存在时,会发生此错误,在我的情况下,我的 amazon msk 集群配置将 auto.topic.cretion.enable 设置为 false,自定义配置并将其设置为 true 解决了该问题

© www.soinside.com 2019 - 2024. All rights reserved.