Unable to get the producer.override.* values when creating a connector

Question

To get higher throughput from the CSVSourceConnector, I override the producer configuration with producer.override.* when creating the connector. However, I see no difference in behaviour, and the values are not reflected in the code either; for example, batch.size only shows the default value.

curl command used to create the connector:

curl -i -X PUT \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  http://localhost:8083/connectors/CsvSchemaSpoolDir5.6/config \
  -d '{
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
    "name": "CsvSchemaSpoolDir5.6",
    "topic": "topicPrefix1.7",
    "tasks.max": "1",
    "cleanup.policy": "MOVEBYDATE",
    "schema.generation.enabled": "true",
    "input.path": "/opt/csv-source-connector-poc/testdata/singleTopicMultipleCSVFiles",
    "error.path": "/opt/csv-source-connector-poc/error",
    "finished.path": "/opt/csv-source-connector-poc/finished",
    "input.file.pattern": "^.csv$",
    "error.max.count": "5",
    "fetching.schema.registry.data": "true",
    "schema.registry.url": "https://psrc-znpo0.ap-southeast-2.aws.confluent.cloud",
    "multiple.csv.files.to.single.topic": "true",
    "basic.auth.user.info": "sdfsdfs:sdfsdfsdfsdfds",
    "file.topic.mapping.path": "/opt/csv-source-connector-poc/fileTopicMapping",
    "producer.override.batch.size": "200",
    "producer.override.linger.ms": "50",
    "producer.override.compression.type": "lz4",
    "producer.override.acks": "1"
  }'

Code log: SpoolDirCsvSourceTask.process(): this.config.batchSize: 50

I tried using the updated producer.override.* values, but they do not seem to take effect. Apart from the curl command, where else do I need to make changes for these values to be picked up?

java apache-kafka confluent-platform kafka-producer-api s3-kafka-connector
1 Answer

Make sure the version of Connect you are running supports these options; according to the documentation, they were not added until release 2.3.0:

https://kafka.apache.org/documentation/#connect
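On versions that do support them, the per-connector overrides also only take effect if the worker permits them through its override policy. A minimal check, assuming a distributed worker configured via connect-distributed.properties (adjust the path to your installation):

    # Check the worker-level override policy. On older Connect versions the
    # default policy is None, which does not allow producer.override.* /
    # consumer.override.* keys; it must allow these keys (e.g. All) and the
    # worker must be restarted for the overrides to apply.
    grep connector.client.config.override.policy /opt/kafka/config/connect-distributed.properties
    # Expected: connector.client.config.override.policy=All

Once the policy allows the overrides, the values from producer.override.* should show up in the task's "ProducerConfig values" log output mentioned above.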
