我有 2 个应用程序,一个正在向主题生成数据,另一个应用程序从主题接收数据。写入器模式版本与读取器(消费者)模式不同。生产者具有新模式,而消费者具有旧模式。使用模式注册表模式演化选项,我认为消费者可以成功工作,但出现以下错误。看起来消费者正在查看特定记录中接收字段的排序,而不是忽略未知字段。
来自 Avro 文档:
如果两者都是记录:
- 字段的顺序可能不同:字段匹配 名字。
- 两条记录中具有相同名称的字段的模式是 递归解决。
- 如果作者的记录包含读者记录中不存在的名称的字段,则该字段的作者值将被忽略。
Caused by: java.lang.IndexOutOfBoundsException: Invalid index: 2
at com.test.Employee.put(Employee.java:111) ~[classes!/:0.0.1-SNAPSHOT]
at org.apache.avro.generic.GenericData.setField(GenericData.java:816) ~[avro-1.9.2.jar!/:1.9.2]
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139) ~[avro-1.9.2.jar!/:1.9.2]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) ~[avro-1.9.2.jar!/:1.9.2]
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.9.2.jar!/:1.9.2]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) ~[avro-1.9.2.jar!/:1.9.2]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) ~[avro-1.9.2.jar!/:1.9.2]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) ~[avro-1.9.2.jar!/:1.9.2]
生产者架构:
{
"type": "record",
"namespace": "com.aa.opshub.test",
"name": "Employee",
"fields": [
{
"name": "firstName",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "middleName",
"type": "string"
}
]
}
消费者模式:
{
"type" : "record",
"namespace" : "com.aa.opshub.test",
"name" : "Employee",
"fields" : [
{ "name" : "firstName" , "type" : "string" },
{ "name" : "age", "type" : "int" }
]
}
消费者:SpecificRecord类方法抛出错误
@SuppressWarnings(value="unchecked")
public void put(int field$, java.lang.Object value$) {
switch (field$) {
case 0: firstName = (java.lang.CharSequence)value$; break;
case 1: age = (java.lang.Integer)value$; break;
default: throw new IndexOutOfBoundsException("Invalid index: " + field$);
}
}
我在使用 avro 进行通信的客户端和服务器上遇到了同样的问题。就我而言,修复方法是在读取从客户端发送的记录时使用服务器端模式,该记录显然与我的服务器处于不同的模式。我必须改变初始化阅读器的方式。我保留了以下代码,
DatumReader<GenericRecord> reader;
if(serverSchema != null) {
reader = new GenericDatumReader<>(serverSchema);
} else {
reader = new GenericDatumReader<>();
LOGGER.warn("Schema version mismatch can cause execution failures. Always pass expected server schema.");
}