StreamSource: ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:84)

Question  Votes: 0  Answers: 1

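I am trying to run connect-standalone.sh, passing it two properties files, to produce the data in a log file as messages to a Kafka topic. The worker starts, but it then fails to create the connector and shuts down: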

INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)
[2023-09-02 15:50:12,527] ERROR Failed to create connector for retail_logs_file_source.properties (org.apache.kafka.connect.cli.ConnectStandalone:74)
[2023-09-02 15:50:12,528] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:84)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches FileStreamSource, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='3.5.1', encodedVersion=3.5.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='3.5.1', encodedVersion=3.5.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='3.5.1', encodedVersion=3.5.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='3.5.1', encodedVersion=3.5.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='3.5.1', encodedVersion=3.5.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='3.5.1', encodedVersion=3.5.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='3.5.1', encodedVersion=3.5.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='3.5.1', 
encodedVersion=3.5.1, type=source, typeName='source', location='classpath'}
    at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:123)
    at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:107)
    at org.apache.kafka.connect.cli.ConnectStandalone.processExtraArgs(ConnectStandalone.java:81)
    at org.apache.kafka.connect.cli.AbstractConnectCli.startConnect(AbstractConnectCli.java:150)
    at org.apache.kafka.connect.cli.AbstractConnectCli.run(AbstractConnectCli.java:94)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:112)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches FileStreamSource, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='3.5.1', encodedVersion=3.5.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='3.5.1', encodedVersion=3.5.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='3.5.1', encodedVersion=3.5.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='3.5.1', encodedVersion=3.5.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='3.5.1', encodedVersion=3.5.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='3.5.1', encodedVersion=3.5.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='3.5.1', encodedVersion=3.5.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='3.5.1', encodedVersion=3.5.1, 
type=source, typeName='source', location='classpath'}
    at org.apache.kafka.connect.runtime.isolation.Plugins.connectorClass(Plugins.java:253)
    at org.apache.kafka.connect.runtime.isolation.Plugins.newConnector(Plugins.java:224)
    at org.apache.kafka.connect.runtime.AbstractHerder.lambda$getConnector$6(AbstractHerder.java:702)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at org.apache.kafka.connect.runtime.AbstractHerder.getConnector(AbstractHerder.java:702)
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:470)
    at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$2(AbstractHerder.java:390)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
[2023-09-02 15:50:12,533] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:67)
[2023-09-02 15:50:12,534] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:348)
[2023-09-02 15:50:12,541] INFO Stopped http_8083@64712be{HTTP/1.1, (http/1.1)}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector:383)
[2023-09-02 15:50:12,541] INFO node0 Stopped scavenging (org.eclipse.jetty.server.session:149)
[2023-09-02 15:50:12,546] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer:365)
[2023-09-02 15:50:12,546] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:107)
[2023-09-02 15:50:12,547] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:223)
[2023-09-02 15:50:12,549] INFO Stopped FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:71)
[2023-09-02 15:50:12,549] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:693)
[2023-09-02 15:50:12,549] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:697)
[2023-09-02 15:50:12,550] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:703)
[2023-09-02 15:50:12,550] INFO App info kafka.connect for 10.172.0.3:8083 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2023-09-02 15:50:12,550] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:244)
[2023-09-02 15:50:12,552] INFO Herder stopped (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:125)
[2023-09-02 15:50:12,552] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:72)

This is the command I am running:

/opt/kafka/bin/connect-standalone.sh \
    retail_logs_standalone.properties \
    retail_logs_file_source.properties

My retail_logs_file_source.properties contains the following configuration:

name=dee-retail-file-source
connector.class=FileStreamSource
tasks.max=1
file=/opt/gen_logs/logs/access.log
topic=dee_retail

And my retail_logs_standalone.properties contains the following configuration:

bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/home/dee/kafka_connect/retail_logs_produce/retail.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

rest.port=18083
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=

Please help.

java apache-kafka apache-kafka-connect
1 Answer
0
votes

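The error indicates that your plugin.path does not include the filesystem directory containing the connect-file JAR that ships with Kafka. In the worker configuration you posted, plugin.path is still commented out, so the only connectors the worker can find are the ones on its classpath (the Mirror*, Mock*, Schema* and Verifiable* connectors listed in the error message), and FileStreamSource is not among them.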

See step 6 - https://kafka.apache.org/quickstart
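
As a rough sketch of what that step amounts to for the setup in the question: the helper below looks for the connect-file JAR under a Kafka installation and appends a plugin.path entry to the worker properties file. The function name set_plugin_path is made up for illustration, and the assumption that the JAR sits in the libs/ directory of the installation (for example /opt/kafka/libs, guessed from the /opt/kafka/bin path in the command above) is mine, not something stated in the question. Check where the JAR actually lives before relying on it.

```shell
#!/usr/bin/env bash
# Sketch only: set_plugin_path is a hypothetical helper, not part of Kafka.
# Assumption: the connect-file JAR lives in <kafka_home>/libs.

set_plugin_path() {
    local kafka_home="$1"      # e.g. /opt/kafka (assumed install directory)
    local worker_props="$2"    # e.g. retail_logs_standalone.properties
    local jar

    # Take the first connect-file JAR found directly under libs/.
    jar=$(find "$kafka_home/libs" -maxdepth 1 -name 'connect-file-*.jar' 2>/dev/null | sort | head -n 1)
    if [ -z "$jar" ]; then
        echo "no connect-file JAR found under $kafka_home/libs" >&2
        return 1
    fi

    # Leave the file alone if an uncommented plugin.path is already present.
    if grep -q '^plugin\.path=' "$worker_props"; then
        echo "plugin.path is already set in $worker_props; edit it by hand" >&2
        return 2
    fi

    # The leading newline keeps the new entry on its own line even if the
    # file does not end with one.
    printf '\nplugin.path=%s\n' "$jar" >> "$worker_props"
}

# Possible usage with the paths from the question:
# set_plugin_path /opt/kafka retail_logs_standalone.properties
```

After plugin.path points at the JAR, rerun the same connect-standalone.sh command. If the alias FileStreamSource is still not resolved, the fully qualified class name org.apache.kafka.connect.file.FileStreamSourceConnector can be used for connector.class instead.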
