汇编:错误无法运行查询表TimestampIncrementingTableQuerier mysql-jdbc

问题描述 投票:1回答:2

我正在尝试使用MySQL的模式时间戳,行数有限,因为我的表大小是2.6 GB。

以下是我正在使用的连接器属性:

{
        "name": "jdbc_source_mysql_registration_query",
        "config": {
                 "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                 "key.converter": "io.confluent.connect.avro.AvroConverter",
                 "key.converter.schema.registry.url": "http://localhost:8081",
                 "value.converter": "io.confluent.connect.avro.AvroConverter",
                 "value.converter.schema.registry.url": "http://localhost:8081",
                 "connection.url": "jdbc:mysql://localhost:3310/users?zeroDateTimeBehavior=ROUND&useCursorFetch=true&defaultFetchSize=1000&user=kotesh&password=kotesh",
                 "query": "SELECT matriid,DateUpdated  from users.employee WHERE date(DateUpdated)>='2018-11-28' ",
                 "mode": "timestamp",
                 "timestamp.column.name": "DateUpdated",
                 "validate.non.null": "false",
                 "topic.prefix": "mysql-prod-kot-"
        }
}

我得到如下:

INFO TimestampIncrementingTableQuerier {table = null,query ='SELECT matriid,DateUpdated from users.employee WHERE date(DateUpdated)> ='2018-11-28'',topicPrefix ='mysql-prod-kot-',incrementingColumn ='', timestampColumns = [DateUpdated]}编写的SQL查询:SELECT matriid,DateUpdated from users.employee WHERE date(DateUpdated)> ='2018-11-28'WHERE DateUpdated>?和DateUpdated <? ORDER BY DateUpdated ASC(io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:161)[2018-11-29 17:29:00,981]错误无法运行查询表TimestampIncrementingTableQuerier {table = null,query ='SELECT matriid, DateUspdated from users.employee WHERE date(DateUpdated)> ='2018-11-28'',topicPrefix ='mysql-prod-kot-',incrementingColumn ='',timestampColumns = [DateUpdated]}:{}(io.confluent .connect.jdbc.source.JdbcSourceTask:328)java.sql.SQLSyntaxErrorException:SQL语法中有错误;查看与您的MySQL服务器版本对应的手册,以便在'WHERE DateUpdated>'1970-01-01 00:00:00.0'和'DateUpdated <'2018-11-29 17'附近使用正确的语法

jdbc apache-kafka apache-kafka-connect confluent confluent-schema-registry
2个回答
1
投票

错误如下所示:

java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; 
check the manual that corresponds to your MySQL server version for the right syntax to use near 
'WHERE `DateUpdated` > '1970-01-01 00:00:00.0' AND `DateUpdated` < '2018-11-29 17' at line 1

这是因为您正在使用query而且还使用"mode": "timestamp",因此当您在查询中指定了一个时,连接器会尝试附加它自己的WHERE子句,这会导致无效的SQL

每个docs用于JDBC源连接器:

为了正确构造增量查询,必须可以在此查询中附加WHERE子句(即,不能使用WHERE子句)。如果使用WHERE子句,则必须自己处理增量查询。


0
投票

发生这种情况是因为你试图同时使用"mode": "timestamp"queryTimestampIncrementingTableQuerierWHERE子句附加到与WHERE中现有query子句冲突的查询中。

JDBC source connector docs对此很清楚:

query

如果已指定,则执行查询以选择新行或更新行。如果要连接表,选择表中的列子集或过滤数据,请使用此设置。如果使用此连接器,则仅使用此查询复制数据 - 将禁用整个表复制。不同的查询模式仍可用于增量更新,但为了正确构造增量查询,必须可以在此查询中附加WHERE子句(即,不能使用WHERE子句)。如果使用WHERE子句,则必须自己处理增量查询。

作为一种变通方法,您可以将查询修改为(取决于您使用的SQL风格)

SELECT * FROM ( SELECT * FROM table WHERE ...)

要么

WITH a AS
   SELECT * FROM b
    WHERE ...
SELECT * FROM a

例如,在您的情况下,查询应该是

"query":"SELECT * FROM (SELECT matriid,DateUpdated  from users.employee WHERE date(DateUpdated)>='2018-11-28') o"
© www.soinside.com 2019 - 2024. All rights reserved.