我是flink的新手。我有五个具有不同数据模式的无限卡夫卡源。我想减少消息,然后使用相同的密钥外部连接所有 kafka 源。 所以我使用 union 将它们组合在一起,然后使用 ProcessWindowFunction 将它们转换为一个大对象。然后发送到下游。 我有两个问题。
1.如何选择水印策略?对于我的场景的一个 kafka 源,该消息是单调的。当我测试时,当消息单调时,forMonotonousTimestamps更适合单个kafka源。但不同的kafka源是乱序的。 我可以在kafka源中使用单调的水印,然后在union中更改为forBoundedOutOfOrderness吗?是否有丢失数据的风险?
class CommonObj {
var id: Long = 0
var entityType: String? = null
var timestamp: Long = System.currentTimeMillis()
val eventMetas: MutableList<EventMeta> = mutableListOf()
var kafkaStreamValue1: KafkaStreamObj1? = null
var kafkaStreamValue2: KafkaStreamObj2? = null
var kafkaStreamValue3: KafkaStreamObj3? = null
var kafkaStreamValue4: KafkaStreamObj4? = null
fun buildSinkObj(): SinkObj = ....
}
这是kafka源代码之一。其他kafka源码逻辑非常相似。
val watermarkStrategy = WatermarkStrategy.forMonotonousTimestamps<KafkaStreamObj1>()
.withIdleness(Duration.ofMinutes(1))
val sourceStream1 = env.fromSource(
getKafkaStream1(params),
watermarkStrategy,
"Kafka Source 1"
)
val kafkaSource1 = sourceStream1
.filter { it != null }
.map {
EventObj<KafkaStreamObj1>(
it.id.toString() + it.entity, //this is key
it, //obj
it.sequence, //timestamp
mutableListOf(EventMeta(it.transactionId, it.type, it.sequence, it.changed))
)
}
.returns(TypeInformation.of(object : TypeHint<EventObj<KafkaStreamObj1>>() {}))
.keyBy {
it.key }
.window(TumblingEventTimeWindows.of(Time.milliseconds(10000)))
.reduce { v1, v2 ->
if (v1.obj.sequence > v2.obj.sequence) {
v1.eventMetaList.addAll(v2.eventMetaList)
v1
} else {
v2.eventMetaList.addAll(v1.eventMetaList)
v2
}
}
.map {
val commonObj = CommonObj()
commonObj.id = it.obj.id
commonObj.entityType = it.obj.entity
commonObj.timestamp = System.currentTimeMillis()
commonObj.eventMetas.addAll(it.eventMetaList)
commonObj.kafkaStreamValue1 = it.obj.entity
commonObj
}
.returns(TypeInformation.of(object : TypeHint<CommonObj>() {}))
return kafkaSource1
这个工会代码
val timestampAssigner : SerializableTimestampAssigner<CommonObj> = object :
SerializableTimestampAssigner<CommonObj> {
override fun extractTimestamp(element: CommonObj, recordTimestamp: Long): Long {
return element.timestamp
}
}
kafkaStream1.union(kafkaStream2)
.union(kafkaStream3)
.union(kafkaStream4)
.union(kafkaStream5)
.assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness<CommonObj?>(Duration.ofSeconds(30))
.withTimestampAssigner(timestampAssigner)
.withIdleness(Duration.ofMinutes(1))
)
.keyBy(keySelector)
.window(TumblingEventTimeWindows.of(Time.milliseconds(10000)))
.process(EventProcessFunction(params))
.sinkTo(kafkaSink())
EventProcessFunction 会将所有 kafka 源消息合并到 CommonObj。它将减少密钥和时间戳的重复。
我删除了一些敏感信息。
要回答有关水印的主要问题,您应该通过(较新的)
StreamExecutionEnvironment.fromSource()
方法对每个 Kafka 源应用不同的水印策略,然后组合流。