I am writing data to a Kafka topic using an Avro schema. Initially everything worked fine. After adding one new field (scan_app_id) to the Avro file, I am facing this error.
Avro schema:

```json
{
  "type": "record",
  "name": "Initiate_Scan",
  "namespace": "avro",
  "doc": "Avro schema registry for Initiate_Scan",
  "fields": [
    { "name": "app_id", "type": "string", "doc": "3 digit application id" },
    { "name": "app_name", "type": "string", "doc": "application name" },
    { "name": "dev_stage", "type": "string", "doc": "development stage" },
    { "name": "scan_app_id", "type": "string", "doc": "unique scan id for an app in Veracode" },
    { "name": "scan_name", "type": "string", "doc": "scan details" },
    { "name": "seq_num", "type": "int", "doc": "unique number" },
    { "name": "result_flg", "type": "string", "doc": "Y indicates results of scan available", "default": "Y" },
    { "name": "request_id", "type": "int", "doc": "unique id" },
    { "name": "scan_number", "type": "int", "doc": "number of scans" }
  ]
}
```
Error:

```
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"Initiate_Scan","namespace":"avro","doc":"Avro schema registry for Initiate_Scan","fields":[{"name":"app_id","type":{"type":"string","avro.java.string":"String"},"doc":"3 digit application id"},{"name":"app_name","type":{"type":"string","avro.java.string":"String"},"doc":"application name"},{"name":"dev_stage","type":{"type":"string","avro.java.string":"String"},"doc":"development stage"},{"name":"scan_app_id","type":{"type":"string","avro.java.string":"String"},"doc":"unique scan id for an app in Veracode"},{"name":"scan_name","type":{"type":"string","avro.java.string":"String"},"doc":"scan details"},{"name":"seq_num","type":"int","doc":"unique number"},{"name":"result_flg","type":{"type":"string","avro.java.string":"String"},"doc":"Y indicates results of scan available","default":"Y"},{"name":"request_id","type":"int","doc":"unique id"},{"name":"scan_number","type":"int","doc":"number of scans"}]}
```
```
INFO Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer:1017)
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Register operation timed out; error code: 50002
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:182)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:203)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:292)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:284)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:279)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:61)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:93)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:768)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:745)
    at com.ssc.svc.svds.initiate.InitiateProducer.initiateScanData(InitiateProducer.java:146)
    at com.ssc.svc.svds.initiate.InitiateProducer.topicsData(InitiateProducer.java:41)
    at com.ssc.svc.svds.initiate.InputData.main(InputData.java:31)
```
I went through the Confluent documentation about the 50002 error, which says:

Schema being registered is incompatible with an earlier schema.

Does this mean I cannot change/update an existing schema?
How do I fix this?
Actually, the link says 50002 -- Operation timed out. If the schema were actually incompatible, the response would say so.

In any case, if you add a new field, you need to define a `default` value for it.

That way, any consumers reading older messages with the newer schema definition know what value to set for that field.
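To illustrate the point, here is a minimal plain-Python sketch (not the real Avro library's resolution code) of what a reader-side `default` does when a record written with the older schema is decoded with the newer one:

```python
# Reader's (newer) schema fields; result_flg carries a default,
# so it can be filled in for records written before it existed.
new_schema_fields = [
    {"name": "app_id", "type": "string"},
    {"name": "result_flg", "type": "string", "default": "Y"},
]

def resolve(record, fields):
    """Return the record as seen through the reader's schema."""
    out = {}
    for f in fields:
        if f["name"] in record:
            out[f["name"]] = record[f["name"]]
        elif "default" in f:
            out[f["name"]] = f["default"]   # missing field: use the default
        else:
            # No value and no default: this is exactly the case
            # schema resolution cannot handle.
            raise ValueError(f"no value and no default for {f['name']}")
    return out

old_record = {"app_id": "123"}   # written before result_flg existed
print(resolve(old_record, new_schema_fields))
# {'app_id': '123', 'result_flg': 'Y'}
```

Without the `"default": "Y"` entry, the same lookup would fail, which is what makes adding a field with no default a breaking change for readers.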
A straightforward list of allowed Avro changes I found is from Oracle.
The likely error here is:

- A field was added without a default value
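If compatibility really were the problem, the fix would be to declare the new field with a default in the schema, so old records remain readable. For example (the empty-string default here is just an illustration; choose whatever placeholder value makes sense for your data):

```json
{ "name": "scan_app_id", "type": "string",
  "doc": "unique scan id for an app in Veracode", "default": "" }
```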