Kafka JDBC connector fails to start because of a dialect detection error

I am trying to start the Kafka JDBC sink connector (https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc) to save data from a topic to a PostgreSQL database. I am using the following configuration:

{
  "name": "postgres-balance-sink-connector-name",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "account-operations",
    "name": "postgres-balance-sink-connector-name",
    "connection.url": "jdbc:postgresql://db:5432/kafka_db",
    "connection.user": "postgres",
    "connection.password": "example",
    "insert.mode": "UPSERT",
    "auto.create": "true",
    "auto.evolve": "true",
    "tasks.max": "1"
  }
}

The following error is thrown while the connector starts:

[2023-09-21 18:29:37,312] ERROR [postgres-balance-sink-connector-name|task-0] WorkerSinkTask{id=postgres-balance-sink-connector-name-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:210)
2023-09-21T18:29:37.313123031Z java.lang.NullPointerException: Cannot invoke "io.confluent.connect.jdbc.dialect.DatabaseDialectProvider.create(org.apache.kafka.common.config.AbstractConfig)" because "bestMatch" is null
2023-09-21T18:29:37.313126905Z  at io.confluent.connect.jdbc.dialect.DatabaseDialects.findBestFor(DatabaseDialects.java:134)
2023-09-21T18:29:37.313130191Z  at io.confluent.connect.jdbc.sink.JdbcSinkTask.initWriter(JdbcSinkTask.java:68)
2023-09-21T18:29:37.313133312Z  at io.confluent.connect.jdbc.sink.JdbcSinkTask.start(JdbcSinkTask.java:53)
2023-09-21T18:29:37.313136366Z  at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:315)
2023-09-21T18:29:37.313139418Z  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
2023-09-21T18:29:37.313142452Z  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
2023-09-21T18:29:37.313145341Z  at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
2023-09-21T18:29:37.313148405Z  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
2023-09-21T18:29:37.313151410Z  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-09-21T18:29:37.313154545Z  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
2023-09-21T18:29:37.313157629Z  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
2023-09-21T18:29:37.313160698Z  at java.base/java.lang.Thread.run(Thread.java:833)
2023-09-21T18:29:37.313211156Z [2023-09-21 18:29:37,313] INFO [postgres-balance-sink-connector-name|task-0] Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:170)
2023-09-21T18:29:37.313924296Z [2023-09-21 18:29:37,313] WARN [postgres-balance-sink-connector-name|task-0] Could not stop task (org.apache.kafka.connect.runtime.WorkerSinkTask:175)
2023-09-21T18:29:37.313937766Z java.lang.NullPointerException: Cannot invoke "io.confluent.connect.jdbc.sink.JdbcDbWriter.closeQuietly()" because "this.writer" is null
2023-09-21T18:29:37.313941811Z  at io.confluent.connect.jdbc.sink.JdbcSinkTask.stop(JdbcSinkTask.java:172)
2023-09-21T18:29:37.313958175Z  at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:173)
2023-09-21T18:29:37.313961419Z  at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:181)
2023-09-21T18:29:37.313964049Z  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:214)
2023-09-21T18:29:37.313966924Z  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
2023-09-21T18:29:37.313969524Z  at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
2023-09-21T18:29:37.313972291Z  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
2023-09-21T18:29:37.313975315Z  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-09-21T18:29:37.313978394Z  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
2023-09-21T18:29:37.313981380Z  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
2023-09-21T18:29:37.313984474Z  at java.base/java.lang.Thread.run(Thread.java:833)

I tried adding this to the configuration:

"dialect.name": "PostgreSqlDatabaseDialect",

but then I got this error:

{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value PostgreSqlDatabaseDialect for configuration dialect.name: Invalid enumerator\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}

This is interesting, because the documentation lists it as a valid value: https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html

Any hints or ideas are appreciated.

postgresql apache-kafka apache-kafka-connect
1 Answer

So the problem was that I was building a fat jar with the Maven Shade plugin like this:

<execution>
<id>jdbc-connector-fat-jar-execution</id>
<phase>package</phase>
<goals>
    <goal>shade</goal>
</goals>
<configuration>
    <createDependencyReducedPom>false</createDependencyReducedPom>
    <filters>
        <filter>
            <artifact>io.confluent:kafka-connect-jdbc</artifact>
            <includes>
                <include>**/*</include>
            </includes>
        </filter>
        <!-- invalid signature file digest for Manifest main attributes exception -->
        <filter>
            <artifact>*</artifact>
            <excludes>
                <exclude>META-INF/**</exclude>
            </excludes>
        </filter>
    </filters>
    <finalName>jdbc-kafka-connect-fat-${kafka.connect.jdbc.version}</finalName>
    <outputDirectory>connector-jars</outputDirectory>
    <shadedArtifactAttached>false</shadedArtifactAttached>
</configuration>
</execution>

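A similar configuration works for the MongoDB sink connector. For now, the only way I have managed to make it work is to manually download the connector files from https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc and copy them into place.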
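
A likely explanation, not verified against this exact build: kafka-connect-jdbc appears to discover its dialects through java.util.ServiceLoader, which reads the provider list from META-INF/services/io.confluent.connect.jdbc.dialect.DatabaseDialectProvider. The META-INF/** exclude strips that file from the fat jar, so no dialect gets registered. That would account for both the null "bestMatch" and the "Invalid enumerator" error for dialect.name. Assuming the exclude was only there to avoid the signature digest error, a narrower Shade configuration could drop just the signature files and merge the service files. This is a sketch of the changed parts only, and the other settings from the execution above (finalName, outputDirectory, and so on) would stay as they are:

```xml
<configuration>
    <createDependencyReducedPom>false</createDependencyReducedPom>
    <filters>
        <!-- Assumption: only the signature files cause the
             "invalid signature file digest" error, so exclude
             those and keep the rest of META-INF. -->
        <filter>
            <artifact>*:*</artifact>
            <excludes>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
            </excludes>
        </filter>
    </filters>
    <transformers>
        <!-- Merges META-INF/services files from all dependencies
             so ServiceLoader can still find the providers. -->
        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    </transformers>
</configuration>
```

After rebuilding, listing the contents of the fat jar should show the DatabaseDialectProvider entry under META-INF/services/. If it is missing, the dialect lookup will most likely fail in the same way.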
