I am using:
I am using the following very simple/basic application:
// Assumed setup: the original snippet uses env without showing how it was created
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:33334");
properties.setProperty("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
properties.setProperty("group.id", "test");
String topic = "mytopic";
FlinkKafkaConsumer09<String> fkc =
    new FlinkKafkaConsumer09<String>(topic, new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(fkc);
env.execute();
After compiling with Maven, when I try to run it with the following command:
bin/flink run -c com.mycompany.app.App fkaf/target/fkaf-1.0-SNAPSHOT.jar
I see the following runtime error:
Submitting job with JobID: f6e290ec7c28f66d527eaa5286c00f4d. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://[email protected]:6123/user/jobmanager#-1679485245]
10/12/2016 15:10:06 Job execution switched to status RUNNING.
10/12/2016 15:10:06 Source: Custom Source(1/1) switched to SCHEDULED
10/12/2016 15:10:06 Source: Custom Source(1/1) switched to DEPLOYING
10/12/2016 15:10:06 Map -> Sink: Unnamed(1/1) switched to SCHEDULED
10/12/2016 15:10:06 Map -> Sink: Unnamed(1/1) switched to DEPLOYING
10/12/2016 15:10:06 Source: Custom Source(1/1) switched to RUNNING
10/12/2016 15:10:06 Map -> Sink: Unnamed(1/1) switched to RUNNING
10/12/2016 15:10:06 Map -> Sink: Unnamed(1/1) switched to CANCELED
10/12/2016 15:10:06 Source: Custom Source(1/1) switched to FAILED
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.open(FlinkKafkaConsumer09.java:282)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:722)
Any idea why the method assign() is not found? The method is present in lib/kafka-clients-0.10.0.1.jar.
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<String>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
// print() will write the contents of the stream to the TaskManager's standard out stream.
// The rebalance() call causes a repartitioning of the data so that all machines
// see the messages (for example in cases when "num kafka partitions" < "num flink operators").
messageStream.rebalance().map(new MapFunction<String, String>() {
    private static final long serialVersionUID = -6867736771747690202L;
    @Override
    public String map(String value) throws Exception {
        return "Kafka and Flink says: " + value;
    }
}).print();
env.execute();
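A NoSuchMethodError indicates a version mismatch.
I guess the problem is that you are trying to connect the Kafka 0.9 consumer to a Kafka 0.10 instance. Flink 1.1.x does not provide a Kafka 0.10 consumer. However, a 0.10 consumer will be included in the upcoming 1.2.0 release.
You can try to build the Kafka 0.10 consumer yourself from the current master branch (1.2-SNAPSHOT) and use it with Flink 1.1.2. The relevant Flink APIs should be stable, so the 1.2 connector should be backwards compatible with 1.1.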
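To confirm the mismatch, it can help to see which assign() signatures the kafka-clients jar on the classpath actually exposes. The descriptor in the stack trace, assign(Ljava/util/List;)V, means the connector was compiled against a method that takes a java.util.List. As far as I know, the 0.9 client declares assign(List) while the 0.10 client declares assign(Collection), which would explain why the method exists in the 0.10 jar by name but is still "not found". Below is a minimal reflection sketch for checking this. The class name MethodSignatureCheck and its helper methods are made up for illustration and are not part of Flink or Kafka.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink or Kafka.
class MethodSignatureCheck {

    /** Lists the parameter types of every public method named methodName on the given class. */
    static List<String> signaturesOf(String className, String methodName)
            throws ClassNotFoundException {
        Class<?> cls = Class.forName(className);
        List<String> signatures = new ArrayList<>();
        for (Method m : cls.getMethods()) {
            if (!m.getName().equals(methodName)) {
                continue;
            }
            StringBuilder sb = new StringBuilder(methodName).append('(');
            Class<?>[] params = m.getParameterTypes();
            for (int i = 0; i < params.length; i++) {
                if (i > 0) {
                    sb.append(", ");
                }
                sb.append(params[i].getName());
            }
            signatures.add(sb.append(')').toString());
        }
        return signatures;
    }

    /** Returns the jar or directory the class was loaded from, or "(bootstrap)" if unknown. */
    static String locationOf(String className) throws ClassNotFoundException {
        CodeSource src = Class.forName(className).getProtectionDomain().getCodeSource();
        return src == null ? "(bootstrap)" : String.valueOf(src.getLocation());
    }

    public static void main(String[] args) throws Exception {
        // Defaults match the class and method named in the stack trace above.
        String className = args.length > 0
                ? args[0] : "org.apache.kafka.clients.consumer.KafkaConsumer";
        String methodName = args.length > 1 ? args[1] : "assign";

        System.out.println("Loaded from: " + locationOf(className));
        for (String signature : signaturesOf(className, methodName)) {
            System.out.println(signature);
        }
    }
}
```

Run it with the same classpath the job sees (for example, with the jars from Flink's lib directory). If it reports a kafka-clients-0.10 jar and only an assign(java.util.Collection) signature, then the 0.9 connector is picking up the 0.10 client library, and removing that jar from lib or aligning the versions should be the next thing to try.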