I am using this debezium-examples.
I added "topics.regex": "CID1122.(.*)" to my jdbc-sink.json, as follows:
{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics.regex": "CID1122.(.*)",
    "connection.url": "jdbc:mysql://mysql:3306/inventory?verifyServerCertificate=false",
    "connection.user": "root",
    "connection.password": "debezium",
    "auto.create": "true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "name": "jdbc-sink",
    "insert.mode": "upsert",
    "pk.fields": "id,companyId",
    "pk.mode": "record_value"
  }
}
The Kafka topic list is:
CID1122.department
CID1122.designation
CID1122.employee
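For reference, the topics.regex pattern above does match all three topics. A quick sketch in Python (Python's re standing in for Java's java.util.regex; Kafka Connect anchors the pattern, so a full match is required):

```python
import re

# Same pattern as the connector's topics.regex
pattern = re.compile(r"CID1122.(.*)")

topics = ["CID1122.department", "CID1122.designation", "CID1122.employee"]

# fullmatch mimics Connect's anchored matching of topics.regex
matched = [t for t in topics if pattern.fullmatch(t)]
print(matched)  # all three topics match
```

So topic subscription itself is not the problem here.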
I am getting a java.lang.NullPointerException from Kafka Connect:
connect_1 | 2019-01-30 06:14:47,302 INFO || Checking MySql dialect for existence of table "CID1122"."employee" [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect_1 | 2019-01-30 06:14:47,303 INFO || Using MySql dialect table "CID1122"."employee" absent [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect_1 | 2019-01-30 06:14:47,342 INFO || Checking MySql dialect for existence of table "CID1122"."employee" [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect_1 | 2019-01-30 06:14:47,343 INFO || Using MySql dialect table "CID1122"."employee" absent [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect_1 | 2019-01-30 06:14:47,344 ERROR || WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. [org.apache.kafka.connect.runtime.WorkerSinkTask]
connect_1 | java.lang.NullPointerException
connect_1 | at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:124)
connect_1 | at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:75)
connect_1 | at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:86)
connect_1 | at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:63)
connect_1 | at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
connect_1 | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
connect_1 | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
connect_1 | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1 | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect_1 | at java.lang.Thread.run(Thread.java:748)
connect_1 | 2019-01-30 06:14:47,345 ERROR || WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask]
connect_1 | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
connect_1 | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
connect_1 | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
connect_1 | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1 | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect_1 | at java.lang.Thread.run(Thread.java:748)
connect_1 | Caused by: java.lang.NullPointerException
connect_1 | at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:124)
connect_1 | at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:75)
connect_1 | at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:86)
connect_1 | at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:63)
connect_1 | at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
connect_1 | ... 10 more
connect_1 | 2019-01-30 06:14:47,345 ERROR || WorkerSinkTask{id=jdbc-sink-0} Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
Any workaround?
You are missing the table.name.format property: https://docs.confluent.io/3.1.1/connect/connect-jdbc/docs/sink_config_options.html (Data Mapping section).
Here is a working example:
{
  "name": "test-0005",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "topics.regex": "CID1122.(.*)",
  "connection.user": "kafka",
  "table.name.format": "${topic}",
  "connection.password": "kafka",
  "connection.url": "jdbc:mysql://databasehost:3306/dbname",
  "auto.create": "true",
  "transforms": "route",
  "transforms.route.replacement": "$2",
  "transforms.route.regex": "([^.]+)\\.([^.]+)",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter"
}
What was wrong?
As you can see, I added a RegexRouter transformation to dynamically extract the topic name before sinking into MySQL. The pattern I used, ([^.]+)\.([^.]+), matches our topics.regex topics of the form CID1122.[event-name], and I extract only group 2 (the event name).
Finally, this $2 group is passed to table.name.format as ${topic}; you can then connect to your database and check your data.
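To see what the router does to each topic name, here is an illustrative Python sketch (Python's re stands in for Java's java.util.regex, and \2 plays the role of Connect's $2 replacement):

```python
import re

# Same regex as transforms.route.regex in the connector config
route = re.compile(r"([^.]+)\.([^.]+)")

# Connect's "$2" replacement corresponds to "\2" in Python regex syntax
replacement = r"\2"

for topic in ["CID1122.department", "CID1122.designation", "CID1122.employee"]:
    # RegexRouter rewrites the topic only when the whole name matches
    table = route.fullmatch(topic).expand(replacement)
    print(f"{topic} -> {table}")
```

Each CID1122.* topic is routed to a bare table name (department, designation, employee), which is then substituted into table.name.format via ${topic}.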