Kafka Connect S3 Sink 添加元数据

问题描述 投票:0回答:2

我正在尝试将元数据添加到 kafka 的输出到 S3 存储桶中。

目前,输出只是来自 kafka 主题的消息的值。

我想用以下(元数据)来包装它:

topic, timestamp, partition, offset, key, value
示例:

{
    "topic":"some-topic",
    "timestamp":"some-timestamp",
    "partition":"some-partition",
    "offset":"some-offset",
    "key":"some-key",
    "value":"the-orig-value"
}

注意:当我获取它时,会抛出一个消费者,它会获取所有元数据。如我所愿。

我的连接器配置:

{  
 "name" : "test_s3_sink",   
 "config" : {     
     "connector.class" : "io.confluent.connect.s3.S3SinkConnector",
     "errors.log.enable" : "true",     
     "errors.log.include.messages" : "true",
     "flush.size" : "10000",
     "format.class" : "io.confluent.connect.s3.format.json.JsonFormat",
     "name" : "test_s3_sink",
     "rotate.interval.ms" : "60000",
     "s3.bucket.name" : "some-bucket-name",
     "storage.class" : "io.confluent.connect.s3.storage.S3Storage",
     "topics" : "some.topic",
     "topics.dir" : "some-dir"
   }
 }

谢谢。

amazon-s3 apache-kafka apache-kafka-connect s3-kafka-connector
2个回答
0
投票

目前,输出只是来自 kafka 主题的消息的值。

正确,这是记录的行为。如果您也需要的话,有一个设置可以包含您丢失的关键数据,但没有设置可以获取其余数据。

对于记录时间戳,您可以编辑生产者代码以将其添加为记录的一部分。 (以及其他所有内容,就这一点而言,如果您每次生成时都能够查询主题的下一个偏移量)

对于主题和分区,它们是 S3 文件的一部分,因此无论您使用什么读取文件,都应该能够解析出该信息;偏移量值也是文件名的一部分,然后添加文件中的行号以获得记录的(近似)偏移量。


或者,您可以使用 Connect 转换(例如此存档转换),将 Kafka 记录元数据(偏移量和分区除外)全部重新定位到 Connect Struct 值中,以便接收器连接器将其全部写入文件

https://github.com/jcustenborder/kafka-connect-transform-archive


无论哪种方式,ConnectRecord 都没有偏移字段

SinkRecord
有,但我认为 API 中的转换访问它为时已晚


0
投票

最新版本的Lenses S3 Connector允许您输出包括标头在内的元数据。

数据为S3会出现这样的情况:

{
  "key": <the message Key, which can be a primitive or a complex object>,
  "value": <the message Key, which can be a primitive or a complex object>,
  "headers": {
    "header1": "value1",
    "header2": "value2"
  },
  "metadata": {
    "offset": 821122,
    "partition": 3,
    "timestamp": 1695645345,
    "topic": "some.topic"
  }
}

连接配置:

    connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=INSERT INTO myTargetS3Bucket:somePrefix SELECT * FROM ’some.topic’ STOREAS AVRO PROPERTIES (‘store.envelope’=true)
    topics=some.topic
    name= test_s3_sink
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.storage.StringConverter

这里解释一下: https://lenses.io/blog/2023/09/open-source-lenses-kafka-s3-connector/

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.