我正在创建一个端到端的 kafka 消息传递示例。 当我来到从生产者那里将用户数据填充到 kafka 主题的步骤时,并在使用以下设置我的模式注册表之后:
{
"type": "record",
"name": "ConnectDefault",
"namespace": "io.confluent.connect.avro",
"fields": [
{
"name": "id",
"type": "long"
},
{
"name": "email_address",
"type": "string"
},
{
"name": "first_name",
"type": "string"
},
{
"name": "last_name",
"type": "string"
}
]
}
我的制作人记录是:
Producer<String, User> producer = new KafkaProducer<String, User>(properties);
String topic = "inbox_products";
User user = new User();
user.setFirstName("fname");
user.setEmailAddress("email");
user.setLastName("lname");
ProducerRecord<String, User> producerRecord = new ProducerRecord<String,User>(topic,String.valueOf("key1"),user);
DB 中没有显示新记录。 但是我可以从 failed_records 主题中看到我试图发送到数据库的行是这样的:
{
"id": 0,
"firstName": "topic",
"lastName": "topic",
"emailAddress": "topic"
}
这意味着它正在向 db 发送一个空的主键,它最初应该在表 users 本身中处理。
这是我的接收器连接器配置:
{
"name": "postgres-sink-users",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://localhosr:5432/dbname",
"connection.user": "dd",
"connection.password": "dd",
"topics":"users",
"insert.mode":"insert",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schema.registry.url":"kafkaregistry:8085",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"kafkaregistry:8085",
"key.converter.schemas.enable":"true",
"value.converter.schemas.enable":"true",
"quote.sql.identifiers":"never",
"errors.tolerance":"all",
"errors.deadletterqueue.topic.name":"failed_records",
"errors.deadletterqueue.topic.replication.factor":"1",
"errors.log.enable":"true",
"pk.mode":"record_value",
"pk.fields": "id"
}
}
我应该从配置中删除“pk.mode”:“record_value”,因为自动增量是在数据库级别,还是从我的注册表模式中删除 id 字段?