org.apache.kafka.connect.errors.DataException:数组默认值的JSON无效:“null”

问题描述 投票:0回答:2

我正在尝试使用confluent-4.1.1来使用汇合的Kafka s3连接器。

S3汇

"value.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"

当我为s3接收器运行Kafka连接器时,我收到以下错误消息:

ERROR WorkerSinkTask{id=singular-s3-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.DataException: Invalid JSON for array default value: "null"
        at io.confluent.connect.avro.AvroData.defaultValueFromAvro(AvroData.java:1649)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1562)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1443)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1443)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1323)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1047)
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

我的架构只包含一个数组类型字段,其架构是这样的

{"name":"item_id","type":{"type":"array","items":["null","string"]},"default":[]}

我可以使用kafka-avro-console-consumer命令查看反序列化的消息。我见过一个similar question,但在他的情况下,他也使用Avro序列化器作为键。

./confluent-4.1.1/bin/kafka-avro-console-consumer --topic singular_custom_postback --bootstrap-server localhost:9092  -max-messages 2

"item_id":[{"string":"15552"},{"string":"37810"},{"string":"38061"}]
"item_id":[]

我无法将我从控制台使用者那里得到的整个输出都包含在内,因为它包含敏感的用户信息,因此我在模式中添加了唯一的数组类型字段。

提前致谢。

apache-kafka avro apache-kafka-connect confluent confluent-schema-registry
2个回答
0
投票

调用io.confluent.connect.avro.AvroData.defaultValueFromAvro(AvroData.java:1649)将您读取的消息的avro架构转换为connect sink的内部架构。我认为它与您的消息数据无关。这就是为什么AbstractKafkaAvroDeserializer可以成功地反序列化您的消息(例如通过kafka-avro-console-consumer),因为您的消息是有效的avro消息。如果您的默认值为null,则可能会出现上述异常,而null不是您的字段的有效值。例如。

{
   "name":"item_id",
   "type":{
      "type":"array",
      "items":[
         "string"
      ]
   },
   "default": null
}

我建议你远程调试连接,看看究竟是什么失败了。


0
投票

与您链接的问题相同的问题。

In the source code,你可以看到这种情况。

  case ARRAY: {
    if (!jsonValue.isArray()) {
      throw new DataException("Invalid JSON for array default value: " + jsonValue.toString());
    }

如果在您的情况下将模式类型定义为type:"array",则可以抛出异常,但有效负载本身具有null值(或任何其他值类型),而不是实际上是数组,尽管您已将其定义为模式缺省值。默认仅在items元素根本不存在时应用,而不是在"items":null时应用


除此之外,我建议像这样的模式,即一个记录对象,而不仅仅是一个命名数组,默认为空数组,而不是null

{
  "type" : "record",
  "name" : "Items",
  "namespace" : "com.example.avro",
  "fields" : [ {
    "name" : "item_id",
    "type" : {
      "type" : "array",
      "items" : [ "null", "string" ]
    },
    "default": []
  } ]
}
© www.soinside.com 2019 - 2024. All rights reserved.