正在开发将 smt(单消息转换)附加到 debezium mysql 源连接器。我参考文档实现了debeziumsmt,将smt和mysql源连接器jar文件放入kafka connect的docker镜像中,然后设置插件路径。调用连接器注册时,出现以下错误。
{
"error_code": 400,
"message": "Connector configuration is invalid and contains the following 2 error(s):\nInvalid value com.github.gunbos.kafka.connect.smt.CustomTransformer for configuration transforms.router.type: Class com.github.gunbos.kafka.connect.smt.CustomTransformer could not be found.\nInvalid value null for configuration transforms.router.type: Not a Transformation\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
}
排除转换选项,mysql源连接器运行良好。你能告诉我有什么问题吗?我已经为此苦苦挣扎了好几天了。
这是我的 docker 和 sql 连接器选项
FROM confluentinc/cp-kafka-connect-base:7.2.6
COPY debezium-connector-mysql/ /opt/custom-connector/debezium-connector-mysql
COPY transformer/ /opt/custom-transformer/custom-event-router
version: '2'
services:
zookeeper:
container_name: zookeeper
image: confluentinc/cp-zookeeper:7.2.6
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
container_name: kafka
image: confluentinc/cp-kafka:7.2.6
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
kafka_ui:
image: provectuslabs/kafka-ui:latest
depends_on:
- kafka
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
mysql:
image: "mysql:8.0.31"
ports:
- "3306:3306"
env_file:
- .env
restart: always
kafka_connect:
container_name: custom-connect
image: "my-custom-connect"
build:
dockerfile: ./Dockerfile
depends_on:
- kafka
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
CONNECT_REST_PORT: "8083"
CONNECT_GROUP_ID: "outbox"
CONNECT_CONFIG_STORAGE_TOPIC: "outbox-config"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_TOPIC: "outbox-offset"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: "outbox-status"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
CONNECT_PLUGIN_PATH: "/usr/share/java, /opt/custom-connector/, /opt/custom-transformer/"
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "cain",
"database.password": "cain",
"topic.prefix": "test",
"database.server.id": "123456",
"database.include.list": "kafka_connect",
"table.include.list": "kafka_connect.outbox",
"topic.creation.default.replication.factor" : 1,
"topic.creation.default.partitions" : 1,
"schema.history.internal.kafka.topic": "schemaHistory.outbox",
"schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
"transforms": "router",
"transforms.router.type": "com.github.gunbos.kafka.connect.smt.CustomTransformer"
}
}
如果您让我知道,我将非常感激。 这是我的 github https://github.com/gunb0s/kafka-test