我正在使用Flink处理来自某些数据源(例如Kafka,Pravega等)的数据。
就我而言,数据源是Pravega,它为我提供了flink连接器。
我的数据源向我发送了一些JSON数据,如下所示:
{"key": "value"}
{"key": "value2"}
{"key": "value3"}
...
...
这是我的代码:
PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(adapter)
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.map(new MapFunction<ObjectNode, String>() {
@Override
public String map(ObjectNode node) throws Exception {
return node.toString();
}
})
.keyBy("word") // ERROR
.timeWindow(Time.seconds(10))
.sum("count");
如您所见,我使用FlinkPravegaReader
和适当的解串器来获取来自Pravega的JSON流。
然后我尝试将JSON数据转换为字符串,KeyBy
并对它们进行计数。
但是,我得到一个错误:
The program finished with the following exception:
Field expression must be equal to '*' or '_' for non-composite types.
org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:342)
org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:340)
myflink.StreamingJob.main(StreamingJob.java:114)
似乎KeyBy
抛出了此异常。
嗯,我不是Flink专家,所以我不知道为什么。我已经阅读了官方示例WordCount
的源代码。在该示例中,有一个托管分割器,用于将String数据分割为单词。
所以我在考虑是否在这种情况下也需要使用某种分离器?如果是这样,我应该使用哪种分离器?能给我举个例子吗?如果没有,为什么我会得到这样的错误以及如何解决?
我想您已经阅读了有关如何指定键的文档
示例代码使用keyby("word")
,因为word
是POJO类型WC
的字段。
// some ordinary POJO (Plain old Java Object)
public class WC {
public String word;
public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
在您的情况下,您将map
运算符放在keyBy
之前,并且此map
运算符的输出是string
。因此,在您的情况下,显然没有word
字段。如果您确实想对这个string
流进行分组,则需要像这样.keyBy(String::toString)
或者您甚至可以实现自定义的keySelector
以生成自己的key
。