我按以下方式设置 kafka neo4j 接收器连接器:
创建
/tmp/plugins
,下载neo4j-kafka-connect-neo4j-5.0.2.zip
并解压到插件文件夹,即
修改
connect-standalone.properties
,添加plugin.path=/tmp/plugins
在配置中使用以下内容创建
connect-neo4j-sink.properties
topics=my-events
connector.class=streams.kafka.connect.sink.Neo4jSinkConnector
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
errors.retry.timeout=-1
errors.retry.delay.max.ms=1000
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
neo4j.server.uri=bolt://neo4j:7687
neo4j.authentication.basic.username=neo4j
neo4j.authentication.basic.password=neo4j
neo4j.encryption.enabled=false
neo4j.topic.cypher.my-topic=MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)```
当我开始时:
/bin/connect-standalone.sh config/connect-standalone.properties config/connect-neo4j-sink.properties
我收到此错误:
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches streams.kafka.connect.sink.Neo4jSinkConnector
环境
我尝试重建工件,发现我的 java 版本已经过时。升级到 java 11 解决了我的问题。