如何从数据库中填充自动递增的主键字段而不是在 Kafka Producer 中显式填充它?

问题描述 投票:0回答:0

我正在创建一个端到端的 kafka 消息传递示例。 当我来到从生产者那里将用户数据填充到 kafka 主题的步骤时,并在使用以下设置我的模式注册表之后:

{
  "type": "record",
  "name": "ConnectDefault",
  "namespace": "io.confluent.connect.avro",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "email_address",
      "type": "string"
    },
    {
      "name": "first_name",
      "type": "string"
    },
    {
      "name": "last_name",
      "type": "string"
    }
  ]
}

我的制作人记录是:

  Producer<String, User> producer = new KafkaProducer<String, User>(properties);

        String topic = "inbox_products";
        User user = new User();
        user.setFirstName("fname");
                user.setEmailAddress("email");
        user.setLastName("lname");
        ProducerRecord<String, User> producerRecord = new ProducerRecord<String,User>(topic,String.valueOf("key1"),user);

DB 中没有显示新记录。 但是我可以从 failed_records 主题中看到我试图发送到数据库的行是这样的:

{
  "id": 0,
  "firstName": "topic",
  "lastName": "topic",
  "emailAddress": "topic"
}

这意味着它正在向 db 发送一个空的主键,它最初应该在表 users 本身中处理。

这是我的接收器连接器配置:

{
    "name": "postgres-sink-users",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://localhosr:5432/dbname",
        "connection.user": "dd",
        "connection.password": "dd",
        "topics":"users",
        "insert.mode":"insert",
        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schema.registry.url":"kafkaregistry:8085",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url":"kafkaregistry:8085",
        "key.converter.schemas.enable":"true",
        "value.converter.schemas.enable":"true",
        "quote.sql.identifiers":"never",
        "errors.tolerance":"all",
        "errors.deadletterqueue.topic.name":"failed_records",
        "errors.deadletterqueue.topic.replication.factor":"1",
        "errors.log.enable":"true",
        "pk.mode":"record_value",
        "pk.fields": "id"
    }
 }

我应该从配置中删除“pk.mode”:“record_value”,因为自动增量是在数据库级别,还是从我的注册表模式中删除 id 字段?

apache-kafka apache-kafka-connect kafka-producer-api confluent-schema-registry
© www.soinside.com 2019 - 2024. All rights reserved.