Flink:联合多kafka源时为什么会丢失数据?

问题描述 投票:0回答:1

我是flink的新手。我有五个具有不同数据模式的无限卡夫卡源。我想减少消息并获取最新消息,然后使用相同的密钥外部连接所有 kafka 源。 所以我使用 union 将它们组合在一起,然后使用 ProcessWindowFunction 将它们转换为一个大对象。然后发送到下游。 但合并后总会有很多数据丢失。 我认为丢失数据是因为晚了。

        class CommonObj {

            var id: Long = 0
            var entityType: String? = null
            var timestamp: Long = System.currentTimeMillis()
            val eventMetas: MutableList<EventMeta> = mutableListOf()

            var kafkaStreamValue1: KafkaStreamObj1? = null
            var kafkaStreamValue2: KafkaStreamObj2? = null
            var kafkaStreamValue3: KafkaStreamObj3? = null
            var kafkaStreamValue4: KafkaStreamObj4? = null

            fun buildOutPutObjt(): OutPutObj = ....
        }

这是kafka源代码之一。其他kafka源码逻辑非常相似。

        val watermarkStrategy = WatermarkStrategy.forBoundedOutOfOrderness<OfferUpdated>(Duration.ofSeconds(5))
            .withIdleness(Duration.ofMinutes(1))

        val sourceStream1 = env.fromSource(
            getKafkaStream1(params),
            watermarkStrategy,
            "Kafka Source 1"
        )

        val kafkaSource1 = sourceStream1
            .filter { it != null }
            .map {
                EventObj<KafkaStreamObj1>(
                    it.id.toString() + it.entity, //this is key
                    it,                           //obj
                    it.sequence,                  //timestamp
                    mutableListOf(EventMeta(it.transactionId, it.type, it.sequence, it.changed))
                )
            }
            .returns(TypeInformation.of(object : TypeHint<EventObj<KafkaStreamObj1>>() {}))
            .keyBy {
                it.key }
            .window(TumblingEventTimeWindows.of(Time.milliseconds(30000)))
            .reduce { v1, v2 ->
                if (v1.obj.timestamp > v2.obj.timestamp) {
                    v1
                } else {
                    v2
                }
            }
            .map {
                val commonObj = CommonObj()
                commonObj.id = it.obj.id
                commonObj.entityType = it.obj.entityType
                commonObj.timestamp = System.currentTimeMillis()
                commonObj.kafkaStreamValue1 = it.obj.entity // For other kafka stream, it will use kafkaStreamValue2 or kafkaStreamValue3
                commonObj
            }
            .returns(TypeInformation.of(object : TypeHint<CommonObj>() {}))

这个工会代码

        kafkaStream1.union(kafkaStream2,kafkaStream3,kafkaStream4,kafkaStream5)
            .keyBy(keySelector)
            .window(TumblingEventTimeWindows.of(Time.milliseconds(30000)))
            .process(EventProcessFunction(params))
            .sinkTo(kafkaSink())

此事件处理函数

class EventProcessFunction(private val params: ParameterTool) : ProcessWindowFunction<CommonObj, OutPutObj, String, TimeWindow>() {
    override fun open(parameters: Configuration?) {
        super.open(parameters)
        //open data source
    }

    override fun close() {
        //close data source
    }

    override fun process(
        key: String,
        context: Context,
        elements: MutableIterable<CommonObj>,
        out: Collector<OutPutObj>
    ) {
        val commonObj = CommonObj()
        //LTS: latest time stamp
        var kafkaStreamObj1LTS: Long = Long.MIN_VALUE
        var kafkaStreamObj2LTS: Long = Long.MIN_VALUE
        var kafkaStreamObj3LTS: Long = Long.MIN_VALUE
        var kafkaStreamObj4LTS: Long = Long.MIN_VALUE
        val id = elements.first().id

        elements.forEach {
            commonObj.id = it.id
            commonObj.entityType = elements.first().entityType
            commonObj.timestamp = System.currentTimeMillis()
            if (it.id != id) {
                log.error { "id not equal ele id: ${it.id}, first id $id" }
            }

            if (it.kafkaStreamObj1 != null) {
                if (commonObj.kafkaStreamObj1 != null && kafkaStreamObj1LTS > it.timestamp) {
                    return@forEach
                }
                kafkaStreamObj1LTS = it.timestamp
                commonObj.kafkaStreamObj1 = it.kafkaStreamObj1
            } else if (it.kafkaStreamObj2 != null) {
                if (commonObj.kafkaStreamObj2 != null && kafkaStreamObj2LTS > it.timestamp) {
                    return@forEach
                }
                kafkaStreamObj2LTS = it.timestamp
                commonObj.kafkaStreamObj2 = it.kafkaStreamObj2
            } else if (it.kafkaStreamObj3 != null) {
                if (commonObj.kafkaStreamObj3 != null && kafkaStreamObj3LTS > it.timestamp) {
                    return@forEach
                }
                kafkaStreamObj3LTS = it.timestamp
                commonObj.kafkaStreamObj3 = it.kafkaStreamObj3
            } else if (it.kafkaStreamObj4 != null) {
                if (commonObj.kafkaStreamObj4 != null && kafkaStreamObj4LTS > it.timestamp) {
                    return@forEach
                }
                kafkaStreamObj4LTS = it.timestamp
                commonObj.kafkaStreamObj4 = it.kafkaStreamObj4
            }
        }

        if (commonObj.kafkaStreamObj1 == null && commonObj.entityType == EntityType.V.name) {
            val kafkaStreamObj1Db = kafkaStreamObj1Repository.findKafkaStreamObj1(commonObj.id, commonObj.entityType!!)
            commonObj.kafkaStreamObj1 = kafkaStreamObj1Db
        }
        if (commonObj.kafkaStreamObj2 == null) {
            val kafkaStreamObj2Db = kafkaStreamObj2Repository.findKafkaStreamObj2(commonObj.id, commonObj.entityType!!)
            commonObj.kafkaStreamObj2 = kafkaStreamObj2Db
        }
        if (commonObj.kafkaStreamObj3 == null) {
            val kafkaStreamObj3Db =
                kafkaStreamObj3Repository.kafkaStreamObj3Repository(commonObj.id, commonObj.entityType!!)
            commonObj.kafkaStreamObj3 = kafkaStreamObj3Db
        }
        if (commonObj.kafkaStreamObj4 == null) {
            val kafkaStreamObj4Db =
                kafkaStreamObj4Repository.kafkaStreamObj4Repository(commonObj.id, commonObj.entityType!!)
            commonObj.kafkaStreamObj4 = kafkaStreamObj4Db
        }

        val outPutObj = commonObj.buildOutPutObjt()
        out.collect(outPutObj)
    }

}

我删除了一些敏感信息。 为什么联合后消息可能会丢失?据我所知,联合水印将使用所有kafka源的最小值。它不应该丢失任何一个,并且可能会对一些更快的 kafka 源产生背压。

我也尝试过 TumblingProcessingTimeWindows 并且没有水印。但是当kafka topic有滞后时,就会产生很大的背压。然后检查点就会超时。即使增加检查点超时时间,检查点也会开始长时间(10-30分钟)延迟。根本原因应该是DB查询需要很长时间并且在union之后阻塞处理。 但是kafka topic没有卡顿的时候就很正常了

apache-flink flink-streaming
1个回答
0
投票

这里几乎没有潜在的问题,但是您共享的代码太复杂且太不完整,无法自信地分析。但有几件事看起来令人担忧:

  • System.currentTimeMillis()
    与事件时间逻辑结合很容易导致问题
  • 如果像
    kafkaStreamObj3Repository.kafkaStreamObj3Repository(commonObj.id, commonObj.entityType!!)
    这样的调用正在对外部数据库进行 I/O,那么很可能在某些时候会导致问题

在我看来,使用 Flink 的 Table API 加入这些流可以为您省去很多麻烦。

© www.soinside.com 2019 - 2024. All rights reserved.