Kafka JDBC连接器中的自定义分区分配

问题描述 投票:3回答:1

我有一个用例,我需要编写一个自定义逻辑来根据消息中的某些关键参数分配分区。我做了一些研究,发现kafka转换支持覆盖Transformation接口中的一些方法但我无法在git hub或某处做一些示例代码。有人可以分享示例代码或git hub链接在kafka JDBC源连接器中进行自定义分区分配吗?

提前致谢!。

java apache-kafka apache-kafka-connect
1个回答
2
投票

Kafka Connect默认使用分配分区:DefaultPartitionerorg.apache.kafka.clients.producer.internals.DefaultPartitioner

如果您需要使用某个自定义覆盖默认值,则可以,但您必须记住,该覆盖适用于所有源连接器。要做到这一点,你必须设置producer.partitioner.class属性,前producer.partitioner.class=com.example.CustomPartitioner。此外,您必须使用分区程序将jar复制到使用Kafka Connect库的目录。

转型方式:

在转换中也可以设置分区,但这不是正确的方法。从Transformation,您无权访问主题元数据,这对于分配分区至关重要。

如果您想为记录设置分区,代码应如下所示:

public class AddPartition <R extends ConnectRecord<R>> implements Transformation<R> {

    public static final ConfigDef CONFIG_DEF = new ConfigDef();

    @Override
    public void configure(Map<String, ?> props) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    }

    @Override
    public R apply(R record) {
        return record.newRecord(record.topic(), calculatePartition(record), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
    }

    private Integer calculatePartition(R record) {
        // Partitions calcuation based on record information
        return 0;
    }

    @Override
    public void close() {
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.