如何使用flink流式传输json?

问题描述 投票:1回答:1

我实际上正在处理一个流,收到一堆字符串,需要计算所有字符串。总和是加重的,这意味着第二个记录的总和被添加到输出前一天必须是一些json文件看起来像

{
"aggregationType" : "day",
"days before" : 2,
"aggregates" : [
    {"date" : "2018-03-03",
    "sum" : 120},
  {"date" :"2018-03-04",
  "sum" : 203}
  ]
}

我创建了一个流看起来像:

val eventStream : DataStream [String] = 
eventStream
    .addSource(source)
    .keyBy("")
    .TimeWindow(Time.days(1), Time.days(1))
    .trigger(new MyTriggerFunc)
    .aggregation(new MyAggregationFunc)
    .addSink(sink)

提前谢谢你的帮助:)

json apache-flink flink-streaming flink-cep flink-sql
1个回答
1
投票

关于在Flink中使用JSON的注意事项:

使用JSONDeserializationSchema反序列化事件,这将产生ObjectNodes。您可以将ObjectNode映射到YourObject以方便使用或继续使用ObjectNode

使用ObjectNode的教程:http://www.baeldung.com/jackson-json-node-tree-model

回到你的案例,你可以像下面这样做:

val eventStream : DataStream [ObjectNode] = 
oneMinuteAgg
    .addSource(source)
    .windowAll()
    .TimeWindow(Time.minutes(1))
    .trigger(new MyTriggerFunc)
    .aggregation(new MyAggregationFunc)

将输出1分钟的聚合流

[     
      {
      "date" :2018-03-03
      "sum" : 120
      }, 
      {
      "date" :2018-03-03
      "sum" : 120
      }
]

然后将另一个运算符链接到“oneMinuteAgg”,它将1min聚合添加到1day聚合中:

[...]
oneMinuteAgg
        .windowAll()
        .TimeWindow(Time.days(1))
        .trigger(new Whatever)
        .aggregation(new YourDayAggF)

这将输出你需要的东西

{
    "aggregationType" : "day"
    "days before" : 4
    "aggregates : [{
      "date" :2018-03-03
      "sum" : 120
      }, 
      {
      "date" :2018-03-03
      "sum" : 120
      }]
}

假设您不需要键入流,我使用了windowAll()

© www.soinside.com 2019 - 2024. All rights reserved.