从传入的数据流中解析json以在Flink中执行简单的转换

问题描述 投票:0回答:2

我第一次使用Apache Flink和AWS Kineses。基本上我的目标是转换来自Kinesis流的传入数据,以便我可以执行简单的转换,例如过滤和聚合。

我使用以下内容添加源代码:

return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

最后,当我打印传入的流时,我按预期获得了json数据:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> input = createSourceFromStaticConfig(env);
input.print();

这是打印的示例结果:

{“event_num”:“5530”,“timestmap”:“2019-03-04 14:29:44.882376”,“金额”:“80.4”,“类型”:“购买”} {“event_num”:“5531” ,“timestmap”:“2019-03-04 14:29:44.881379”,“金额”:“11.98”,“类型”:“服务”}

有人可以告诉我如何以这样的方式访问这些json元素,我可以执行简单的转换,例如,只选择包含“Service”作为类型的记录吗?

json amazon-web-services apache-flink amazon-kinesis data-stream
2个回答
1
投票

当你使用SimpleStringSchema时,产生的事件流是String类型。因此,您需要先解析字符串,然后才能应用过滤器等。

你可能想看看qazxsw poi,它会产生qazxsw poi。

© www.soinside.com 2019 - 2024. All rights reserved.