如何将数据发送到 Kafka,其中特定于某个键的数据仅在 Flink 流作业中使用 KafkaSink 发送到同一分区?

问题描述 投票:0回答:1

我有一个要求,只有当数据具有相同的密钥时,我才希望将数据发送到同一分区。 例如:

{“字段1”:33,“字段2”:44,“字段3”:55,“唯一字段”:1}

{“字段1”:34,“字段2”:45,“字段3”:56,“唯一字段”:1}

{“字段1”:35,“字段2”:46,“字段3”:57,“唯一字段”:1}

{“字段1”:33,“字段2”:44,“字段3”:55,“唯一字段”:2}

{“字段1”:33,“字段2”:44,“字段3”:55,“唯一字段”:2}

{“字段1”:33,“字段2”:44,“字段3”:55,“唯一字段”:3}

所以,我想要 uniqueField:1 的数据转到同一分区,uniqueField:2 的数据转到同一分区,uniqueField:3 的数据转到同一分区。

因此,最终如果 uniqueField 相同,则数据的分区也必须相同,这意味着相同的 uniqueField 数据必须是相同的分区。

到目前为止,我正在使用 KafkaSink 向 Kafka 发送数据,如下所示:


def getFlinkKafkaSink(jobConf: JobConfig, kafkaInfo: KafkaInfo): KafkaSink[String] = {

val properties: Properties = getProperties()
KafkaSink
      .builder[String]
      .setBootstrapServers(kafkaInfo.bootstrapServers)
      .setRecordSerializer(
        KafkaRecordSerializationSchema
          .builder()
          .setTopic(kafkaInfo.topics)
          .setValueSerializationSchema(new SimpleStringSchema())
          .build()
      )
      .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
      .setKafkaProducerConfig(properties)
      .build()
}

main方法正在调用:


 dataStream
          .sinkTo(FlinkExecutionEnviron.getFlinkKafkaSink(jobConf, kafkaInfo))
          .setParallelism(jobConf.FlinkProperties.producerParallelism)
          .name("Sink to Kafka Topic")

dataStream 基本上是 DataStream[String]

从最初的试验中,我知道我需要以某种方式实现 KafkaRecordSerializationSchema 但无法实现。我需要在 Scala 中执行此操作。

apache-kafka apache-flink flink-streaming
1个回答
0
投票

如果只是想让同一个

uniqueField
字段的数据发送到同一个kafka分区,只需要使用
keyBy
函数对sink之前的
uniqueField
进行分区即可。

如果你还有其他需求,或者使用keyBy函数不方便,可以尝试使用

setPartitioner
中的
KafkaRecordSerializationSchema
方法。该方法接收一个FlinkKafkaPartitioner,可以实现kafka消息的自定义分区。

//assume your data type is Map[String, Int]
.setPartitioner(new FlinkKafkaPartitioner[Map[String, Int]]() {
  override def partition(record: Map[String, Int], key: Array[Byte], value: Array[Byte], targetTopic: String, partitions: Array[Int]): Int = {
    //This returns the partition number. This is a simple example. Suppose your topic has 30 partitions.
    record("uniqueField") % 30
  }
})
最新问题
© www.soinside.com 2019 - 2024. All rights reserved.