PoJo to Avro serialization throws KryoException: java.lang.UnsupportedOperationException

Problem description

My unit tests run fine on Flink 1.11.2 with parquet-avro 1.10.0. As soon as I upgrade to Flink 1.12.0 with parquet-avro 1.12.0, they throw:

com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
...

Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057) ~[?:1.8.0_282]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[kryo-2.24.0.jar:?]
    ... 27 more
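
From the Caused by part of the trace, Kryo's CollectionSerializer is calling add() on a collection inside the Avro Schema that is exposed as an unmodifiable view (the reserved field of Schema$Field), which is presumably why deserialization blows up. A tiny stand-alone illustration of that failing call (the set below is hypothetical and only mimics what the trace shows):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class UnmodifiableAddDemo {
    public static void main(String[] args) {
        // Kryo's CollectionSerializer rebuilds a collection element by element via add();
        // on an unmodifiable view that call is rejected.
        Set<String> reserved = Collections.unmodifiableSet(new HashSet<>());
        reserved.add("doc"); // throws java.lang.UnsupportedOperationException, as in the trace above
    }
}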

A snippet of my unit test looks like this:

private ImmutableList<PoJo> testData = ImmutableList.of(
            PoJo.build("123", "0.0.0.0", null),
            PoJo.build("123", "0.0.0.1", 2L)
    );

DataStream<PoJo> input = env
                .addSource(new TestSource(testData), PojoTypeInfo.of(PoJo.class))
                .assignTimestampsAndWatermarks(watermarkStrategy);

DataStream<GenericRecord> output = input
                .map(TestClass::convertPoJoToGenericRecord)
                .returns(new GenericRecordAvroTypeInfo(PoJo.getAvroSchema()));

output.addSink();

The conversion function looks like this:

GenericRecord convertPoJoToGenericRecord(PoJo pojo) throws Exception {
        Schema schema = PoJo.getAvroSchema();
        GenericRecordBuilder builder = new GenericRecordBuilder(schema);
        for (Schema.Field field : schema.getFields()) {
            builder.set(field.name(), TestClass.getObjectField(field, pojo));
        }
        GenericRecord record = builder.build();
        return record;
    }

Can anyone help?

Thank you.

apache-flink avro flink-streaming
1 Answer

This sounds like a valid answer and may help:

https://lists.apache.org/thread/8f21loz5915dzw8cy2q8c08kxypvj1sq

I would suggest using the AvroSerializer for serializing GenericRecords. You have to add org.apache.flink:flink-avro as a dependency to your job and then tell the system that you want to use the GenericRecordAvroTypeInfo via

DataStream<GenericRecord> sourceStream = env
        .addSource(new AvroGenericSource())
        .returns(new GenericRecordAvroTypeInfo(schema));
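
As a rough, self-contained sketch of that advice (the schema, field names, and class names below are made up for illustration and stand in for things like PoJo.getAvroSchema(); it assumes org.apache.flink:flink-avro is on the classpath):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Requires org.apache.flink:flink-avro in addition to the usual streaming dependencies.
public class GenericRecordJob {

    // Hypothetical schema standing in for PoJo.getAvroSchema().
    private static Schema getSchema() {
        return SchemaBuilder.record("PoJo").fields()
                .requiredString("id")
                .requiredString("ip")
                .optionalLong("count")
                .endRecord();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Schema schema = getSchema();

        DataStream<GenericRecord> records = env
                .fromElements("123", "456")
                .map(id -> {
                    GenericRecord record = new GenericData.Record(getSchema());
                    record.put("id", id);
                    record.put("ip", "0.0.0.0");
                    record.put("count", 2L);
                    return record;
                })
                // Tell Flink to use the Avro serializer for this stream instead of Kryo.
                .returns(new GenericRecordAvroTypeInfo(schema));

        records.print();
        env.execute("GenericRecordAvroTypeInfo example");
    }
}

The important part is the .returns(new GenericRecordAvroTypeInfo(schema)) on the operator that emits GenericRecord; without it Flink falls back to Kryo for those records, which is where the exception above comes from.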

You can find more information about it here [1].

[1] https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro
