我试图在sqlite中建立一个带有两个表的数据库。我的表中曾经有一个timestamp列。我正在尝试实现时间戳模式以捕获数据库中的增量更改。 Kafka连接失败,出现以下错误:
ERROR Failed to get current time from DB using Sqlite and query 'SELECT
CURRENT_TIMESTAMP'
(io.confluent.connect.jdbc.dialect.SqliteDatabaseDialect:471)
java.sql.SQLException: Error parsing time stamp
Caused by: java.text.ParseException: Unparseable date: "2019-02-05 02:05:29"
does not match (\p{Nd}++)\Q-\E(\p{Nd}++)\Q-\E(\p{Nd}++)\Q
\E(\p{Nd}++)\Q:\E(\p{Nd}++)\Q:\E(\p{Nd}++)\Q.\E(\p{Nd}++)
非常感谢您的帮助
配置:
name=test-query-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlite:employee.db
query=SELECT users.id, users.name, transactions.timestamp, transactions.payment_type FROM users JOIN transactions ON (users.id = transactions.user_id)
mode=timestamp
timestamp.column.name=timestamp
topic.prefix=test-joined
DDL:
CREATE TABLE transactions(id integer primary key not null,
payment_type text not null,
timestamp DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
user_id int not null,
constraint fk foreign key(user_id) references users(id)
);
CREATE TABLE users (id integer primary key not null,name text not null);
如果'timestamp'列的值采用'UNIX timestamp'的格式,则kafka connect jdbc连接器可以轻松检测到时间戳的变化。
sqlite> CREATE TABLE transact(timestamp TIMESTAMP DEFAULT (STRFTIME('%s', 'now')) not null,
...> id integer primary key not null,
...> payment_type text not null);
sqlite>
这些值可以插入为:
sqlite> INSERT INTO transact(timestamp,payment_type,id) VALUES (STRFTIME('%s', 'now'),'cash',1);
然后,与时间戳相关的更改由kafka jdbc源连接器检测到,可以按如下方式使用:
kafka-console-consumer --bootstrap-server localhost:9092 --topic jdbc-transact --from-beginning
{"timestamp":1562321516,"id":2,"payment_type":"card"}
{"timestamp":1562321790,"id":1,"payment_type":"online"}
我已经复制了此内容,并且已经将它记录为JDBC Source连接器的问题。您可以在此处监视它:https://github.com/confluentinc/kafka-connect-jdbc/issues/219