使用 Kafka Connect 生成整数 (INT32) 键

问题描述 投票:0回答:1

我不知道我是否在这里徒劳无功,并试图做一些完全毫无意义的事情,但我想我还是会问,因为我已经花了足够长的时间把头撞到砖墙上.

我正在使用 Kafka connect 将数据从数据库拉入 Kafka 主题。我想使用表中的

ID
字段作为消息键,并具有以下内容:

    transforms.valueToKey.type: org.apache.kafka.connect.transforms.ValueToKey
    transforms.valueToKey.fields: ID
    transforms.extractId.type: org.apache.kafka.connect.transforms.ExtractField$Key
    transforms.extractId.field: ID

这可以很好地在输入数据时从数据中提取 ID;但是,是否可以将该密钥存储为整数?

我想这样做,以便我可以使用

org.apache.kafka.common.serialization.Serdes.Integer()
反序列化 Kafka Streams 中的密钥。

我已经尝试过:

key.converter: org.apache.kafka.connect.storage.StringConverter

但是随后流抱怨了

Size of data received by IntegerDeserializer is not 4

我已经尝试过:

    key.converter: org.apache.kafka.connect.converters.JsonConvertor
    key.converter.schemas.enable: false
    key.converter.type.specific: INT32

同样的错误。

我已经尝试过:

    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter

但是 Kafka connect 抱怨

Invalid schema type for ByteArrayConverter: INT32
(这表明数据是 INT32 格式 - 但我如何保存它??)。

我已经尝试过:

    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: .....

Kafka 连接抱怨

Error registering Avro schema"int"

我尝试过不指定转换器,但它似乎会回退到默认的 JSON 转换器,并且我找不到方法来指定“根本不使用转换器,只需以当前格式保存值”。

我可以将其保留为字符串,然后在流应用程序中将其转换为整数,但事实是有一个

Integer
serdes 对我来说,我应该能够将其作为整数开始,但是也许这是误导。

另一种选择可能是编写我自己的转换器,但感觉这是一个标准的事情,应该已经有一种方法可以做到这一点 - 除非我试图做一些愚蠢/毫无意义/不可能/等等的事情。

apache-kafka apache-kafka-streams apache-kafka-connect
1个回答
0
投票

如果数据库表中的 ID 是 INT32,请尝试以下操作:

key.converter:org.apache.kafka.connect.converters.IntegerConverter

transforms.valueToKey.type:org.apache.kafka.connect.transforms.ValueToKey

transforms.valueToKey.fields:ID

transforms.valueToKey.field.schemas: INT32 # 指定ID字段的模式

transforms.extractId.type:org.apache.kafka.connect.transforms.ExtractField$Key

transforms.extractId.field:ID

如果您的数据库表中的 ID 不是整数,但您希望它在 Kafka 主题中表示为整数,请尝试以下操作:

transforms.valueToKey.type:org.apache.kafka.connect.transforms.ValueToKey

transforms.valueToKey.fields:ID

transforms.valueToKey.field.schemas:INT32

transforms.valueToKey.delimiter:-

transforms.valueToKey.key.field:ID

transforms.valueToKey.key.type:int

© www.soinside.com 2019 - 2024. All rights reserved.