Storm 1.2.2和Kafka Version 2.x.

问题描述 投票:0回答:1

我正在测试使用Storm 1.2.2和Kafka 2.x作为我的Spout的案例。所以我创建了一个LocalCluster,仅用于测试目的。

  TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("MYKAFKAIP:9092", "storm-test-dpi").build()), 1);
        builder.setBolt("bolt", new LoggerBolt()).shuffleGrouping("kafka_spout");

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("kafkaBoltTest", new Config(), builder.createTopology());
        Utils.sleep(10000);

初始化此应用程序后,我得到以下内容:

9293 [Thread-20-kafka_spout-executor[3 3]] INFO  o.a.k.c.u.AppInfoParser - Kafka version : 0.10.1.0
9293 [Thread-20-kafka_spout-executor[3 3]] INFO  o.a.k.c.u.AppInfoParser - Kafka commitId : 3402a74efb23d1d4

经过很多错误之后:

9664 [Thread-20-kafka_spout-executor[3 3]] INFO  o.a.s.k.s.KafkaSpout - Initialization complete
9703 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9714 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9742 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9756 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9767 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9781 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9806 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0

我认为这个问题是因为Kafka版本,因为你可以看到日志显示版本“0.10.1.0”,但我的Kafka版本是“2.x”。

这是我的pom.xml:

 <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${version.storm}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>${version.storm}</version>
        </dependency>

其中${version.storm}是1.2.2

apache-storm
1个回答
1
投票

您还应该声明您正在使用的kafka-clients版本。 storm-kafka-client POM将kafka-clients范围设置为provided。这意味着在构建时不会包含kafka-clients。我们这样做,您可以轻松升级。

它甚至为你运行的原因是因为你在一些测试代码中使用LocalCluster,其中存在provided依赖项。

将其添加到您的POM,它应该工作:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>your-kafka-version-here</version>
        </dependency>
© www.soinside.com 2019 - 2024. All rights reserved.