我不知道我是否在这里徒劳无功,并试图做一些完全毫无意义的事情,但我想我还是会问,因为我已经花了足够长的时间把头撞到砖墙上.
我正在使用 Kafka connect 将数据从数据库拉入 Kafka 主题。我想使用表中的
ID
字段作为消息键,并具有以下内容:
transforms.valueToKey.type: org.apache.kafka.connect.transforms.ValueToKey
transforms.valueToKey.fields: ID
transforms.extractId.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractId.field: ID
这可以很好地在输入数据时从数据中提取 ID;但是,是否可以将该密钥存储为整数?
我想这样做,以便我可以使用
org.apache.kafka.common.serialization.Serdes.Integer()
反序列化 Kafka Streams 中的密钥。
我已经尝试过:
key.converter: org.apache.kafka.connect.storage.StringConverter
但是随后流抱怨了
Size of data received by IntegerDeserializer is not 4
我已经尝试过:
key.converter: org.apache.kafka.connect.converters.JsonConvertor
key.converter.schemas.enable: false
key.converter.type.specific: INT32
同样的错误。
我已经尝试过:
key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
但是 Kafka connect 抱怨
Invalid schema type for ByteArrayConverter: INT32
(这表明数据是 INT32 格式 - 但我如何保存它??)。
我已经尝试过:
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url: .....
Kafka 连接抱怨
Error registering Avro schema"int"
。
我尝试过不指定转换器,但它似乎会回退到默认的 JSON 转换器,并且我找不到方法来指定“根本不使用转换器,只需以当前格式保存值”。
我可以将其保留为字符串,然后在流应用程序中将其转换为整数,但事实是有一个
Integer
serdes 对我来说,我应该能够将其作为整数开始,但是也许这是误导。
另一种选择可能是编写我自己的转换器,但感觉这是一个标准的事情,应该已经有一种方法可以做到这一点 - 除非我试图做一些愚蠢/毫无意义/不可能/等等的事情。
如果数据库表中的 ID 是 INT32,请尝试以下操作:
key.converter:org.apache.kafka.connect.converters.IntegerConverter
transforms.valueToKey.type:org.apache.kafka.connect.transforms.ValueToKey
transforms.valueToKey.fields:ID
transforms.valueToKey.field.schemas: INT32 # 指定ID字段的模式
transforms.extractId.type:org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractId.field:ID
如果您的数据库表中的 ID 不是整数,但您希望它在 Kafka 主题中表示为整数,请尝试以下操作:
transforms.valueToKey.type:org.apache.kafka.connect.transforms.ValueToKey
transforms.valueToKey.fields:ID
transforms.valueToKey.field.schemas:INT32
transforms.valueToKey.delimiter:-
transforms.valueToKey.key.field:ID
transforms.valueToKey.key.type:int