Flink Kafka Connector runtime error

Problem description

I am using:

  • Flink 1.1.2
  • Kafka 2.10-0.10.0.1
  • flink-connector-kafka-0.9_2.10

I am using the following very simple/basic application:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:33334");
properties.setProperty("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
properties.setProperty("group.id", "test");
String topic = "mytopic";

FlinkKafkaConsumer09<String> fkc =
    new FlinkKafkaConsumer09<String>(topic, new SimpleStringSchema(), properties);

DataStream<String> stream = env.addSource(fkc);
env.execute();

After compiling with Maven, when I try to run it with the following command:

bin/flink run -c  com.mycompany.app.App fkaf/target/fkaf-1.0-SNAPSHOT.jar

I see the following runtime error:

Submitting job with JobID: f6e290ec7c28f66d527eaa5286c00f4d. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://[email protected]:6123/user/jobmanager#-1679485245]
10/12/2016 15:10:06     Job execution switched to status RUNNING.
10/12/2016 15:10:06     Source: Custom Source(1/1) switched to SCHEDULED 
10/12/2016 15:10:06     Source: Custom Source(1/1) switched to DEPLOYING 
10/12/2016 15:10:06     Map -> Sink: Unnamed(1/1) switched to SCHEDULED 
10/12/2016 15:10:06     Map -> Sink: Unnamed(1/1) switched to DEPLOYING 
10/12/2016 15:10:06     Source: Custom Source(1/1) switched to RUNNING 
10/12/2016 15:10:06     Map -> Sink: Unnamed(1/1) switched to RUNNING 
10/12/2016 15:10:06     Map -> Sink: Unnamed(1/1) switched to CANCELED 
10/12/2016 15:10:06     Source: Custom Source(1/1) switched to FAILED 
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.open(FlinkKafkaConsumer09.java:282)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:722)

Any idea why the method assign() is not found? The method is there in lib/kafka-clients-0.10.0.1.jar.

    ParameterTool parameterTool = ParameterTool.fromArgs(args);             

    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<String>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

    // print() will write the contents of the stream to the TaskManager's standard out stream.
    // The rebalance call causes a repartitioning of the data so that all machines
    // see the messages (for example, in cases when "num Kafka partitions" < "num Flink operators").
    messageStream.rebalance().map(new MapFunction<String, String>() {       
        private static final long serialVersionUID = -6867736771747690202L; 

        @Override                                                           
        public String map(String value) throws Exception {                  
            return "Kafka and Flink says: " + value;                        
        }                                                                   
    }).print();                                                             

    env.execute();  
1 Answer

A NoSuchMethodError indicates a version mismatch.
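Concretely, between the 0.9 and 0.10 Kafka clients, `KafkaConsumer.assign()` changed its parameter type from `List<TopicPartition>` to `Collection<TopicPartition>` (KIP-45). Bytecode compiled against the 0.9 client references the exact descriptor `assign(Ljava/util/List;)V`, which no longer exists in the 0.10 jar, and JVM linkage matches descriptors exactly. A minimal sketch of that exact-match lookup, using a toy stand-in class rather than the real `KafkaConsumer`:

```java
import java.util.Collection;
import java.util.List;

public class SignatureCheck {
    // Toy stand-in for the 0.10-era consumer, where assign()
    // takes a Collection instead of the 0.9-era List.
    static class ConsumerV010 {
        public void assign(Collection<String> partitions) { }
    }

    public static void main(String[] args) {
        // Like the JVM's method resolution, getMethod matches parameter
        // types exactly: asking for assign(List) fails even though a
        // Collection parameter would accept a List at the source level.
        try {
            ConsumerV010.class.getMethod("assign", List.class);
            System.out.println("assign(List) found");
        } catch (NoSuchMethodException e) {
            System.out.println("assign(List) not found");
        }
    }
}
```

This is why the application compiles fine against one client version but fails at runtime when a different client jar is on the classpath.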

I guess the problem is that you are trying to connect a Kafka 0.9 consumer to a Kafka 0.10 instance. Flink 1.1.x does not provide a Kafka 0.10 consumer; however, a 0.10 consumer will be included in the upcoming 1.2.0 release.

You could try to build the Kafka 0.10 consumer yourself from the current master branch (1.2-SNAPSHOT) and use it with Flink 1.1.2. The respective Flink API should be stable and backwards compatible from 1.2 to 1.1.
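Alternatively, if you stay on the 0.9 consumer, make sure the kafka-clients jar on the runtime classpath matches what the connector was compiled against. A minimal sketch of the relevant Maven dependency (the Scala suffix `_2.10` is an assumption based on the versions listed in the question):

```xml
<!-- Flink 1.1.x Kafka 0.9 connector; pulls in a matching 0.9.x kafka-clients -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
    <version>1.1.2</version>
</dependency>
```

In particular, avoid placing kafka-clients-0.10.0.1.jar in Flink's lib/ directory: it shadows the 0.9 client the connector expects, which is exactly how the `assign()` descriptor can go missing at runtime.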
