Azure Functions - KafkaTrigger - deserializing a complex object with Avro returns null values at the second level


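I am creating an Azure Function with KafkaTrigger to consume data from Kafka, where an Avro schema is registered. When I use dataType = "binary", I can deserialize the message, but I only get the value and cannot get the message key. When I use avroSchema = myAvro, I get an object with the key, but the data arrives as a badly formatted string.

I need to deserialize a complex object, ideally using the Schema Registry.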

@KafkaTrigger(
    name = "kafkaMeetingTrigger",
    topic = "topicName",
    brokerList = "%BrokerList%",
    cardinality = Cardinality.ONE,
    sslCertificateLocation = "%KafkaCert%",
    sslKeyLocation = "%KafkaCert%",
    sslCaLocation = "%KafkaCert%",
    protocol = BrokerProtocol.SSL,
    dataType = "binary",
    schemaRegistryUrl = "%SchemaRegistry%",
    consumerGroup="%ConsumerGroup%") byte[] kafkaEventData,

This returns a byte array with the value from Kafka. I can then deserialize it into my object, but the key is lost.

@KafkaTrigger(
    name = "kafkaMeetingTrigger",
    topic = "topicName",
    brokerList = "%BrokerList%",
    cardinality = Cardinality.ONE,
    sslCertificateLocation = "%KafkaCert%",
    sslKeyLocation = "%KafkaCert%",
    sslCaLocation = "%KafkaCert%",
    protocol = BrokerProtocol.SSL,
    avroSchema = avro,
    consumerGroup="%ConsumerGroup%") Meeting kafkaEventData

This returns an incomplete object: every field of the nested meeting record is null.

{
    "correlationId": "iaymjxnlpnocbe",
    "meeting": {
        "assistanceType": null,
        "attendanceType": null,
        "clients": null,
        "communicationType": null
    },
    "published": "2023-11-24T11:31:05.105Z",
    "transactionId": "ipdsrntgklkvliyt"
}

Avro schema example:

{
  "type": "record",
  "name": "Meeting_keyDataChanged",
  "namespace": "cz.csas.avroschemas.meeting_keydatachanged.v03_02",
  "fields": [
    {
      "name": "correlationId",
      "type": "string"
    },
    {
      "name": "meeting",
      "type": {
        "type": "record",
        "name": "Meeting",
        "fields": [
          {
            "name": "assistanceType",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "attendanceType",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "clients",
            "type": [
              "null",
              {
                "type": "array",
                "items": {
                  "type": "record",
                  "name": "Client",
                  "fields": [
                    {
                      "name": "clientType",
                      "type": "string"
                    },
                    {
                      "name": "isPrimary",
                      "type": "boolean"
                    }
                  ]
                }
              }
            ],
            "default": null
          },
       .........
    {
      "name": "published",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "transactionId",
      "type": "string"
    }
  ]
}

@KafkaTrigger(
   name = "kafkaMeetingTrigger",
   topic = "EVT.Meeting_keyDataChanged_v03_02",
   brokerList = "%BrokerList%",
   cardinality = Cardinality.ONE,
   sslCertificateLocation = "%KafkaCert%",
   sslKeyLocation = "%KafkaCert%",
   sslCaLocation = "%KafkaCert%",
   protocol = BrokerProtocol.SSL,
   avroSchema = avro,
   schemaRegistryUrl = "%SchemaRegistry%",
   consumerGroup="%ConsumerGroup%") ConsumerRecord<Object, Meeting> kafkaEventData

This results in an exception: unable to evaluate org.apache.kafka.clients.consumer.ConsumerRecord.toString()

@KafkaTrigger(
    name = "kafkaMeetingTrigger",
    topic = "EVT.Meeting_keyDataChanged_v03_02",
    brokerList = "%BrokerList%",
    cardinality = Cardinality.ONE,
    sslCertificateLocation = "%KafkaCert%",
    sslKeyLocation = "%KafkaCert%",
    sslCaLocation = "%KafkaCert%",
    protocol = BrokerProtocol.SSL,
    dataType = "string",
    avroSchema = avro,
    schemaRegistryUrl = "%SchemaRegistry%",
    consumerGroup="%ConsumerGroup%") String kafkaEventData

This returns a string that contains data only for the first level. The next level contains only the schema.

{"correlationId":"iaymjxnlpnocbe","meeting":{"Schema":{"Fields":[{"Name":"assistanceType","aliases":null,"Aliases":null,"Pos":0,"Documentation":"Typ asistence na schuzce.","DefaultValue":null,"Ordering":2,"Schema":{"Schemas":[{"Name":"null","Tag":0,"Fullname":"null"},{"Name":"string","Tag":7,"Fullname":"string"}],"Count":2,"Name":"union","Tag":12,"Fullname":"union"}},{"Name":"attendanceType","aliases":null,"Aliases":null,"Pos":1,"Documentation":"Typ doprovodu na schuzce.","DefaultValue":null,"Ordering":2,"Schema":{"Schemas":[{"Name":"null","Tag":0,"Fullname":"null"},{"Name":"string","Tag":7,"Fullname":"string"}],"Count":2,"Name":"union","Tag":12,"Fullname":"union"}},{"Name":"clients","aliases":null,"Aliases":null,"Pos":2,"Documentation":null,"DefaultValue":null,"Ordering":2,"Schema":{"Schemas":[{"Name":"null","Tag":0,......

When I don't use Avro, I get the whole record including key and offset, but the value cannot be deserialized.

@KafkaTrigger(
    name = "kafkaMeetingTrigger",
    topic = "EVT.Meeting_keyDataChanged_v03_02",
    brokerList = "%BrokerList%",
    cardinality = Cardinality.ONE,
    sslCertificateLocation = "%KafkaCert%",
    sslKeyLocation = "%KafkaCert%",
    sslCaLocation = "%KafkaCert%",
    protocol = BrokerProtocol.SSL,
    dataType = "string",
    schemaRegistryUrl = "%SchemaRegistry%",
    consumerGroup="%ConsumerGroup%") String kafkaEventData

Example result:

{
    "Offset": 323,
    "Partition": 0,
    "Topic": "EVT.Meeting_keyDataChanged_v03_02",
    "Timestamp": "2023-11-27T07:46:12.435Z",
    "Value": "\u0000\u0000\u0000\bE\u001ciaymjxnlpnocbe\u0002\u0016kbsdkmeyalo\u00022urhywvxlrixyuqhhmrioqvpxv\u0002\u0002<mcxbasjpxwfbjmogfdlmyucmtkiffpHnmysbemqgyeqnelyhjboneshwjufsulyfpfv\u0001\u0000\u0002.tslhjfloiepkmuneymnqbjc\u0002(fxjlobrtqoaymkfwurqm\u0002\brxsy��ȑ�c\u0000\u0002Bsqnsoblhpfkobciourcjtpwxrgqnvdtrq\u0002\u001ewwntnicjjvcsglu\u0002&sdpdptdflhjlkrhxlls\u00020msrqlxpjbdkkgbwsuslaeeql\u0002<nvttxsgwwlmgvtbhflbmdvtfceasvm\u0002Nakybxmgwxlapymnfymqytjwdqcwfipivsnmjpmi\u0002\u0002\bselt\bpmep\u0002:gcpxjxwrqtceoysrirqvdnllmhcdv\u0002\u001accbfdnjyrbtxi\u0004tr��ȑ�cBxouriawfhfevlthfcfikpuaarjxfmmwkl\u0002.gvoiukbrporvptduebdvucb\u0002\u0006xsp\b\u0002\u0002(mfiudurtxmiejnynnjis\u0002@qcohlkrkprlkuydljhpkilaxoxsvrrbg\u0002\u001anptiwyholnpfr\u0001@usyyfhyjfoyoaulsvnyjnxcqicfscbkl\u0002\u0002Hsgduxxraimgjsduecbgyvypeupuhmvuqcwkc\u0002\u001ehxhupdrenujqcrx\u00028khjhnbyjeihfkfrmkuuyepvxdhbv\u00016vijyhthrgatcjxqbimfslksnrti\u0002\u0002Hrbudwdqichgdgwwgnscljauphjclxbrjpmoe\u00024jxkknmcltpitgocsfdcrwjuvoc\u0002\bklpd\u0001Jkuwvteymnagdcjvtjwlnvktwjfqbncgpxbhnh\u0002\u0002Jjgsfdxrsujpbkdbewkuudtsdajvmexniuwpxu\u0002Fgerujewxkcnmexffsujlyibkbidnjxyiedr\u0002\u0000\u0001.bdfyrklhntdbknmuxvrgwjc\u0000��ȑ�c ipdsrntgklkvliyt",
    "Key": null,
    "Headers":
    []
}
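
The start of the Value above, \u0000\u0000\u0000\bE\u001c followed by iaymjxnlpnocbe, looks consistent with the Confluent Schema Registry wire format: a zero magic byte, a 4-byte big-endian schema ID, then the Avro binary body, whose first field here is the string correlationId. A minimal sketch of reading that framing by hand, using only the JDK, could look like the following. The class ConfluentFrame and its method names are hypothetical, and the framing itself is an assumption inferred from this sample, not something the question confirms.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helper: inspects a Kafka value ASSUMED to use Confluent framing. */
final class ConfluentFrame {
    private ConfluentFrame() {}

    /** Returns the 4-byte big-endian schema ID that follows the magic byte 0x00. */
    static int schemaId(byte[] value) {
        if (value == null || value.length < 5 || value[0] != 0) {
            throw new IllegalArgumentException("not a Confluent-framed message");
        }
        return ByteBuffer.wrap(value, 1, 4).getInt();
    }

    /** Returns the Avro binary body, i.e. everything after the 5-byte header. */
    static byte[] avroPayload(byte[] value) {
        schemaId(value); // validates the header, result is not needed here
        return Arrays.copyOfRange(value, 5, value.length);
    }

    /** Reads one Avro string: a zig-zag varint length followed by UTF-8 bytes. */
    static String readAvroString(ByteBuffer buf) {
        long raw = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        int length = (int) ((raw >>> 1) ^ -(raw & 1)); // zig-zag decode
        byte[] utf8 = new byte[length];
        buf.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }
}
```

This would be applied to the byte[] received with dataType = "binary". The string rendering above already contains replacement characters, so it cannot be turned back into the original bytes reliably. The array returned by avroPayload could then be handed to an Avro decoder together with the writer schema.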
java apache-kafka azure-functions avro
1 Answer

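According to the documentation, if you use a string you always get the whole record as JSON, and you must deserialize that JSON manually before you can process anything else. When using the Avro data type, you can only supply a schema for the value, not for the key (assuming the key is also Avro).

You can find an example of reading the record headers here; you will have to do something similar to get the key: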

https://github.com/Azure/azure-functions-kafka-extension/blob/dev/samples/java/confluence/src/main/java/com/contoso/kafka/KafkaTriggerWithHeaders.java
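
As a rough sketch of the target shape for that manual JSON step, the envelope shown in the question (Offset, Partition, Topic, Timestamp, Value, Key) can be mirrored by a plain Java class. The class name KafkaRecordEnvelope and its helper methods are hypothetical. Whether the Java worker binds the trigger payload to such a class directly, or a JSON library has to populate it from the string, is an assumption to check against the linked sample.

```java
/**
 * Hypothetical POJO mirroring the JSON envelope from the question.
 * Field names keep the envelope's capitalisation so a JSON mapper can match them.
 * Headers is left out because its element shape is not shown in the question.
 */
class KafkaRecordEnvelope {
    public long Offset;
    public int Partition;
    public String Topic;
    public String Timestamp;
    public String Value; // still the raw payload, not the decoded Avro object
    public String Key;   // null in the question's sample

    /** True when the record carried a key. */
    boolean hasKey() {
        return Key != null;
    }

    /** Short log-friendly description of where the record came from. */
    String describe() {
        return Topic + "/" + Partition + "@" + Offset
                + " key=" + (hasKey() ? Key : "<none>");
    }
}
```

With the sample record from the question, describe() would report topic EVT.Meeting_keyDataChanged_v03_02, partition 0, offset 323 and no key, which makes it clear whether a missing key is a binding problem or simply a record that was produced without one.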
