我有一个由 Strimzi 和 Apicurio Registry 部署的 Kafka 集群,用于 Kafka 模式注册表。
我希望在 JDBC sink 连接器中使用
AvroConverter
将数据从 Kafka 接收到 TimescaleDB。
这是我的 Kafka Connect Dockerfile:
# --- Builder stage: download connector plugins into /opt/kafka/plugins/ ---
# (wget and unzip are provided by Alpine's busybox, so no apk install is needed)
FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
# jdbc-connector-for-apache-kafka
# https://github.com/aiven/jdbc-connector-for-apache-kafka
# Download the Aiven JDBC connector, unpack it into the plugin path,
# and remove the archive in the same layer so it never bloats the image.
&& wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
&& unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
&& rm -f jdbc-connector-for-apache-kafka.zip
USER 1001
# --- Runtime stage: Strimzi Kafka image with the plugins copied in ---
FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
# Drop back to the non-root UID the Strimzi image expects at runtime.
USER 1001
我的 KafkaConnect 资源:
# Strimzi KafkaConnect cluster; indentation restored (the pasted snippet was
# flattened, which is invalid YAML for nested Kubernetes resources).
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: hm-kafka-iot-kafka-connect
  namespace: hm-kafka
  annotations:
    # Lets KafkaConnector custom resources manage connectors on this cluster.
    strimzi.io/use-connector-resources: "true"
spec:
  image: ghcr.io/hongbo-miao/hm-kafka-iot-kafka-connect:latest
  replicas: 1
  bootstrapServers: hm-kafka-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: hm-kafka-cluster-ca-cert
        certificate: ca.crt
  config:
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    # FileConfigProvider resolves ${file:...} placeholders in connector configs.
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    # Mounts the DB credentials secret under /opt/kafka/external-configuration/.
    volumes:
      - name: hm-iot-db-credentials-volume
        secret:
          secretName: hm-iot-db-credentials
我的 JDBC 接收器连接器:
# Strimzi KafkaConnector for the Aiven JDBC sink; indentation restored
# (the pasted snippet was flattened, which is invalid YAML).
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: hm-motor-jdbc-sink-kafka-connector
  namespace: hm-kafka
  labels:
    # Binds this connector to the KafkaConnect cluster above.
    strimzi.io/cluster: hm-kafka-iot-kafka-connect
spec:
  class: io.aiven.connect.jdbc.JdbcSinkConnector
  tasksMax: 32
  config:
    # NOTE(review): connector.class and tasks.max are already set by
    # spec.class/spec.tasksMax; Strimzi ignores duplicates in config.
    connector.class: io.aiven.connect.jdbc.JdbcSinkConnector
    tasks.max: 32
    topics: hm.motor
    connection.url: jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db
    # Credentials resolved at runtime by FileConfigProvider from the mounted secret.
    connection.user: "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}"
    connection.password: "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}"
    insert.mode: multi
    batch.size: 100000
    # table
    table.name.format: motor
    # timestamp
    transforms: convertTimestamp
    transforms.convertTimestamp.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
    transforms.convertTimestamp.field: timestamp
    transforms.convertTimestamp.target.type: Timestamp
    # value
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url: http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2
    value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
    value.converter.apicurio.registry.as-confluent: true
(注意与
apicurio.registry
相关的配置很可能也有问题。)
但是,我遇到了这个错误(我们称之为错误 1):
2023-05-01 07:23:23,849 ERROR [hm-motor-jdbc-sink-kafka-connector|worker] [Worker clientId=connect-1, groupId=connect-cluster] Failed to start connector 'hm-motor-jdbc-sink-kafka-connector' (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [StartAndStopExecutor-connect-1-1]
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: hm-motor-jdbc-sink-kafka-connector
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$34(DistributedHerder.java:1800)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:320)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1821)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getConnectorStartingCallable$36(DistributedHerder.java:1827)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value io.apicurio.registry.utils.converter.AvroConverter for configuration value.converter: Class io.apicurio.registry.utils.converter.AvroConverter could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:744)
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:490)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:113)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:133)
at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:232)
at org.apache.kafka.connect.runtime.SinkConnectorConfig.<init>(SinkConnectorConfig.java:85)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:299)
... 6 more
根据错误,我在我的 Kafka Connect Dockerfile 中添加了 apicurio-registry-utils-converter:
# --- Builder stage: second attempt, adds the Apicurio converter jar ---
FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
# jdbc-connector-for-apache-kafka
# https://github.com/aiven/jdbc-connector-for-apache-kafka
&& wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
&& unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
&& rm -f jdbc-connector-for-apache-kafka.zip \
# apicurio-registry-utils-converter
# https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-utils-converter
# Single jar only — it lacks transitive dependencies, which is what later
# triggers the NoClassDefFoundError for AvroKafkaSerializer (error 2).
&& wget --no-verbose https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-utils-converter/2.4.2.Final/apicurio-registry-utils-converter-2.4.2.Final.jar \
&& mkdir -p /opt/kafka/plugins/apicurio-registry-utils-converter/ \
&& mv apicurio-registry-utils-converter-2.4.2.Final.jar /opt/kafka/plugins/apicurio-registry-utils-converter/
USER 1001
# --- Runtime stage ---
FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001
现在可以成功定位
io.apicurio.registry.utils.converter.AvroConverter
,但是我有一个新的错误。 (我们称之为错误 2)
2023-05-01 06:58:11,129 INFO [hm-motor-jdbc-sink-kafka-connector|task-0] TaskConfig values:
task.class = class io.aiven.connect.jdbc.sink.JdbcSinkTask
(org.apache.kafka.connect.runtime.TaskConfig) [StartAndStopExecutor-connect-1-5]
2023-05-01 06:58:11,129 INFO [hm-motor-jdbc-sink-kafka-connector|task-0] Instantiated task hm-motor-jdbc-sink-kafka-connector-0 with version null of type io.aiven.connect.jdbc.sink.JdbcSinkTask (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-5]
2023-05-01 06:58:11,129 ERROR [hm-motor-jdbc-sink-kafka-connector|task-0] Failed to start task hm-motor-jdbc-sink-kafka-connector-0 (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-5]
java.lang.NoClassDefFoundError: io/apicurio/registry/serde/avro/AvroKafkaSerializer
at io.apicurio.registry.utils.converter.AvroConverter.configure(AvroConverter.java:69)
at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:324)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:618)
at org.apache.kafka.connect.runtime.Worker.startSinkTask(Worker.java:521)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1723)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$31(DistributedHerder.java:1773)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: io.apicurio.registry.serde.avro.AvroKafkaSerializer
... 10 more
基于错误,我在我的Kafka Connect Dockerfile中添加了apicurio-registry-serdes-avro-serde:
# --- Builder stage: third attempt, adds the avro-serde jar as well ---
FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
# jdbc-connector-for-apache-kafka
# https://github.com/aiven/jdbc-connector-for-apache-kafka
&& wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
&& unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
&& rm -f jdbc-connector-for-apache-kafka.zip \
# apicurio-registry-utils-converter
# https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-utils-converter
&& wget --no-verbose https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-utils-converter/2.4.2.Final/apicurio-registry-utils-converter-2.4.2.Final.jar \
&& mkdir -p /opt/kafka/plugins/apicurio-registry-utils-converter/ \
&& mv apicurio-registry-utils-converter-2.4.2.Final.jar /opt/kafka/plugins/apicurio-registry-utils-converter/ \
# apicurio-registry-serdes-avro-serde
# https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-serdes-avro-serde
# Placing this jar in a SEPARATE plugin directory does not help: Connect's
# plugin isolation means the converter's classloader cannot see it there.
&& wget --no-verbose https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-serdes-avro-serde/2.4.2.Final/apicurio-registry-serdes-avro-serde-2.4.2.Final.jar \
&& mkdir -p /opt/kafka/plugins/apicurio-registry-serdes-avro-serde/ \
&& mv apicurio-registry-serdes-avro-serde-2.4.2.Final.jar /opt/kafka/plugins/apicurio-registry-serdes-avro-serde/
USER 1001
# --- Runtime stage ---
FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001
但这一次,错误2依然存在
apicurio-registry-serdes-avro-serde
似乎不是修复错误 2 的正确依赖项。正确的依赖项是什么?谢谢!
我已经按照 @OneCricketeer 的建议切换到 kafka-connect-avro-converter,并改用 Apicurio Registry 的 Confluent 兼容端点 /apis/ccompat/v6/。下面是我使用 io.confluent.connect.avro.AvroConverter 的 Kafka Connect Dockerfile:
# --- Builder stage: Confluent Avro converter + Aiven JDBC connector ---
FROM docker.io/alpine:3.17.3 AS builder
USER root:root
# Fix: the original snippet started the command chain with "&& wget" without
# any RUN instruction, which is invalid Dockerfile syntax. Anchor the chain
# to a RUN that also creates the plugin root directory.
RUN mkdir -p /opt/kafka/plugins/ \
# kafka-connect-avro-converter
# https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter
&& wget --no-verbose --output-document=kafka-connect-avro-converter.zip https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.3.3/confluentinc-kafka-connect-avro-converter-7.3.3.zip \
&& mkdir -p /opt/kafka/plugins/kafka-connect-avro-converter/ \
&& unzip kafka-connect-avro-converter.zip -d /opt/kafka/plugins/kafka-connect-avro-converter/ \
&& rm -f kafka-connect-avro-converter.zip \
# jdbc-connector-for-apache-kafka
# https://github.com/aiven/jdbc-connector-for-apache-kafka
&& wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.tar https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.tar \
&& mkdir -p /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
&& tar -x -f jdbc-connector-for-apache-kafka.tar -C /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
&& rm -f jdbc-connector-for-apache-kafka.tar
USER 1001
# --- Runtime stage ---
FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001
关于相应的 JDBC Sink 连接器配置,我遇到了 org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id -xxx 错误。
更新:我发现 Confluent Avro 格式与 vanilla Apache Avro 不同,这给 Spark 和其他工具带来了一些不便。所以他们是两个不同的方向。除了 Confluent 方向,我也会继续在这个方向寻找解决方案。
apicurio-registry-serdes-avro-serde
依赖关系对于该类是正确的。但它应该已经是 Avro 转换器包的一部分了。
但是 (De)Serializer 类不像 Converters 那样连接“插件”。您需要导出
CLASSPATH
环境变量以包含放置 JAR 文件的额外目录
我还建议不要在这里使用多阶段构建,除非 wget 和 unzip 在 Strimzi 图像中不可用。另外,Apicurio Registry 与 Confluent Converter 兼容,所以我建议使用 plugins 功能安装那些(和 Aiven 连接器),无论如何
问题出在我之前添加的依赖项 apicurio-registry-utils-converter 上。
正确的依赖项应该是 apicurio-registry-distro-connect-converter(它打包了转换器及其全部传递依赖)。
所以这是我使用 io.apicurio.registry.utils.converter.AvroConverter 的最终 Kafka Connect Dockerfile:
# --- Builder stage (final, working version) ---
# apicurio-registry-distro-connect-converter is the distribution artifact that
# bundles the AvroConverter together with its runtime dependencies
# (including the avro serde), so the plugin directory is self-contained.
FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
# apicurio-registry-distro-connect-converter
# https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-distro-connect-converter
&& wget --no-verbose --output-document=apicurio-registry-distro-connect-converter.tar.gz https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/2.4.2.Final/apicurio-registry-distro-connect-converter-2.4.2.Final.tar.gz \
&& mkdir -p /opt/kafka/plugins/apicurio-registry-distro-connect-converter/ \
&& tar -x -f apicurio-registry-distro-connect-converter.tar.gz -C /opt/kafka/plugins/apicurio-registry-distro-connect-converter/ \
&& rm -f apicurio-registry-distro-connect-converter.tar.gz \
# jdbc-connector-for-apache-kafka
# https://github.com/aiven/jdbc-connector-for-apache-kafka
&& wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.tar https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.tar \
&& mkdir -p /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
&& tar -x -f jdbc-connector-for-apache-kafka.tar -C /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
&& rm -f jdbc-connector-for-apache-kafka.tar
USER 1001
# --- Runtime stage ---
FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001
为了比较,这里是使用 io.confluent.connect.avro.AvroConverter 的版本:
# --- Builder stage: Confluent AvroConverter variant, for comparison ---
FROM docker.io/alpine:3.17.3 AS builder
USER root:root
# Fix: the original snippet started the command chain with "&& wget" without
# any RUN instruction, which is invalid Dockerfile syntax. Anchor the chain
# to a RUN that also creates the plugin root directory.
RUN mkdir -p /opt/kafka/plugins/ \
# kafka-connect-avro-converter
# https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter
&& wget --no-verbose --output-document=kafka-connect-avro-converter.zip https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.3.3/confluentinc-kafka-connect-avro-converter-7.3.3.zip \
&& mkdir -p /opt/kafka/plugins/kafka-connect-avro-converter/ \
&& unzip kafka-connect-avro-converter.zip -d /opt/kafka/plugins/kafka-connect-avro-converter/ \
&& rm -f kafka-connect-avro-converter.zip \
# jdbc-connector-for-apache-kafka
# https://github.com/aiven/jdbc-connector-for-apache-kafka
&& wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.tar https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.tar \
&& mkdir -p /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
&& tar -x -f jdbc-connector-for-apache-kafka.tar -C /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
&& rm -f jdbc-connector-for-apache-kafka.tar
USER 1001
# --- Runtime stage ---
FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001