Flink:联合多个kafka源并将它们合并在一起时哪种水印策略合适?

问题描述 投票:0回答:1

我是flink的新手。我有五个具有不同数据模式的无限卡夫卡源。我想减少消息,然后使用相同的密钥外部连接所有 kafka 源。 所以我使用 union 将它们组合在一起,然后使用 ProcessWindowFunction 将它们转换为一个大对象。然后发送到下游。 我有两个问题。

1.如何选择水印策略?对于我的场景的一个 kafka 源,该消息是单调的。当我测试时,当消息单调时,forMonotonousTimestamps更适合单个kafka源。但不同的kafka源是乱序的。 我可以在kafka源中使用单调的水印,然后在union中更改为forBoundedOutOfOrderness吗?是否有丢失数据的风险?

  1. 合并后,有许多来自同一kafka源的重复消息。如何减少它们?目前,我只是使用 kafka 源中相同的归约逻辑来归约它们。如果有其他更好的建议,我将不胜感激。
        class CommonObj {

            var id: Long = 0
            var entityType: String? = null
            var timestamp: Long = System.currentTimeMillis()
            val eventMetas: MutableList<EventMeta> = mutableListOf()

            var kafkaStreamValue1: KafkaStreamObj1? = null
            var kafkaStreamValue2: KafkaStreamObj2? = null
            var kafkaStreamValue3: KafkaStreamObj3? = null
            var kafkaStreamValue4: KafkaStreamObj4? = null

            fun buildSinkObj(): SinkObj = ....
        }

这是kafka源代码之一。其他kafka源码逻辑非常相似。

        val watermarkStrategy = WatermarkStrategy.forMonotonousTimestamps<KafkaStreamObj1>()
            .withIdleness(Duration.ofMinutes(1))

        val sourceStream1 = env.fromSource(
            getKafkaStream1(params),
            watermarkStrategy,
            "Kafka Source 1"
        )

        val kafkaSource1 = sourceStream1
            .filter { it != null }
            .map {
                EventObj<KafkaStreamObj1>(
                    it.id.toString() + it.entity, //this is key
                    it,                           //obj
                    it.sequence,                  //timestamp
                    mutableListOf(EventMeta(it.transactionId, it.type, it.sequence, it.changed))
                )
            }
            .returns(TypeInformation.of(object : TypeHint<EventObj<KafkaStreamObj1>>() {}))
            .keyBy {
                it.key }
            .window(TumblingEventTimeWindows.of(Time.milliseconds(10000)))
            .reduce { v1, v2 ->
                if (v1.obj.sequence > v2.obj.sequence) {
                    v1.eventMetaList.addAll(v2.eventMetaList)
                    v1
                } else {
                    v2.eventMetaList.addAll(v1.eventMetaList)
                    v2
                }
            }
            .map {
                val commonObj = CommonObj()
                commonObj.id = it.obj.id
                commonObj.entityType = it.obj.entity
                commonObj.timestamp = System.currentTimeMillis()
                commonObj.eventMetas.addAll(it.eventMetaList)
                commonObj.kafkaStreamValue1 = it.obj.entity
                commonObj
            }
            .returns(TypeInformation.of(object : TypeHint<CommonObj>() {}))
        return kafkaSource1

这个工会代码

        val timestampAssigner : SerializableTimestampAssigner<CommonObj> = object :
            SerializableTimestampAssigner<CommonObj> {
            override fun extractTimestamp(element: CommonObj, recordTimestamp: Long): Long {
                return element.timestamp
            }
        }

        kafkaStream1.union(kafkaStream2)
            .union(kafkaStream3)
            .union(kafkaStream4)
            .union(kafkaStream5)
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.forBoundedOutOfOrderness<CommonObj?>(Duration.ofSeconds(30))
                .withTimestampAssigner(timestampAssigner)
                .withIdleness(Duration.ofMinutes(1))
            )
            .keyBy(keySelector)
            .window(TumblingEventTimeWindows.of(Time.milliseconds(10000)))
            .process(EventProcessFunction(params))
            .sinkTo(kafkaSink())

EventProcessFunction 会将所有 kafka 源消息合并到 CommonObj。它将减少密钥和时间戳的重复。

我删除了一些敏感信息。

apache-flink flink-streaming
1个回答
0
投票

要回答有关水印的主要问题,您应该通过(较新的)

StreamExecutionEnvironment.fromSource()
方法对每个 Kafka 源应用不同的水印策略,然后组合流。

© www.soinside.com 2019 - 2024. All rights reserved.