我有一个包含很少事件类型的kafka主题。(这是given)
事件是 JSON 文档。
让我们调用事件类型:A、B、C、D、E。
我可以通过使用每个事件中的字段来判断类型。
我想要一个 Flink 作业来分别处理事件 A 和 B(使用会话窗口),C 和 D 应该转到另一种类型的窗口,并且事件 D 应该被删除。
我可以在Flink中实现这样的设计吗?
谢谢
如果是这样,您可以利用 Flink 对 Side Outputs 的支持,并将其用作将每个不同类型映射到其自己的流并单独操作这些流的方法(或将它们联合到下游等)
基本上:
这可能看起来像:
val events = streamEnv
.fromSource(KafkaSource.build(...))
.process(YourTypeSeparatorOperator())
// Example: Getting A & B events
val a = events.getSideOutput(Tags.a)
val b = events.getSideOutput(Tags.b)
// Union this stream (and act on it via windowing, etc.)
val ab = a.union(b)
// Likewise perform operations necessary for C & D types here
// Eventually merge all of these separate streams together if needed
在上面
YourTypeSeparatorOperator()
实际上会使用侧面输出,并根据事件的类型,将其输出到指定的侧面输出:
// Example OutputTag
object Tags{
val a = OutputTag("a", TypeInformation.of(YourClass::class.java))
val b = OutputTag("b", TypeInformation.of(YourClass::class.java))
val c = OutputTag("c", TypeInformation.of(YourClass::class.java))
val d = OutputTag("d", TypeInformation.of(YourClass::class.java))
}
// Usage
override fun processElement(...) {
...
when (message.type) {
"a" -> context.output(Tags.a, message)
"b" -> context.output(Tags.b, message)
"c" -> context.output(Tags.c, message)
"d" -> context.output(Tags.d, message)
}
}