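Versions: kafka_2.12-2.4.0, Confluent 5.4.1
I am trying to use Confluent's Schema Registry. When I start Schema Registry and Connect in distributed mode, the Connect log reports no errors.
connect-avro-distributed.properties: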
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://k2:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://k2:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
plugin.path=/usr/local/tools/confluent-5.4.1/share/java,/usr/local/tools/kafka/kafka_2.12-2.4.0/plugin
I have configured the location of the Confluent jars (plugin.path) so that Connect can find the class.
However, when I POST the following connector request:
{
  "name": "dbz-mysql-avro-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "xx.xx.xx.xx",
    "database.port": "3306",
    "database.user": "debezium",
    "database.history.kafka.topic": "dbhistory.debezium.mysql.avro",
    "database.password": "123456",
    "database.server.id": "184124",
    "database.server.name": "debezium",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://k2:8081",
    "value.converter.schema.registry.url": "http://k2:8081",
    "table.whitelist": "debeziumdb.hosttable",
    "database.history.kafka.bootstrap.servers": "k1:9092,k2:9092,k3:9092"
  }
}
the following exception is thrown:
[2020-04-23 10:37:00,064] INFO Creating task dbz-mysql-avro-connector-0 (org.apache.kafka.connect.runtime.Worker:419)
[2020-04-23 10:37:00,065] INFO ConnectorConfig values:
config.action.reload = restart
connector.class = io.debezium.connector.mysql.MySqlConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class io.confluent.connect.avro.AvroConverter
name = dbz-mysql-avro-connector
tasks.max = 1
transforms = []
value.converter = class io.confluent.connect.avro.AvroConverter
(org.apache.kafka.connect.runtime.ConnectorConfig:347)
[2020-04-23 10:37:00,065] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = io.debezium.connector.mysql.MySqlConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class io.confluent.connect.avro.AvroConverter
name = dbz-mysql-avro-connector
tasks.max = 1
transforms = []
value.converter = class io.confluent.connect.avro.AvroConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:347)
[2020-04-23 10:37:00,067] INFO TaskConfig values:
task.class = class io.debezium.connector.mysql.MySqlConnectorTask
(org.apache.kafka.connect.runtime.TaskConfig:347)
[2020-04-23 10:37:00,067] INFO Instantiated task dbz-mysql-avro-connector-0 with version 1.1.0.Final of type io.debezium.connector.mysql.MySqlConnectorTask (org.apache.kafka.connect.runtime.Worker:434)
[2020-04-23 10:37:00,067] ERROR Failed to start task dbz-mysql-avro-connector-0 (org.apache.kafka.connect.runtime.Worker:470)
java.lang.NoClassDefFoundError: io/confluent/connect/avro/AvroConverterConfig
at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:61)
at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:293)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:440)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1140)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1700(DistributedHerder.java:125)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:1155)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:1151)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-04-23 10:37:00,071] INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1125)
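All of the jars are in this directory.
What should I do so that the class can be loaded? Or does this class not exist in this version of Confluent?
Thanks.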
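I finally resolved this exception.
I was not using the full Confluent Platform; I had installed only the schema-registry component.
To be precise, I installed only the community version and enabled only the Schema Registry component.
I then downloaded the Avro jar package from the official website, put all of it into the plugin directory, and Connect started successfully.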
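To confirm that a fix like this worked, one option is to check which jars under plugin.path actually contain the missing class. The following is a minimal sketch, not part of Kafka or Confluent tooling: the function name find_class_in_plugin_path is hypothetical, and it assumes that plugin.path is a comma-separated list of directories and that the class sits in an ordinary jar (zip) file somewhere below one of them.

```python
import zipfile
from pathlib import Path


def find_class_in_plugin_path(plugin_path, class_name):
    """Return the jars under the plugin.path directories that contain class_name.

    plugin_path is the comma-separated value from the worker properties file;
    class_name is a fully qualified Java class name.
    """
    # A class a.b.C is stored in a jar as the entry a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    matches = []
    for directory in plugin_path.split(","):
        root = Path(directory.strip())
        if not directory.strip() or not root.is_dir():
            continue  # skip empty or missing entries
        for jar in sorted(root.rglob("*.jar")):
            try:
                with zipfile.ZipFile(jar) as archive:
                    if entry in archive.namelist():
                        matches.append(str(jar))
            except zipfile.BadZipFile:
                continue  # not a readable jar; ignore it
    return matches


# Example call (paths taken from the properties file in the question):
# find_class_in_plugin_path(
#     "/usr/local/tools/confluent-5.4.1/share/java,"
#     "/usr/local/tools/kafka/kafka_2.12-2.4.0/plugin",
#     "io.confluent.connect.avro.AvroConverterConfig",
# )
```

An empty result would suggest that no jar visible to Connect contains AvroConverterConfig, which would be consistent with the NoClassDefFoundError above. A non-empty result only shows that the class file exists on disk, not that the plugin class loader resolves it.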