使用自定义单消息转换器(SMT)运行 Debezium 源连接器

问题描述 投票:0回答:0

我正在开发一个 SMT(单消息转换),并希望把它挂接到 Debezium MySQL 源连接器上。我参考文档实现了自定义的 Debezium SMT,把 SMT 和 MySQL 源连接器的 jar 文件放进 Kafka Connect 的 Docker 镜像中,然后设置了插件路径(plugin.path)。但在调用接口注册连接器时,出现以下错误。

{
  "error_code": 400,
  "message": "Connector configuration is invalid and contains the following 2 error(s):\nInvalid value com.github.gunbos.kafka.connect.smt.CustomTransformer for configuration transforms.router.type: Class com.github.gunbos.kafka.connect.smt.CustomTransformer could not be found.\nInvalid value null for configuration transforms.router.type: Not a Transformation\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
}

如果去掉 transforms 相关的选项,MySQL 源连接器可以正常运行。你能告诉我问题出在哪里吗?我已经为此苦苦挣扎了好几天了。

以下依次是我的 Dockerfile、docker-compose 配置和 MySQL 连接器的注册配置:

# Kafka Connect worker image: starts from the Confluent Connect base image and
# copies in the Debezium MySQL connector plus a custom single message transform.
FROM confluentinc/cp-kafka-connect-base:7.2.6

# Copy the Debezium MySQL connector files from the build context into the image.
COPY debezium-connector-mysql/ /opt/custom-connector/debezium-connector-mysql
# Copy the custom transformer files into a separate directory tree.
# NOTE(review): Kafka Connect presumably loads each plugin directory with its
# own isolated class loader; if the transformer depends on Debezium classes it
# may need to live in the same directory as the connector jars — confirm.
COPY transformer/ /opt/custom-transformer/custom-event-router
# Local single-broker stack: ZooKeeper, Kafka, Kafka UI, MySQL and a custom
# Kafka Connect worker built from the Dockerfile in this directory.
version: '2'
services:
  zookeeper:
    container_name: zookeeper
    image: confluentinc/cp-zookeeper:7.2.6
    environment:
      # Environment values are quoted so they are always passed as strings.
      ZOOKEEPER_CLIENT_PORT: "2181"
      ZOOKEEPER_TICK_TIME: "2000"
    ports:
      - "2181:2181"

  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka:7.2.6
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: "1"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      # Two listeners: kafka:29092 is advertised to the other containers,
      # localhost:9092 to clients running on the host.
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      # Single broker, so the offsets topic cannot be replicated.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"

  kafka_ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_ZOOKEEPER: "zookeeper:2181"
      KAFKA_CLUSTERS_0_NAME: "local"
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka:29092"

  mysql:
    image: "mysql:8.0.31"
    ports:
      - "3306:3306"
    # Database credentials and settings are read from .env (not shown here).
    env_file:
      - .env
    restart: always

  kafka_connect:
    container_name: custom-connect
    image: "my-custom-connect"
    build:
      # Explicit build context so the Dockerfile's COPY sources resolve
      # relative to this directory.
      context: .
      dockerfile: ./Dockerfile
    depends_on:
      - kafka
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
      CONNECT_REST_PORT: "8083"
      CONNECT_GROUP_ID: "outbox"
      CONNECT_CONFIG_STORAGE_TOPIC: "outbox-config"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_TOPIC: "outbox-offset"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_TOPIC: "outbox-status"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      # NOTE(review): the internal converter settings are presumably obsolete
      # in the Kafka version shipped with this image — verify before removing.
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      # Comma-separated plugin locations, written without whitespace around
      # the commas. /opt/custom-connector/ and /opt/custom-transformer/ are
      # the parent directories of the paths populated by the Dockerfile.
      CONNECT_PLUGIN_PATH: "/usr/share/java,/opt/custom-connector/,/opt/custom-transformer/"
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "cain",
    "database.password": "cain",
    "topic.prefix": "test",
    "database.server.id": "123456",
    "database.include.list": "kafka_connect",
    "table.include.list": "kafka_connect.outbox",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "1",
    "schema.history.internal.kafka.topic": "schemaHistory.outbox",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
    "transforms": "router",
    "transforms.router.type": "com.github.gunbos.kafka.connect.smt.CustomTransformer"
  }
}

如果您能指点一下,我将非常感激。这是我的 GitHub 仓库:https://github.com/gunb0s/kafka-test

apache-kafka-connect debezium outbox-pattern
© www.soinside.com 2019 - 2024. All rights reserved.