主题 t1 的架构
{
"type": "record",
"name": "Envelope",
"namespace": "t1",
"fields": [
{
"name": "before",
"type": [
"null",
{
"type": "record",
"name": "Value",
"fields": [
{
"name": "id",
"type": {
"type": "long",
"connect.default": 0
},
"default": 0
},
{
"name": "createdAt",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
},
],
"connect.name": "t1.Value"
}
],
"default": null
},
{
"name": "after",
"type": [
"null",
"Value"
],
"default": null
}
],
"connect.name": "t1.Envelope"
}
主题 t2 的架构
{
"type": "record",
"name": "Value",
"namespace": "t2",
"fields": [
{
"name": "id",
"type": {
"type": "long",
"connect.default": 0
},
"default": 0
},
{
"name": "createdAt",
"type": [
"null",
{
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
],
"default": null
}
],
"connect.name": "t2.Value"
}
s3-sink 连接器配置
connector.class=io.confluent.connect.s3.S3SinkConnector
behavior.on.null.values=ignore
s3.region=us-west-2
partition.duration.ms=1000
flush.size=1
tasks.max=3
timezone=UTC
topics.regex=t1,t2
aws.secret.access.key=******
locale=US
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
value.converter.schemas.enable=false
name=s3-sink-connector
aws.access.key.id=******
errors.tolerance=all
value.converter=org.apache.kafka.connect.json.JsonConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
key.converter=org.apache.kafka.connect.storage.StringConverter
s3.bucket.name=s3-sink-connector-bucket
path.format=YYYY/MM/dd
timestamp.extractor=RecordField
timestamp.field=after.createdAt
通过使用此连接器配置,我收到 t2 topic 的错误,即 “createdAt 字段不存在”。 如果我设置 timestamp.field =createdAt 则会抛出错误 t1 主题“createdAt 字段不存在”。
如何通过对两个模式使用相同的连接器来同时指向两个模式中的 “createdAt” 字段?
是否可以通过使用单个 s3-sink 连接器配置来实现此目的?
如果这种情况是可能的,那么我该如何做到这一点,我必须使用哪些属性来实现这一目标?
如果有人对此有想法,请提出建议。 如果有任何其他方法可以做到这一点,请也建议这种方式。
所有主题都需要相同的时间戳字段;无法配置主题到字段的映射。
您的 t2 架构没有
after
字段,因此您需要运行两个单独的连接器
该字段还需要出现在所有记录中,否则分区器将无法工作。