I am new to Flink. I have five unbounded Kafka sources with different data schemas. I want to reduce the messages per key down to the latest one, and then outer-join all the Kafka sources on that same key. So I union them together and use a ProcessWindowFunction to merge them into one big object, which is then sent downstream. But after the union there is always a lot of data loss. I think the data is lost because it arrives late.
class CommonObj {
var id: Long = 0
var entityType: String? = null
var timestamp: Long = System.currentTimeMillis()
val eventMetas: MutableList<EventMeta> = mutableListOf()
var kafkaStreamObj1: KafkaStreamObj1? = null
var kafkaStreamObj2: KafkaStreamObj2? = null
var kafkaStreamObj3: KafkaStreamObj3? = null
var kafkaStreamObj4: KafkaStreamObj4? = null
fun buildOutPutObjt(): OutPutObj = ....
}
This is one of the Kafka sources. The logic for the other Kafka sources is very similar.
val watermarkStrategy = WatermarkStrategy.forBoundedOutOfOrderness<OfferUpdated>(Duration.ofSeconds(5))
.withIdleness(Duration.ofMinutes(1))
val sourceStream1 = env.fromSource(
getKafkaStream1(params),
watermarkStrategy,
"Kafka Source 1"
)
val kafkaStream1 = sourceStream1
.filter { it != null }
.map {
EventObj<KafkaStreamObj1>(
it.id.toString() + it.entity, //this is key
it, //obj
it.sequence, //timestamp
mutableListOf(EventMeta(it.transactionId, it.type, it.sequence, it.changed))
)
}
.returns(TypeInformation.of(object : TypeHint<EventObj<KafkaStreamObj1>>() {}))
.keyBy { it.key }
.window(TumblingEventTimeWindows.of(Time.milliseconds(30000)))
.reduce { v1, v2 ->
if (v1.obj.timestamp > v2.obj.timestamp) {
v1
} else {
v2
}
}
.map {
val commonObj = CommonObj()
commonObj.id = it.obj.id
commonObj.entityType = it.obj.entityType
commonObj.timestamp = System.currentTimeMillis()
commonObj.kafkaStreamObj1 = it.obj.entity // For the other Kafka streams this assigns kafkaStreamObj2, kafkaStreamObj3, etc.
commonObj
}
.returns(TypeInformation.of(object : TypeHint<CommonObj>() {}))
This is the union code:
kafkaStream1.union(kafkaStream2,kafkaStream3,kafkaStream4,kafkaStream5)
.keyBy(keySelector)
.window(TumblingEventTimeWindows.of(Time.milliseconds(30000)))
.process(EventProcessFunction(params))
.sinkTo(kafkaSink())
This is the EventProcessFunction:
class EventProcessFunction(private val params: ParameterTool) : ProcessWindowFunction<CommonObj, OutPutObj, String, TimeWindow>() {
override fun open(parameters: Configuration?) {
super.open(parameters)
//open data source
}
override fun close() {
//close data source
}
override fun process(
key: String,
context: Context,
elements: MutableIterable<CommonObj>,
out: Collector<OutPutObj>
) {
val commonObj = CommonObj()
//LTS: latest time stamp
var kafkaStreamObj1LTS: Long = Long.MIN_VALUE
var kafkaStreamObj2LTS: Long = Long.MIN_VALUE
var kafkaStreamObj3LTS: Long = Long.MIN_VALUE
var kafkaStreamObj4LTS: Long = Long.MIN_VALUE
val id = elements.first().id
elements.forEach {
commonObj.id = it.id
commonObj.entityType = elements.first().entityType
commonObj.timestamp = System.currentTimeMillis()
if (it.id != id) {
log.error { "id not equal ele id: ${it.id}, first id $id" }
}
if (it.kafkaStreamObj1 != null) {
if (commonObj.kafkaStreamObj1 != null && kafkaStreamObj1LTS > it.timestamp) {
return@forEach
}
kafkaStreamObj1LTS = it.timestamp
commonObj.kafkaStreamObj1 = it.kafkaStreamObj1
} else if (it.kafkaStreamObj2 != null) {
if (commonObj.kafkaStreamObj2 != null && kafkaStreamObj2LTS > it.timestamp) {
return@forEach
}
kafkaStreamObj2LTS = it.timestamp
commonObj.kafkaStreamObj2 = it.kafkaStreamObj2
} else if (it.kafkaStreamObj3 != null) {
if (commonObj.kafkaStreamObj3 != null && kafkaStreamObj3LTS > it.timestamp) {
return@forEach
}
kafkaStreamObj3LTS = it.timestamp
commonObj.kafkaStreamObj3 = it.kafkaStreamObj3
} else if (it.kafkaStreamObj4 != null) {
if (commonObj.kafkaStreamObj4 != null && kafkaStreamObj4LTS > it.timestamp) {
return@forEach
}
kafkaStreamObj4LTS = it.timestamp
commonObj.kafkaStreamObj4 = it.kafkaStreamObj4
}
}
if (commonObj.kafkaStreamObj1 == null && commonObj.entityType == EntityType.V.name) {
val kafkaStreamObj1Db = kafkaStreamObj1Repository.findKafkaStreamObj1(commonObj.id, commonObj.entityType!!)
commonObj.kafkaStreamObj1 = kafkaStreamObj1Db
}
if (commonObj.kafkaStreamObj2 == null) {
val kafkaStreamObj2Db = kafkaStreamObj2Repository.findKafkaStreamObj2(commonObj.id, commonObj.entityType!!)
commonObj.kafkaStreamObj2 = kafkaStreamObj2Db
}
if (commonObj.kafkaStreamObj3 == null) {
val kafkaStreamObj3Db =
kafkaStreamObj3Repository.kafkaStreamObj3Repository(commonObj.id, commonObj.entityType!!)
commonObj.kafkaStreamObj3 = kafkaStreamObj3Db
}
if (commonObj.kafkaStreamObj4 == null) {
val kafkaStreamObj4Db =
kafkaStreamObj4Repository.kafkaStreamObj4Repository(commonObj.id, commonObj.entityType!!)
commonObj.kafkaStreamObj4 = kafkaStreamObj4Db
}
val outPutObj = commonObj.buildOutPutObjt()
out.collect(outPutObj)
}
}
I removed some sensitive information. Why could messages be lost after the union? As far as I know, the watermark after a union is the minimum of the watermarks of all the Kafka sources. It shouldn't lose anything; at worst it would create backpressure on the faster Kafka sources.
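For reference, a sketch of how late records could be routed to a side output for inspection instead of being silently dropped (lateTag and lateKafkaSink are placeholder names, not something I actually have):

import org.apache.flink.util.OutputTag

// Sketch only: capture records that arrive after the window has fired, instead of dropping them.
val lateTag = OutputTag("late-events", TypeInformation.of(object : TypeHint<CommonObj>() {}))

val joined = kafkaStream1.union(kafkaStream2, kafkaStream3, kafkaStream4, kafkaStream5)
    .keyBy(keySelector)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(30000)))
    .sideOutputLateData(lateTag)
    .process(EventProcessFunction(params))

joined.sinkTo(kafkaSink())
joined.getSideOutput(lateTag).sinkTo(lateKafkaSink()) // lateKafkaSink is hypothetical

If late data shows up on that side output, allowedLateness(...) on the window would be one way to still incorporate it.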
I have also tried TumblingProcessingTimeWindows with no watermarks. But when the Kafka topics have lag, this creates heavy backpressure and the checkpoints time out. Even after increasing the checkpoint timeout, checkpoints start taking a very long time (10-30 minutes). The root cause should be the DB queries, which take a long time and block processing after the union. When the Kafka topics have no lag, everything is normal.
It's hard to say anything conclusive here, because the code you've shared is too complex and too incomplete to analyze with confidence. But a few things look concerning.

Mixing System.currentTimeMillis() with event-time logic is asking for trouble. Your windows are event-time windows, but CommonObj.timestamp is a wall-clock time taken at processing time, and the "latest record wins" comparison in EventProcessFunction is based on it, so it reflects arrival order rather than event order.
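A minimal sketch of keeping everything in event time, assuming the sequence field really is an epoch-millisecond event timestamp (if it is not, this needs adapting):

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import java.time.Duration

// Sketch only: take the event timestamp from the record itself instead of the wall clock.
val watermarkStrategy = WatermarkStrategy
    .forBoundedOutOfOrderness<OfferUpdated>(Duration.ofSeconds(5))
    .withTimestampAssigner(SerializableTimestampAssigner<OfferUpdated> { event, _ -> event.sequence })
    .withIdleness(Duration.ofMinutes(1))

In the per-stream map you could then carry it.obj.sequence (or the window end) in CommonObj.timestamp instead of System.currentTimeMillis(), so that the comparison in EventProcessFunction is also based on event time.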
Calls like kafkaStreamObj3Repository.kafkaStreamObj3Repository(commonObj.id, commonObj.entityType!!) are doing synchronous I/O against an external database from inside the window function. That blocks the task thread, which lines up with the backpressure and checkpoint timeouts you see whenever the topics have lag, and is very likely to cause problems at some point.
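One common way to keep those lookups from stalling the pipeline is Flink's async I/O. A rough sketch with placeholder names (EnrichFromDbAsync, joinedStream, and the repository lookup are illustrative, and this assumes the window emits CommonObj and OutPutObj is built after the enrichment):

import org.apache.flink.streaming.api.datastream.AsyncDataStream
import org.apache.flink.streaming.api.functions.async.ResultFuture
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit

// Sketch: fill in the missing CommonObj fields off the main task thread.
class EnrichFromDbAsync : RichAsyncFunction<CommonObj, CommonObj>() {
    override fun asyncInvoke(input: CommonObj, resultFuture: ResultFuture<CommonObj>) {
        CompletableFuture
            .supplyAsync {
                // placeholder: e.g. if (input.kafkaStreamObj2 == null) load it from the DB here
                input
            }
            .thenAccept { enriched -> resultFuture.complete(listOf(enriched)) }
    }
}

// Applied between the windowed process step and the sink; timeout and capacity are illustrative.
val enriched = AsyncDataStream.unorderedWait(
    joinedStream,
    EnrichFromDbAsync(),
    2000, TimeUnit.MILLISECONDS,
    100
)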
In my opinion, joining these streams with Flink's Table API would save you a lot of trouble.
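For comparison, a rough sketch of that approach; every topic, column, and connector option below is made up and would need adapting:

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

// Sketch: register each Kafka topic as a table, then let Flink SQL do the outer join.
val tableEnv = StreamTableEnvironment.create(env)

tableEnv.executeSql(
    """
    CREATE TABLE source1 (
      id BIGINT,
      entityType STRING,
      payload STRING,
      seq BIGINT,
      ts AS TO_TIMESTAMP_LTZ(seq, 3),
      WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'source1-topic',
      'properties.bootstrap.servers' = '...',
      'format' = 'json'
    )
    """.trimIndent()
)
// ...repeat for source2..source5, then join on the shared key:
val joined = tableEnv.sqlQuery(
    """
    SELECT s1.id, s1.entityType, s1.payload AS p1, s2.payload AS p2
    FROM source1 AS s1
    LEFT JOIN source2 AS s2
      ON s1.id = s2.id AND s1.entityType = s2.entityType
    """.trimIndent()
)
// A regular left join over unbounded streams produces an updating (changelog) stream.
val resultStream = tableEnv.toChangelogStream(joined)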