Kafka:无法使用 MySQL 连接器 Kafka 中的 where 子句运行查询

问题描述 投票:0回答:2

JSON 文件:

{
  "name": "mysql-jdbc",
  "config": {
    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url"  : "jdbc:mysql://mysqldb:3306/silicon",
    "connection.user" : "root",
    "connection.password" : "root",
    "mode"            : "incrementing",
    "incrementing.column.name": "id",
    "query": "select * from (select * from credit_lines where id=2) test;",
    "topic.prefix"    : "JDBC.test_db_test",
    "validate.non.null"       : "false",
    "poll.interval.ms"        : "1000"
  }
}

在日志中查询:

connect  |  (org.apache.kafka.connect.runtime.tracing.TracerConfig)
connect  | [2023-03-01 19:42:30,132] INFO Initializing: org.apache.kafka.connect.runtime.TransformationChain{} (org.apache.kafka.connect.runtime.Worker)
connect  | [2023-03-01 19:42:30,133] INFO [Worker clientId=connect-1, groupId=jdbc_source_connector] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
connect  | [2023-03-01 19:42:30,133] INFO Starting JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask)
connect  | [2023-03-01 19:42:30,133] INFO JdbcSourceTaskConfig values: 
connect  |      batch.max.rows = 100
connect  |      catalog.pattern = null
connect  |      connection.attempts = 3
connect  |      connection.backoff.ms = 10000
connect  |      connection.password = [hidden]
connect  |      connection.url = jdbc:mysql://mysqldb:3306/silicon
connect  |      connection.user = root
connect  |      db.timezone = UTC
connect  |      dialect.name = 
connect  |      incrementing.column.name = id
connect  |      mode = incrementing
connect  |      numeric.mapping = null
connect  |      numeric.precision.mapping = false
connect  |      poll.interval.ms = 1000
connect  |      query = select * from (select * from credit_lines where id=2) test;
connect  |      query.retry.attempts = -1
connect  |      query.suffix = 
connect  |      quote.sql.identifiers = ALWAYS
connect  |      schema.pattern = null
connect  |      table.blacklist = []
connect  |      table.monitoring.startup.polling.limit.ms = 10000
connect  |      table.poll.interval.ms = 60000
connect  |      table.types = [TABLE]
connect  |      table.whitelist = []
connect  |      tables = []
connect  |      timestamp.column.name = []
connect  |      timestamp.delay.interval.ms = 0
connect  |      timestamp.granularity = connect_logical
connect  |      timestamp.initial = null
connect  |      topic.prefix = JDBC.test_db_test
connect  |      transaction.isolation.mode = DEFAULT
connect  |      validate.non.null = false
connect  |  (io.confluent.connect.jdbc.source.JdbcSourceTaskConfig)

我得到的错误:

 ERROR WorkerSourceTask{id=mysql-jdbc-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
connect  | org.apache.kafka.connect.errors.ConnectException: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'WHERE `id` > -1 ORDER BY `id` ASC' at line 1
connect  |      at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:452)
connect  |      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:470)
connect  |      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:349)
connect  |      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
connect  |      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
connect  |      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
connect  |      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect  |      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect  |      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect  |      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect  |      at java.base/java.lang.Thread.run(Thread.java:829)
connect  | Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'WHERE `id` > -1 ORDER BY `id` ASC' at line 1
java apache-kafka apache-kafka-connect
2个回答
1
投票

您只是在搜索

id=2
,所以
mode = incrementing
对于
query
没有真正意义,因为它总是将
ORDER BY ID ASC
附加到查询中。

如果您总是想搜索

id = 2
,并不断更新该行,那么您可能需要
mode = bulk

或者,至少,你不能用分号结束查询,因为

WHERE id > -1 ORDER BY id ASC
是用递增模式添加的。


0
投票

从查询中删除了分号。之后,它开始工作

工作代码:

{
  "name": "mysql-jdbc",
  "config": {
    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url"  : "jdbc:mysql://mysqldb:3306/silicon",
    "connection.user" : "root",
    "connection.password" : "root",
    "mode"            : "incrementing",
    "incrementing.column.name": "id",
    "query": "select * from (select * from credit_lines where id=2) test",
    "topic.prefix"    : "JDBC.test_db_test",
    "validate.non.null"       : "false",
    "poll.interval.ms"        : "1000"
  }
}
© www.soinside.com 2019 - 2024. All rights reserved.