I have a Kafka connector with a custom transformer plugin added, and it fails with the following error:
Error getting config definition from Transformation
Here is my connector configuration:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: "${connector_name}-s3-sink-connector"
  namespace: ${namespace}
  labels:
    strimzi.io/cluster: "${connect_cluster_name}"
spec:
  autoRestart:
    enabled: true
  class: io.confluent.connect.s3.S3SinkConnector
  tasksMax: ${num_tasks}
  config:
    "consumer.auto.offset.reset": "none"
    "flush.size": "5000"
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat"
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    "locale": "de_DE"
    "name": "${connector_name}-s3-sink-connector"
    "partition.duration.ms": "3600000"
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner"
    "path.format": "'partition_date'=YYYY-MM-dd/'hour'=HH"
    "rotate.interval.ms": "${rotate_interval_ms}"
    "s3.bucket.name": "${bucket_name}"
    "s3.compression.type": "gzip"
    "s3.region": "eu-central-1"
    "s3.compression.level": 1
    "s3.part.size": "5242880"
    "schemas.enable": "false"
    "storage.class": "io.confluent.connect.s3.storage.S3Storage"
    "timezone": "UTC"
    "topics": "events_v2"
    "topics.dir": "public/raw"
    "timestamp.extractor": "me.mine.kafka.connect.storage.partitioner.HeaderFieldTimestampExtractor"
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    "value.converter.schemas.enable": "false"
    "transforms": "offerListExtractHeaderTransformer"
    "transforms.offerListExtractHeaderTransformer.header.key.eventCreationTime": "offerListEventTime"
    "transforms.offerListExtractHeaderTransformer.type": "me.mine.kafka.connect.transforms.ExtractHeaderTransformer$Value"
    "errors.tolerance": "none"
    "errors.logs.enable": "true"
    "errors.log.include.messages": "true"
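I tried adding a backslash (\) to escape the $ in the transform class name, but that fails too, with "invalid escape character".
Any ideas what might be wrong?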