I am trying to start the Kafka JDBC sink connector https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc to save data from a topic to a PostgreSQL database. I am using the following configuration:
{
  "name": "postgres-balance-sink-connector-name",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "account-operations",
    "name": "postgres-balance-sink-connector-name",
    "connection.url": "jdbc:postgresql://db:5432/kafka_db",
    "connection.user": "postgres",
    "connection.password": "example",
    "insert.mode": "UPSERT",
    "auto.create": "true",
    "auto.evolve": "true",
    "tasks.max": "1"
  }
}
The following error is thrown while the connector is starting:
[2023-09-21 18:29:37,312] ERROR [postgres-balance-sink-connector-name|task-0] WorkerSinkTask{id=postgres-balance-sink-connector-name-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:210)
2023-09-21T18:29:37.313123031Z java.lang.NullPointerException: Cannot invoke "io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.create(org.apache.kafka.common.config.AbstractConfig)" because "bestMatch" is null
2023-09-21T18:29:37.313126905Z at io.confluent.connect.jdbc.dialect.DatabaseDialects.findBestFor(DatabaseDialects.java:134)
2023-09-21T18:29:37.313130191Z at io.confluent.connect.jdbc.sink.JdbcSinkTask.initWriter(JdbcSinkTask.java:68)
2023-09-21T18:29:37.313133312Z at io.confluent.connect.jdbc.sink.JdbcSinkTask.start(JdbcSinkTask.java:53)
2023-09-21T18:29:37.313136366Z at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:315)
2023-09-21T18:29:37.313139418Z at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
2023-09-21T18:29:37.313142452Z at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
2023-09-21T18:29:37.313145341Z at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
2023-09-21T18:29:37.313148405Z at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
2023-09-21T18:29:37.313151410Z at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-09-21T18:29:37.313154545Z at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
2023-09-21T18:29:37.313157629Z at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
2023-09-21T18:29:37.313160698Z at java.base/java.lang.Thread.run(Thread.java:833)
2023-09-21T18:29:37.313211156Z [2023-09-21 18:29:37,313] INFO [postgres-balance-sink-connector-name|task-0] Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:170)
2023-09-21T18:29:37.313924296Z [2023-09-21 18:29:37,313] WARN [postgres-balance-sink-connector-name|task-0] Could not stop task (org.apache.kafka.connect.runtime.WorkerSinkTask:175)
2023-09-21T18:29:37.313937766Z java.lang.NullPointerException: Cannot invoke "io.confluent.connect.jdbc.sink.JdbcDbWriter.closeQuietly()" because "this.writer" is null
2023-09-21T18:29:37.313941811Z at io.confluent.connect.jdbc.sink.JdbcSinkTask.stop(JdbcSinkTask.java:172)
2023-09-21T18:29:37.313958175Z at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:173)
2023-09-21T18:29:37.313961419Z at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:181)
2023-09-21T18:29:37.313964049Z at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:214)
2023-09-21T18:29:37.313966924Z at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
2023-09-21T18:29:37.313969524Z at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
2023-09-21T18:29:37.313972291Z at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
2023-09-21T18:29:37.313975315Z at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-09-21T18:29:37.313978394Z at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
2023-09-21T18:29:37.313981380Z at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
2023-09-21T18:29:37.313984474Z at java.base/java.lang.Thread.run(Thread.java:833)
I tried adding this to the configuration:
"dialect.name": "PostgreSqlDatabaseDialect",
but I got this error:
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value PostgreSqlDatabaseDialect for configuration dialect.name: Invalid enumerator\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}
This is interesting, because the documentation lists it as a valid value: https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html
Any hints or ideas are appreciated.
So the problem was that I was building a fat jar with the Maven Shade plugin like this:
<execution>
  <id>jdbc-connector-fat-jar-execution</id>
  <phase>package</phase>
  <goals>
    <goal>shade</goal>
  </goals>
  <configuration>
    <createDependencyReducedPom>false</createDependencyReducedPom>
    <filters>
      <filter>
        <artifact>io.confluent:kafka-connect-jdbc</artifact>
        <includes>
          <include>**/*</include>
        </includes>
      </filter>
      <!-- invalid signature file digest for Manifest main attributes exception -->
      <filter>
        <artifact>*</artifact>
        <excludes>
          <exclude>META-INF/**</exclude>
        </excludes>
      </filter>
    </filters>
    <finalName>jdbc-kafka-connect-fat-${kafka.connect.jdbc.version}</finalName>
    <outputDirectory>connector-jars</outputDirectory>
    <shadedArtifactAttached>false</shadedArtifactAttached>
  </configuration>
</execution>
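A similar configuration works for the MongoDB sink connector. So far, the only way I have managed to get it working is to manually download and copy the connector files from https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc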
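A possible explanation, which I have not verified against this build: the `META-INF/**` exclude also removes everything under `META-INF/services/`, and the JDBC connector appears to discover its dialect providers through `java.util.ServiceLoader`, which reads those files. That would be consistent with both the null `bestMatch` and the "Invalid enumerator" error, since no dialects would be registered at all. A minimal sketch of a narrower Shade configuration, assuming the signature files are the only reason for the exclude, could look like this (it replaces only the `<filters>` part; the remaining settings stay as above):

```xml
<configuration>
  <filters>
    <filter>
      <artifact>*:*</artifact>
      <excludes>
        <!-- Assumption: only the signature files cause the
             "invalid signature file digest" error, so exclude
             just those and keep the rest of META-INF. -->
        <exclude>META-INF/*.SF</exclude>
        <exclude>META-INF/*.DSA</exclude>
        <exclude>META-INF/*.RSA</exclude>
      </excludes>
    </filter>
  </filters>
  <transformers>
    <!-- Merges META-INF/services files from all shaded jars
         instead of letting one overwrite another. -->
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
  </transformers>
</configuration>
```

If this is the cause, the resulting fat jar should still contain entries under `META-INF/services/`, which can be checked by listing the jar contents after the build.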