我正在尝试使用confluent-4.1.1来使用汇合的Kafka s3连接器。
S3汇
"value.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
当我为s3接收器运行Kafka连接器时,我收到以下错误消息:
ERROR WorkerSinkTask{id=singular-s3-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.DataException: Invalid JSON for array default value: "null"
at io.confluent.connect.avro.AvroData.defaultValueFromAvro(AvroData.java:1649)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1562)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1443)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1443)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1323)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1047)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我的架构只包含一个数组类型字段,其架构是这样的
{"name":"item_id","type":{"type":"array","items":["null","string"]},"default":[]}
我可以使用kafka-avro-console-consumer命令查看反序列化的消息。我见过一个similar question,但在他的情况下,他也使用Avro序列化器作为键。
./confluent-4.1.1/bin/kafka-avro-console-consumer --topic singular_custom_postback --bootstrap-server localhost:9092 -max-messages 2
"item_id":[{"string":"15552"},{"string":"37810"},{"string":"38061"}]
"item_id":[]
我无法将我从控制台使用者那里得到的整个输出都包含在内,因为它包含敏感的用户信息,因此我在模式中添加了唯一的数组类型字段。
提前致谢。
调用io.confluent.connect.avro.AvroData.defaultValueFromAvro(AvroData.java:1649)
将您读取的消息的avro架构转换为connect sink的内部架构。我认为它与您的消息数据无关。这就是为什么AbstractKafkaAvroDeserializer
可以成功地反序列化您的消息(例如通过kafka-avro-console-consumer
),因为您的消息是有效的avro消息。如果您的默认值为null
,则可能会出现上述异常,而null
不是您的字段的有效值。例如。
{
"name":"item_id",
"type":{
"type":"array",
"items":[
"string"
]
},
"default": null
}
我建议你远程调试连接,看看究竟是什么失败了。
与您链接的问题相同的问题。
In the source code,你可以看到这种情况。
case ARRAY: {
if (!jsonValue.isArray()) {
throw new DataException("Invalid JSON for array default value: " + jsonValue.toString());
}
如果在您的情况下将模式类型定义为type:"array"
,则可以抛出异常,但有效负载本身具有null
值(或任何其他值类型),而不是实际上是数组,尽管您已将其定义为模式缺省值。默认仅在items
元素根本不存在时应用,而不是在"items":null
时应用
除此之外,我建议像这样的模式,即一个记录对象,而不仅仅是一个命名数组,默认为空数组,而不是null
。
{
"type" : "record",
"name" : "Items",
"namespace" : "com.example.avro",
"fields" : [ {
"name" : "item_id",
"type" : {
"type" : "array",
"items" : [ "null", "string" ]
},
"default": []
} ]
}