I am not using Confluent, only Apache Kafka. I have added the MongoDB plugin, and a GET to .../connector-plugins returns a response that lists MongoSinkConnector:
[
{
"class": "com.mongodb.kafka.connect.MongoSinkConnector",
"type": "sink",
"version": "1.5.0"
},
{
"class": "com.mongodb.kafka.connect.MongoSourceConnector",
"type": "source",
"version": "1.5.0"
}...
]
When I send the following POST request, I get an error:
Request:
{
"name": "mongodb-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max": "1",
"topics": "portalesMCor.db_portales.user",
"connection.uri": "mongodb://tiastest.mti:27017",
"database": "Big_Data",
"collection": "users",
"delete.on.null.values": false
}
}
Response:
{
"error_code": 500,
"message": "Failed to find any class that implements Connector and which name matches com.mongodb.kafka.connect.MongoSinkConnector, available connectors are: PluginDesc{klass=class io.debezium.connector.db2.Db2Connector, name='io.debezium.connector.db2.Db2Connector', version='1.5.0.Final', encodedVersion=1.5.0.Final, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-db2/'}, PluginDesc{klass=class io.debezium.connector.mongodb.MongoDbConnector, name='io.debezium.connector.mongodb.MongoDbConnector', version='1.5.0.Final', encodedVersion=1.5.0.Final, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-mongodb/'}, PluginDesc{klass=class io.debezium.connector.mysql.MySqlConnector, name='io.debezium.connector.mysql.MySqlConnector', version='1.5.0.Final', encodedVersion=1.5.0.Final, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-mysql/'}, PluginDesc{klass=class io.debezium.connector.oracle.OracleConnector, name='io.debezium.connector.oracle.OracleConnector', version='1.5.0.Final', encodedVersion=1.5.0.Final, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-oracle/'}, PluginDesc{klass=class io.debezium.connector.postgresql.PostgresConnector, name='io.debezium.connector.postgresql.PostgresConnector', version='1.5.0.Final', encodedVersion=1.5.0.Final, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-postgres/'}, PluginDesc{klass=class io.debezium.connector.sqlserver.SqlServerConnector, name='io.debezium.connector.sqlserver.SqlServerConnector', version='1.5.0.Final', encodedVersion=1.5.0.Final, type=source, typeName='source', location='file:/kafka/connect/debezium-connector-sqlserver/'}, PluginDesc{klass=class io.debezium.connector.vitess.VitessConnector, name='io.debezium.connector.vitess.VitessConnector', version='1.5.0.Final', encodedVersion=1.5.0.Final, type=source, typeName='source', 
location='file:/kafka/connect/debezium-connector-vitess/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.7.0', encodedVersion=2.7.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.7.0', encodedVersion=2.7.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.7.0', encodedVersion=2.7.0, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.7.0', encodedVersion=2.7.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.7.0', encodedVersion=2.7.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, 
name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.7.0', encodedVersion=2.7.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.7.0', encodedVersion=2.7.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.7.0', encodedVersion=2.7.0, type=source, typeName='source', location='classpath'}"
}
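The list of available connectors is hard to read inside that single long message string. A minimal sketch, assuming the message keeps the `PluginDesc{klass=class ...}` layout shown above, that pulls out the requested class and the classes the worker actually loaded. The function names and the abbreviated `SAMPLE_MESSAGE` are illustrative, not part of Kafka Connect.

```python
import re
from typing import List, Optional

# Abbreviated copy of the "message" field from the response above (illustrative only).
SAMPLE_MESSAGE = (
    "Failed to find any class that implements Connector and which name matches "
    "com.mongodb.kafka.connect.MongoSinkConnector, available connectors are: "
    "PluginDesc{klass=class io.debezium.connector.mongodb.MongoDbConnector, "
    "name='io.debezium.connector.mongodb.MongoDbConnector', version='1.5.0.Final', type=source}, "
    "PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, "
    "name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.7.0', type=sink}"
)


def requested_connector(message: str) -> Optional[str]:
    """Return the class name the worker was asked for, or None if the text does not match."""
    match = re.search(r"name matches ([\w.$]+)", message)
    return match.group(1) if match else None


def available_connectors(message: str) -> List[str]:
    """Return the connector classes listed in the message, in the order they appear."""
    return re.findall(r"PluginDesc\{klass=class ([\w.$]+)", message)
```

Calling `available_connectors` on the full message and checking whether `requested_connector` is in the result shows directly that the worker answering the POST never loaded the MongoDB sink connector.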
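I ran into a similar error while trying to use Confluent's S3SinkConnector. I'm posting my problem and solution here because this page shows up in search results and lacks useful information.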
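What actually happened was that I had recompiled the connector after slightly modifying its source code, because the original version didn't do quite what I wanted. But when I tried to start the connector, I got the error message
Failed to find any class that implements Connector and which name matches io.confluent.connect.s3.S3SinkConnector
even though the relevant .jar was in my plugin path, and I could see with jar -tvf that the .jar contained io/confluent/connect/s3/S3SinkConnector.class.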
I was out of ideas, so I tried moving the .jar file into the libs directory of my Kafka installation, i.e. into /path/to/kafka_2.12-2.6.2/libs. Then I started getting a different error message:
java.lang.UnsupportedClassVersionError: io/confluent/connect/s3/S3SinkConnector has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0
So the solution was to recompile the connector for an older version of Java (specifically, targeting Java 8 with javac --release 8, following these instructions). Now it works!
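Class file version 55.0 corresponds to Java 11 and 52.0 to Java 8, which is why a Java 8 runtime rejected the recompiled class. A rough sketch for checking this before deploying a rebuilt .jar: it reads the 8-byte class file header (magic number, minor version, major version) of one entry. The helper names are made up for illustration, and the `major - 44` mapping is only meant for Java 5 (major 49) and later.

```python
import struct
import zipfile


def class_file_major_version(data: bytes) -> int:
    """Return the major version stored in the header of a compiled .class file."""
    # Header layout: 4-byte magic 0xCAFEBABE, 2-byte minor, 2-byte major (big-endian).
    magic, _minor, major = struct.unpack(">IHH", data[:8])
    if magic != 0xCAFEBABE:
        raise ValueError("not a Java class file")
    return major


def java_release_for(major: int) -> int:
    """Map a class file major version to a Java release (valid for major >= 49)."""
    return major - 44


def jar_entry_java_release(jar_path: str, entry: str) -> int:
    """Return the Java release that one class inside a .jar was compiled for."""
    with zipfile.ZipFile(jar_path) as jar:
        return java_release_for(class_file_major_version(jar.read(entry)))
```

For the case above, `jar_entry_java_release` would be called with the path to the rebuilt .jar and the entry io/confluent/connect/s3/S3SinkConnector.class; a result higher than the Java release running the Connect worker means the class cannot be loaded.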
According to the error returned, your worker has the following connectors:
io.debezium.connector.db2.Db2Connector
io.debezium.connector.mongodb.MongoDbConnector
io.debezium.connector.mysql.MySqlConnector
io.debezium.connector.oracle.OracleConnector
io.debezium.connector.postgresql.PostgresConnector
io.debezium.connector.sqlserver.SqlServerConnector
io.debezium.connector.vitess.VitessConnector
org.apache.kafka.connect.file.FileStreamSinkConnector
org.apache.kafka.connect.file.FileStreamSourceConnector
org.apache.kafka.connect.mirror.MirrorCheckpointConnector
org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
org.apache.kafka.connect.mirror.MirrorSourceConnector
org.apache.kafka.connect.tools.MockConnector
org.apache.kafka.connect.tools.MockSinkConnector
org.apache.kafka.connect.tools.MockSourceConnector
org.apache.kafka.connect.tools.SchemaSourceConnector
org.apache.kafka.connect.tools.VerifiableSinkConnector
org.apache.kafka.connect.tools.VerifiableSourceConnector
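So perhaps you are running the /connector-plugins REST call against a different worker?
Or do you have a distributed worker cluster in which not every worker has the MongoDB plugin installed?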
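One way to check both possibilities is to query /connector-plugins on every worker and see which ones lack the class. The following is only a sketch: the worker URLs are whatever your deployment uses, the function names are hypothetical, and it assumes each worker returns the JSON array format shown in the question.

```python
import json
from typing import Dict, List, Set
from urllib.request import urlopen


def plugin_classes(plugins_json: str) -> Set[str]:
    """Extract the "class" values from a /connector-plugins response body."""
    return {plugin["class"] for plugin in json.loads(plugins_json)}


def fetch_plugins(worker_url: str) -> Set[str]:
    """GET <worker_url>/connector-plugins and return the classes that worker reports."""
    with urlopen(worker_url.rstrip("/") + "/connector-plugins") as response:
        return plugin_classes(response.read().decode("utf-8"))


def workers_missing(connector_class: str, plugins_by_worker: Dict[str, Set[str]]) -> List[str]:
    """Return the workers (sorted) whose plugin list does not include connector_class."""
    return sorted(
        worker
        for worker, classes in plugins_by_worker.items()
        if connector_class not in classes
    )
```

Building `plugins_by_worker` with `fetch_plugins` for each worker URL and passing it to `workers_missing` along with com.mongodb.kafka.connect.MongoSinkConnector lists the workers that still need the plugin.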
In your Dockerfile, you need to install the MongoDB sink connector. For example:
RUN confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.6.1