为了通过 CSVSourceConnector 获得更高的吞吐量,在创建连接器时使用 producer.override.* 覆盖生产者配置。但是看不出有什么区别,而且代码值也没有反映出来,比如 batch.size 只显示默认值。
Curl 命令创建连接器:
curl -i -X PUT -H“接受:应用程序/json”
-H "Content-Type:application/json" http://localhost:8083/connectors/CsvSchemaSpoolDir5.6/config
-d'{
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
“名称”:“CsvSchemaSpoolDir5.6”,
"topic": "topicPrefix1.7",
"tasks.max": "1",
"cleanup.policy": "MOVEBYDATE",
"schema.generation.enabled": "true",
"input.path": "/opt/csv-source-connector-poc/testdata/singleTopicMultipleCSVFiles",
"error.path": "/opt/csv-source-connector-poc/error",
"finished.path": "/opt/csv-source-connector-poc/finished",
"input.file.pattern": "^.csv$",
"error.max.count": "5",
"fetching.schema.registry.data": "true",
"schema.registry.url": "https://psrc-znpo0.ap-southeast-2.aws.confluent.cloud",
"multiple.csv.files.to.single.topic": "true",
"basic.auth.user.info": "sdfsdfs:sdfsdfsdfsdfds",
"file.topic.mapping.path": "/opt/csv-source-connector-poc/fileTopicMapping",
"producer.override.batch.size": "200",
"producer.override.linger.ms": "50",
"producer.override.compression.type": "lz4",
“producer.override.acks”:“1”*
}'
代码日志:SpoolDirCsvSourceTask.process():this.config.batchSize:50
尝试使用更新的 producer.override.* 值,但似乎不起作用。除了curl命令,还需要更新哪个地方才能得到这些值
确保您运行的 Connect 版本支持这些选项;根据文档
,它们是在 2.3.0 发布后才添加的