我第一次使用Apache Flink和AWS Kineses。基本上我的目标是转换来自Kinesis流的传入数据,以便我可以执行简单的转换,例如过滤和聚合。
我使用以下内容添加源代码:
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
最后,当我打印传入的流时,我按预期获得了json数据:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> input = createSourceFromStaticConfig(env);
input.print();
这是打印的示例结果:
{“event_num”:“5530”,“timestmap”:“2019-03-04 14:29:44.882376”,“金额”:“80.4”,“类型”:“购买”} {“event_num”:“5531” ,“timestmap”:“2019-03-04 14:29:44.881379”,“金额”:“11.98”,“类型”:“服务”}
有人可以告诉我如何以这样的方式访问这些json元素,我可以执行简单的转换,例如,只选择包含“Service”作为类型的记录吗?
当你使用SimpleStringSchema
时,产生的事件流是String
类型。因此,您需要先解析字符串,然后才能应用过滤器等。
你可能想看看qazxsw poi,它会产生qazxsw poi。