我可以使用单个 s3-sink 连接器通过对不同主题使用不同类型的 Avro 架构来为时间戳字段指向相同的字段名称吗?

问题描述 投票:0回答:1

主题 t1 的架构

{
  "type": "record",
  "name": "Envelope",
  "namespace": "t1",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "id",
              "type": {
                "type": "long",
                "connect.default": 0
              },
              "default": 0
            },
            {
              "name": "createdAt",
              "type": [
                "null",
                {
                  "type": "string",
                  "connect.version": 1,
                  "connect.name": "io.debezium.time.ZonedTimestamp"
                }
              ],
              "default": null
            },
           
          ],
          "connect.name": "t1.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    }
   
  ],
  "connect.name": "t1.Envelope"
}

主题 t2 的架构

{
    "type": "record",
    "name": "Value",
    "namespace": "t2",
    "fields": [
          {
              "name": "id",
              "type": {
                "type": "long",
                "connect.default": 0
              },
              "default": 0
          },
          {
            "name": "createdAt",
            "type": [
                "null",
                {
                    "type": "string",
                    "connect.version": 1,
                    "connect.name": "io.debezium.time.ZonedTimestamp"
                }
            ],
            "default": null
          }
    ],
    "connect.name": "t2.Value"
}

s3-sink 连接器配置

connector.class=io.confluent.connect.s3.S3SinkConnector
behavior.on.null.values=ignore
s3.region=us-west-2
partition.duration.ms=1000
flush.size=1
tasks.max=3
timezone=UTC
topics.regex=t1,t2
aws.secret.access.key=******
locale=US
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
value.converter.schemas.enable=false
name=s3-sink-connector
aws.access.key.id=******
errors.tolerance=all
value.converter=org.apache.kafka.connect.json.JsonConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
key.converter=org.apache.kafka.connect.storage.StringConverter
s3.bucket.name=s3-sink-connector-bucket
path.format=YYYY/MM/dd
timestamp.extractor=RecordField
timestamp.field=after.createdAt

通过使用此连接器配置,我收到 t2 topic 的错误,即 “createdAt 字段不存在”。 如果我设置 timestamp.field =createdAt 则会抛出错误 t1 主题“createdAt 字段不存在”

如何通过对两个模式使用相同的连接器来同时指向两个模式中的 “createdAt” 字段?

是否可以通过使用单个 s3-sink 连接器配置来实现此目的?

如果这种情况是可能的,那么我该如何做到这一点,我必须使用哪些属性来实现这一目标?

如果有人对此有想法,请提出建议。 如果有任何其他方法可以做到这一点,请也建议这种方式。

apache-kafka apache-kafka-connect s3-kafka-connector
1个回答
2
投票

所有主题都需要相同的时间戳字段;无法配置主题到字段的映射。

您的 t2 架构没有

after
字段,因此您需要运行两个单独的连接器

该字段还需要出现在所有记录中,否则分区器将无法工作。

© www.soinside.com 2019 - 2024. All rights reserved.