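I am creating Azure Functions with KafkaTrigger to consume data from Kafka, where an Avro schema is registered. When I use dataType = "binary", I can deserialize the message, but I cannot get the message key, only the value. When I use avroSchema = myAvro, I can get an object that includes the key, but the data arrives as a wrongly formatted string.
I need to deserialize a complex object, ideally using the schema registry.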
@KafkaTrigger(
name = "kafkaMeetingTrigger",
topic = "topicName",
brokerList = "%BrokerList%",
cardinality = Cardinality.ONE,
sslCertificateLocation = "%KafkaCert%",
sslKeyLocation = "%KafkaCert%",
sslCaLocation = "%KafkaCert%",
protocol = BrokerProtocol.SSL,
dataType = "binary",
schemaRegistryUrl = "%SchemaRegistry%",
consumerGroup="%ConsumerGroup%") byte[] kafkaEventData,
This returns a byte array with the value from Kafka. I can then deserialize it into my object, but the key is missing.
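For reference, here is a minimal sketch of how the byte array from the binary trigger could be split before Avro decoding. It assumes the producer uses the Confluent Schema Registry serializer, whose wire format is one magic byte (0) followed by a 4-byte big-endian schema ID and then the Avro-encoded payload. The `Value` dump further down in this question starts with `\u0000\u0000\u0000\bE`, which is consistent with that layout. The class name `ConfluentFrame` and its members are hypothetical, not part of any library.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper: splits a Confluent-framed Kafka value into
// the schema ID and the raw Avro payload that follows the header.
final class ConfluentFrame {
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    final int schemaId;
    final byte[] avroPayload;

    private ConfluentFrame(int schemaId, byte[] avroPayload) {
        this.schemaId = schemaId;
        this.avroPayload = avroPayload;
    }

    static ConfluentFrame parse(byte[] kafkaValue) {
        if (kafkaValue == null || kafkaValue.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Value is too short to carry a schema header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(kafkaValue); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unexpected magic byte: " + magic);
        }
        int schemaId = buffer.getInt();
        // Everything after the 5-byte header is the Avro binary body.
        byte[] payload = Arrays.copyOfRange(kafkaValue, HEADER_LENGTH, kafkaValue.length);
        return new ConfluentFrame(schemaId, payload);
    }

    public static void main(String[] args) {
        // First bytes of the Value shown in this question: \u0000 \u0000 \u0000 \b E \u001c i a ...
        byte[] sample = {0x00, 0x00, 0x00, 0x08, 0x45, 0x1c, 'i', 'a'};
        ConfluentFrame frame = parse(sample);
        System.out.println("schema id = " + frame.schemaId); // prints "schema id = 2117"
        System.out.println("payload bytes = " + frame.avroPayload.length);
    }
}
```

The payload could then be handed to an Avro decoder together with the writer schema looked up by that ID. This only covers the value; it does not recover the key, which the binary binding does not pass in.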
@KafkaTrigger(
name = "kafkaMeetingTrigger",
topic = "topicName",
brokerList = "%BrokerList%",
cardinality = Cardinality.ONE,
sslCertificateLocation = "%KafkaCert%",
sslKeyLocation = "%KafkaCert%",
sslCaLocation = "%KafkaCert%",
protocol = BrokerProtocol.SSL,
avroSchema = avro,
consumerGroup="%ConsumerGroup%") Meeting kafkaEventData
This returns an incomplete object.
{
"correlationId": "iaymjxnlpnocbe",
"meeting":
{
"assistanceType": null,
"attendanceType": null,
"clients": null,
"communicationType": null
},
"published": "2023-11-24T11:31:05.105Z",
"transactionId": "ipdsrntgklkvliyt"
}
Sample of the Avro schema:
{
"type": "record",
"name": "Meeting_keyDataChanged",
"namespace": "cz.csas.avroschemas.meeting_keydatachanged.v03_02",
"fields": [
{
"name": "correlationId",
"type": "string"
},
{
"name": "meeting",
"type": {
"type": "record",
"name": "Meeting",
"fields": [
{
"name": "assistanceType",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "attendanceType",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "clients",
"type": [
"null",
{
"type": "array",
"items": {
"type": "record",
"name": "Client",
"fields": [
{
"name": "clientType",
"type": "string"
},
{
"name": "isPrimary",
"type": "boolean"
}
]
}
}
],
"default": null
},
.........
{
"name": "published",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
},
{
"name": "transactionId",
"type": "string"
}
]
}
@KafkaTrigger(
name = "kafkaMeetingTrigger",
topic = "EVT.Meeting_keyDataChanged_v03_02",
brokerList = "%BrokerList%",
cardinality = Cardinality.ONE,
sslCertificateLocation = "%KafkaCert%",
sslKeyLocation = "%KafkaCert%",
sslCaLocation = "%KafkaCert%",
protocol = BrokerProtocol.SSL,
avroSchema = avro,
schemaRegistryUrl = "%SchemaRegistry%",
consumerGroup="%ConsumerGroup%") ConsumerRecord<Object, Meeting> kafkaEventData
This returns an exception: Cannot evaluate org.apache.kafka.clients.consumer.ConsumerRecord.toString()
@KafkaTrigger(
name = "kafkaMeetingTrigger",
topic = "EVT.Meeting_keyDataChanged_v03_02",
brokerList = "%BrokerList%",
cardinality = Cardinality.ONE,
sslCertificateLocation = "%KafkaCert%",
sslKeyLocation = "%KafkaCert%",
sslCaLocation = "%KafkaCert%",
protocol = BrokerProtocol.SSL,
dataType = "string",
avroSchema = avro,
schemaRegistryUrl = "%SchemaRegistry%",
consumerGroup="%ConsumerGroup%") String kafkaEventData
This returns a string that contains data only at the first level. The next level contains only the schema.
{"correlationId":"iaymjxnlpnocbe","meeting":{"Schema":{"Fields":[{"Name":"assistanceType","aliases":null,"Aliases":null,"Pos":0,"Documentation":"Typ asistence na schuzce.","DefaultValue":null,"Ordering":2,"Schema":{"Schemas":[{"Name":"null","Tag":0,"Fullname":"null"},{"Name":"string","Tag":7,"Fullname":"string"}],"Count":2,"Name":"union","Tag":12,"Fullname":"union"}},{"Name":"attendanceType","aliases":null,"Aliases":null,"Pos":1,"Documentation":"Typ doprovodu na schuzce.","DefaultValue":null,"Ordering":2,"Schema":{"Schemas":[{"Name":"null","Tag":0,"Fullname":"null"},{"Name":"string","Tag":7,"Fullname":"string"}],"Count":2,"Name":"union","Tag":12,"Fullname":"union"}},{"Name":"clients","aliases":null,"Aliases":null,"Pos":2,"Documentation":null,"DefaultValue":null,"Ordering":2,"Schema":{"Schemas":[{"Name":"null","Tag":0,......
When I do not use Avro, I get the whole record with key and offset, but the object cannot be deserialized.
@KafkaTrigger(
name = "kafkaMeetingTrigger",
topic = "EVT.Meeting_keyDataChanged_v03_02",
brokerList = "%BrokerList%",
cardinality = Cardinality.ONE,
sslCertificateLocation = "%KafkaCert%",
sslKeyLocation = "%KafkaCert%",
sslCaLocation = "%KafkaCert%",
protocol = BrokerProtocol.SSL,
dataType = "string",
schemaRegistryUrl = "%SchemaRegistry%",
consumerGroup="%ConsumerGroup%") String kafkaEventData
Example result:
{
"Offset": 323,
"Partition": 0,
"Topic": "EVT.Meeting_keyDataChanged_v03_02",
"Timestamp": "2023-11-27T07:46:12.435Z",
"Value": "\u0000\u0000\u0000\bE\u001ciaymjxnlpnocbe\u0002\u0016kbsdkmeyalo\u00022urhywvxlrixyuqhhmrioqvpxv\u0002\u0002<mcxbasjpxwfbjmogfdlmyucmtkiffpHnmysbemqgyeqnelyhjboneshwjufsulyfpfv\u0001\u0000\u0002.tslhjfloiepkmuneymnqbjc\u0002(fxjlobrtqoaymkfwurqm\u0002\brxsy��ȑ�c\u0000\u0002Bsqnsoblhpfkobciourcjtpwxrgqnvdtrq\u0002\u001ewwntnicjjvcsglu\u0002&sdpdptdflhjlkrhxlls\u00020msrqlxpjbdkkgbwsuslaeeql\u0002<nvttxsgwwlmgvtbhflbmdvtfceasvm\u0002Nakybxmgwxlapymnfymqytjwdqcwfipivsnmjpmi\u0002\u0002\bselt\bpmep\u0002:gcpxjxwrqtceoysrirqvdnllmhcdv\u0002\u001accbfdnjyrbtxi\u0004tr��ȑ�cBxouriawfhfevlthfcfikpuaarjxfmmwkl\u0002.gvoiukbrporvptduebdvucb\u0002\u0006xsp\b\u0002\u0002(mfiudurtxmiejnynnjis\u0002@qcohlkrkprlkuydljhpkilaxoxsvrrbg\u0002\u001anptiwyholnpfr\u0001@usyyfhyjfoyoaulsvnyjnxcqicfscbkl\u0002\u0002Hsgduxxraimgjsduecbgyvypeupuhmvuqcwkc\u0002\u001ehxhupdrenujqcrx\u00028khjhnbyjeihfkfrmkuuyepvxdhbv\u00016vijyhthrgatcjxqbimfslksnrti\u0002\u0002Hrbudwdqichgdgwwgnscljauphjclxbrjpmoe\u00024jxkknmcltpitgocsfdcrwjuvoc\u0002\bklpd\u0001Jkuwvteymnagdcjvtjwlnvktwjfqbncgpxbhnh\u0002\u0002Jjgsfdxrsujpbkdbewkuudtsdajvmexniuwpxu\u0002Fgerujewxkcnmexffsujlyibkbidnjxyiedr\u0002\u0000\u0001.bdfyrklhntdbknmuxvrgwjc\u0000��ȑ�c ipdsrntgklkvliyt",
"Key": null,
"Headers":
[]
}
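One possible reason the `Value` above cannot be deserialized: the `��` characters look like U+FFFD replacement characters, which would suggest the binary Avro bytes were decoded as UTF-8 text somewhere along the way. That is an assumption about this output, not something confirmed by the binding. The sketch below only demonstrates the general Java behavior: bytes that are not valid UTF-8 do not survive a round trip through `String`. The class name `LossyStringRoundTrip` and the sample bytes are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo: shows that arbitrary binary data is not preserved
// when it is decoded as UTF-8 text and encoded back to bytes.
final class LossyStringRoundTrip {

    // Decode the bytes as UTF-8 text, then encode the text back to bytes.
    static byte[] roundTripThroughUtf8(byte[] raw) {
        String text = new String(raw, StandardCharsets.UTF_8);
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // True only if the round trip reproduces the original bytes exactly.
    static boolean survivesRoundTrip(byte[] raw) {
        return Arrays.equals(raw, roundTripThroughUtf8(raw));
    }

    public static void main(String[] args) {
        // Plain ASCII text, such as the correlationId, is preserved.
        byte[] ascii = "iaymjxnlpnocbe".getBytes(StandardCharsets.US_ASCII);
        System.out.println("ascii survives:  " + survivesRoundTrip(ascii));

        // Illustrative high-bit bytes (not taken from the real message);
        // neither is valid UTF-8 on its own, so both get replaced.
        byte[] binary = {(byte) 0x80, (byte) 0xFF};
        System.out.println("binary survives: " + survivesRoundTrip(binary));
    }
}
```

If that is what happened here, fields encoded as Avro varints (for example the `timestamp-millis` long) could not be recovered from the string form, and the binary data type would be the safer input for Avro decoding.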
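According to the docs, if you use string, you will always get the whole record as JSON, and you must deserialize that JSON manually before you can process anything else. When using the Avro data type, you can only provide a schema for the value, not for the key (assuming it is also Avro).
You can find an example of getting the record headers here, and you would have to do something similar to get the key.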