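I am using the Confluent HttpSinkConnector and following the steps described on this page: https://docs.confluent.io/current/connect/kafka-connect-http/index.html
After checking all the property details shown on the Control Center page, I updated the sink connector properties file. My final properties file is as follows: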
name = HttpSink
connector.class = io.confluent.connect.http.HttpSinkConnector
tasks.max = 1
value.converter = org.apache.kafka.connect.storage.StringConverter
topics = http-messages
http.api.url = http://localhost:8080/api/messages
request.method = post
auth.type = none
reporter.result.topic.replication.factor = 1
reporter.result.topic.partitions = 1
reporter.bootstrap.servers = localhost:9092
confluent.topic.bootstrap.servers = localhost:9092
confluent.topic = http-messages
confluent.topic.replication.factor = 1
I also verified that the "http-messages" topic has already been created. Its configuration is shown in Control Center as follows:
name http-messages
partitions 1
compression.type producer
leader.replication.throttled.replicas
message.downconversion.enable true
min.insync.replicas 1
segment.jitter.ms 0
cleanup.policy delete
flush.ms 9223372036854775807
follower.replication.throttled.replicas
segment.bytes 1073741824
retention.ms 604800000
flush.messages 9223372036854775807
message.format.version 2.5-IV0
file.delete.delay.ms 60000
max.compaction.lag.ms 9223372036854775807
max.message.bytes 1048588
min.compaction.lag.ms 0
message.timestamp.type CreateTime
preallocate false
min.cleanable.dirty.ratio 0.5
index.interval.bytes 4096
unclean.leader.election.enable false
retention.bytes -1
delete.retention.ms 86400000
segment.ms 604800000
message.timestamp.difference.max.ms 9223372036854775807
segment.index.bytes 10485760
However, when I try to run the HttpSink connector task, the task fails with the following error message, taken from the status returned by this REST request: curl -X GET localhost:8083/connectors/HttpSink/tasks/0/status
{"id":0,"state":"FAILED","worker_id":"127.0.0.1:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Unable to manage topics:\n\tat io.confluent.connect.reporter.ReporterAdminClient.handleExecutionException(ReporterAdminClient.java:109)\n\tat io.confluent.connect.reporter.ReporterAdminClient.createTopic(ReporterAdminClient.java:57)\n\tat io.confluent.connect.reporter.Reporter.createDestinationTopicsIfNeeded(Reporter.java:433)\n\tat io.confluent.connect.reporter.Reporter.configure(Reporter.java:80)\n\tat io.confluent.connect.http.HttpSinkTask.start(HttpSinkTask.java:49)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:305)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.\n\tat org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)\n\tat org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)\n\tat org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)\n\tat org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)\n\tat io.confluent.connect.reporter.ReporterAdminClient.createTopic(ReporterAdminClient.java:53)\n\t... 12 more\nCaused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.\n"}
In addition, the REST request "curl -X GET localhost:8083/connectors/HttpSink/topics" returns the following response:
{"HttpSink":{"topics":[]}}
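Please help me resolve this issue.
To get past this exception, set reporter.error.topic.replication.factor to 1 as well, in addition to reporter.result.topic.replication.factor and confluent.topic.replication.factor. The trace shows a topic being created with replication factor 3 while only 1 broker is available, so every topic the connector creates needs a replication factor that your single broker can satisfy. See the kafka-connect-http configuration properties: https://docs.confluent.io/current/connect/kafka-connect-http/connector_config.html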
"confluent.topic.replication.factor": 1,
"reporter.result.topic.replication.factor": 1,
"reporter.error.topic.replication.factor": 1