如何在Apache Flink中拆分NodeObject的数据

问题描述 投票:0回答:1

我正在使用Flink处理来自某些数据源(例如Kafka,Pravega等)的数据。

就我而言,数据源是Pravega,它为我提供了flink连接器。

我的数据源向我发送了一些JSON数据,如下所示:

{"key": "value"}
{"key": "value2"}
{"key": "value3"}
...
...

这是我的代码:

PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
    .withPravegaConfig(pravegaConfig)
    .forStream(stream)
    .withDeserializationSchema(adapter)
    .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.map(new MapFunction<ObjectNode, String>() {
        @Override
        public String map(ObjectNode node) throws Exception {
            return node.toString();
        }
    })
    .keyBy("word")    // ERROR
    .timeWindow(Time.seconds(10))
    .sum("count");

如您所见,我使用FlinkPravegaReader和适当的解串器来获取来自Pravega的JSON流。

然后我尝试将JSON数据转换为字符串,KeyBy并对它们进行计数。

但是,我得到一个错误:

 The program finished with the following exception:

Field expression must be equal to '*' or '_' for non-composite types.
        org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:342)
        org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:340)
        myflink.StreamingJob.main(StreamingJob.java:114)

似乎KeyBy抛出了此异常。

嗯,我不是Flink专家,所以我不知道为什么。我已经阅读了官方示例WordCount的源代码。在该示例中,有一个托管分割器,用于将String数据分割为单词。

所以我在考虑是否在这种情况下也需要使用某种分离器?如果是这样,我应该使用哪种分离器?能给我举个例子吗?如果没有,为什么我会得到这样的错误以及如何解决?

json apache-flink flink-streaming
1个回答
0
投票

我想您已经阅读了有关如何指定键的文档

Specify keys

示例代码使用keyby("word"),因为word是POJO类型WC的字段。

// some ordinary POJO (Plain old Java Object)
public class WC {
  public String word;
  public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);

在您的情况下,您将map运算符放在keyBy之前,并且此map运算符的输出是string。因此,在您的情况下,显然没有word字段。如果您确实想对这个string流进行分组,则需要像这样.keyBy(String::toString)

编写它

或者您甚至可以实现自定义的keySelector以生成自己的key

Customized Key Selector

© www.soinside.com 2019 - 2024. All rights reserved.