我有一个要求,只有当数据具有相同的密钥时,我才希望将数据发送到同一分区。 例如:
{“字段1”:33,“字段2”:44,“字段3”:55,“唯一字段”:1}
{“字段1”:34,“字段2”:45,“字段3”:56,“唯一字段”:1}
{“字段1”:35,“字段2”:46,“字段3”:57,“唯一字段”:1}
{“字段1”:33,“字段2”:44,“字段3”:55,“唯一字段”:2}
{“字段1”:33,“字段2”:44,“字段3”:55,“唯一字段”:2}
{“字段1”:33,“字段2”:44,“字段3”:55,“唯一字段”:3}
所以,我想要 uniqueField:1 的数据转到同一分区,uniqueField:2 的数据转到同一分区,uniqueField:3 的数据转到同一分区。
因此,最终如果 uniqueField 相同,则数据的分区也必须相同,这意味着相同的 uniqueField 数据必须是相同的分区。
到目前为止,我正在使用 KafkaSink 向 Kafka 发送数据,如下所示:
def getFlinkKafkaSink(jobConf: JobConfig, kafkaInfo: KafkaInfo): KafkaSink[String] = {
val properties: Properties = getProperties()
KafkaSink
.builder[String]
.setBootstrapServers(kafkaInfo.bootstrapServers)
.setRecordSerializer(
KafkaRecordSerializationSchema
.builder()
.setTopic(kafkaInfo.topics)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setKafkaProducerConfig(properties)
.build()
}
main方法正在调用:
dataStream
.sinkTo(FlinkExecutionEnviron.getFlinkKafkaSink(jobConf, kafkaInfo))
.setParallelism(jobConf.FlinkProperties.producerParallelism)
.name("Sink to Kafka Topic")
dataStream 基本上是 DataStream[String]
从最初的试验中,我知道我需要以某种方式实现 KafkaRecordSerializationSchema 但无法实现。我需要在 Scala 中执行此操作。
如果只是想让同一个
uniqueField
字段的数据发送到同一个kafka分区,只需要使用keyBy
函数对sink之前的uniqueField
进行分区即可。
如果你还有其他需求,或者使用keyBy函数不方便,可以尝试使用
setPartitioner
中的KafkaRecordSerializationSchema
方法。该方法接收一个FlinkKafkaPartitioner,可以实现kafka消息的自定义分区。
//assume your data type is Map[String, Int]
.setPartitioner(new FlinkKafkaPartitioner[Map[String, Int]]() {
override def partition(record: Map[String, Int], key: Array[Byte], value: Array[Byte], targetTopic: String, partitions: Array[Int]): Int = {
//This returns the partition number. This is a simple example. Suppose your topic has 30 partitions.
record("uniqueField") % 30
}
})