Kafka Connect custom transform: overriding apply() seems to have no effect, but overriding newRecord() in the Key/Value classes works

Problem description

I am trying to create a custom transform (SMT) following the pattern used in these examples:

https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java

https://github.com/confluentinc/kafka-connect-insert-uuid/blob/master/src/main/java/com/github/cjmatta/kafka/connect/smt/InsertUuid.java

I have overridden the apply() method to append a short string to the payload:

    public R apply(R record) {
        log.info("Transformation apply has started...");
        String newValue = record.value().toString() + " this has been transformed" ;
        log.info("Message changed to : " + newValue);
        return newRecord(record, null, newValue);
    }

This has no effect at all: the output is unchanged, and I don't see the info log messages anywhere in the generated logs.

However, if I override the newRecord() method of the Value class, I can see the message being updated, and I also see the info message being logged:

        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
            log.info("Message Value being changed to :");
            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
                    updatedSchema, "This is updated : " + updatedValue.toString(), record.timestamp());
        }

This causes the output message to be updated as expected. Note that the payload in the message is a plain string without any particular structure.

I assumed this payload manipulation could also happen in the apply() method, but I don't see apply() being invoked at all during the transform run: the payload is unaffected and nothing is logged.

Am I missing something here? Is the apply() method being called or used incorrectly? Any guidance is appreciated.

Note: overriding the newRecord() method of the Key class also works.

Below is the complete source code of the transform:

package com.xxxxx.yyyyy.kafka.connect.transform;

import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.transforms.HoistField;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public abstract class SampleTransform<R extends ConnectRecord<R>> implements Transformation<R> {
    private static Logger log = LoggerFactory.getLogger(SampleTransform.class);
    @Override
    public void configure(Map<String, ?> map) {

    }

    @Override
    public R apply(R record) {
        log.info("Transformation apply has started...");        //This does not work
        String newValue = record.value().toString() + " this has been transformed" ;
        log.info("Message changed to : " + newValue);
        return newRecord(record, null, "Message is fully changed");     //Output does not change
    }

    protected abstract Schema operatingSchema(R record);

    protected abstract Object operatingValue(R record);

    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);

    public static class Key<R extends ConnectRecord<R>> extends HoistField<R> {
        @Override
        protected Schema operatingSchema(R record) {
            return record.keySchema();
        }

        @Override
        protected Object operatingValue(R record) {
            return record.key();
        }

        @Override
        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
            log.info("Key being changed to : " + updatedValue.toString());          //This is logged
            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
        }
    }

    public static class Value<R extends ConnectRecord<R>> extends HoistField<R> {
        @Override
        protected Schema operatingSchema(R record) {
            return record.valueSchema();
        }

        @Override
        protected Object operatingValue(R record) {
            return record.value();
        }

        @Override
        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
            log.info("Message Value being changed to :");               // This is logged
            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, "This is updated : " + updatedValue.toString(), record.timestamp());          //This works
        }
    }
}
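For completeness, an SMT like this is typically wired into the connector configuration along these lines (a sketch; the transform alias `sample` is an assumption, and the class path is taken from the package declared in the question's source):

```properties
transforms=sample
transforms.sample.type=com.xxxxx.yyyyy.kafka.connect.transform.SampleTransform$Value
```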
Tags: apache-kafka, transformation, apache-kafka-connect
1 Answer

The problem is that your Key and Value classes extend org.apache.kafka.connect.transforms.HoistField rather than your own Transformation, SampleTransform. Because of that, HoistField's apply() is the one on the call path, and your overridden SampleTransform.apply() is never invoked.
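To make the dispatch issue concrete, here is a minimal, self-contained sketch. These are plain-Java stand-ins, not the real Kafka Connect classes: `SampleTransform` and `HoistField` below only mimic the shape of the real hierarchy, where each base class defines its own apply() that calls newRecord().

```java
// Simplified stand-ins showing why SampleTransform.apply() never ran:
// the subclass determines which base class's apply() is on the call path.
public class DispatchDemo {

    // Stand-in for the question's SampleTransform base class.
    public abstract static class SampleTransform {
        public String apply(String record) {
            // The override the questioner expected to run.
            return newRecord(record + " this has been transformed");
        }
        public abstract String newRecord(String value);
    }

    // Stand-in for the library's HoistField, which has its own apply()
    // that knows nothing about SampleTransform's logic.
    public abstract static class HoistField {
        public String apply(String record) {
            return newRecord(record); // library path: no custom suffix added
        }
        public abstract String newRecord(String value);
    }

    // What the question's code effectively did: Value extends HoistField,
    // so HoistField.apply() runs and SampleTransform.apply() is dead code.
    public static class BrokenValue extends HoistField {
        @Override public String newRecord(String v) { return "This is updated : " + v; }
    }

    // The fix: extend SampleTransform so its apply() is inherited and used.
    public static class FixedValue extends SampleTransform {
        @Override public String newRecord(String v) { return "This is updated : " + v; }
    }

    public static void main(String[] args) {
        System.out.println(new BrokenValue().apply("msg")); // suffix missing
        System.out.println(new FixedValue().apply("msg"));  // suffix present
    }
}
```

In the real code the fix is the same one line per inner class: declare `Key<R ...> extends SampleTransform<R>` and `Value<R ...> extends SampleTransform<R>` (and drop the HoistField import), after which the overridden apply() and its log lines will run.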
