我有在kafka(test1,test2,test3)中创建的主题,我想在创建时将它们弹到弹性。我尝试了topics.regex,但它只为已存在的主题创建索引。如何在动态创建索引时将新主题下沉到索引中?
这是我用于kafka-sink的连接器配置:
{
"name": "elastic-sink-test-regex",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics.regex": "test[0-9]+",
"type.name": "kafka-connect",
"connection.url": "http://192.168.0.188:9200",
"key.ignore": "true",
"schema.ignore": "true",
"schema.enable": "false",
"batch.size": "100",
"flush.timeout.ms": "100000",
"max.buffered.records": "10000",
"max.retries": "10",
"retry.backoff.ms": "1000",
"max.in.flight.requests": "3",
"is.timebased.indexed": "False",
"time.index": "at"
}
}
在重新启动此连接器(或发生计划的重新平衡)之前,接收器连接器不会读取新主题。您可以运行Kafka Stream,从新主题中读取消息并将其放入类似结果的主题中。 Sink连接器从类似结果的主题中读取。
要保存“消息 - 主题”匹配,您可以使用Kafka记录标题。
确保它符合您的要求!