Confluent InfluxDB接收器连接器

问题描述 投票:0回答:1

我正在尝试使用Confluent InfluxDB sink connector将一个主题的数据导入我的InfluxDB。配置如下所示:

connector.class=io.confluent.influxdb.InfluxDBSinkConnector
influxdb.url=https://mydb
topics=mytopic
tasks.max=1

通过Kafka Connect UI创建新连接器时获得的所有内容都是以下异常:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.connect.data.Struct
    at io.confluent.influxdb.InfluxDBSinkTask.put(InfluxDBSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
    ... 10 more

主题中的值是json字符串,如下所示:{"pid":1,"filename":"test1.csv"}。我在这里找不到任何配置?

更新:这是我的工作人员配置:

config.storage.topic=kafka-connect-my-config
rest.port=28082
group.id=kafka-connect-mygroup
plugin.path=/usr/share/java,/connect-plugins
key.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.topic=kafka-connect-my-offsets
bootstrap.servers={my broker urls}
value.converter=org.apache.kafka.connect.storage.StringConverter
status.storage.topic=kafka-connect-my-status
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.advertised.host.name=kafka-development-kafka-connect-1
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
apache-kafka-connect confluent
1个回答
0
投票

如果您的数据是JSON,则需要配置Apache Kafka Connect以将其读取。为此,在您的工作者属性中(或者,根据每个连接器覆盖它),您需要:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

有关更多详细信息,请参阅this article

© www.soinside.com 2019 - 2024. All rights reserved.