KafkaAvroDeserializer fails with a Kryo exception

Question

I have written a consumer that reads Avro generic records using the schema registry.

FlinkKafkaConsumer010<GenericRecord> kafkaConsumer010 = new FlinkKafkaConsumer010<>(KAFKA_TOPICS,
                new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
                properties);

The deserialization class looks like this:

public class KafkaGenericAvroDeserializationSchema implements KeyedDeserializationSchema<GenericRecord> {

    private final String registryUrl;
    private transient KafkaAvroDeserializer inner;

    public KafkaGenericAvroDeserializationSchema(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    @Override
    public GenericRecord deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        checkInitialized();
        return (GenericRecord) inner.deserialize(topic, message);
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }

    private void checkInitialized() {
        if (inner == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            SchemaRegistryClient client =
                    new CachedSchemaRegistryClient(
                            registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            inner = new KafkaAvroDeserializer(client, props);
        }
    }
}

It works locally on my machine, but when I deploy it to a YARN cluster I get the following exception:

java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)

Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$LockableArrayList
Serialization trace:
types (org.apache.avro.Schema$UnionSchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)

Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class org.apache.avro.Schema$LockableArrayList with modifiers "public"

Please help me resolve this issue.

java apache-kafka apache-flink avro flink-streaming
1 Answer

As answered on the mailing list:

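The problem is that you are not providing any meaningful type information, so Flink has to fall back to Kryo. TypeExtractor.getForClass(GenericRecord.class) cannot analyze GenericRecord, which is an interface, so the result is a generic type that Kryo serializes reflectively, and Kryo then fails on Avro's internal Schema classes. You need to extract the schema at job-compilation time (in your main method) and pass it to your deserialization schema.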

public TypeInformation<T> getProducedType() {
    return (TypeInformation<T>) new GenericRecordAvroTypeInfo(this.schema);
}
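Applied to the class from the question, this could look roughly like the sketch below. It is an illustration under assumptions, not the answerer's code: it assumes the flink-avro module (which provides GenericRecordAvroTypeInfo) is on the classpath, that all records on the topic conform to the schema passed in, and that the schema is handed over as a JSON string (the schemaJson field is introduced here for illustration, since org.apache.avro.Schema is not Serializable in older Avro versions).

```java
import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class KafkaGenericAvroDeserializationSchema
        implements KeyedDeserializationSchema<GenericRecord> {

    private final String registryUrl;
    // Assumption: the schema travels as JSON text so this class stays
    // Java-serializable regardless of the Avro version in use.
    private final String schemaJson;
    // Not serializable, so it is created lazily on the task manager.
    private transient KafkaAvroDeserializer inner;

    public KafkaGenericAvroDeserializationSchema(String registryUrl, String schemaJson) {
        this.registryUrl = registryUrl;
        this.schemaJson = schemaJson;
    }

    @Override
    public GenericRecord deserialize(byte[] messageKey, byte[] message,
                                     String topic, int partition, long offset) {
        checkInitialized();
        return (GenericRecord) inner.deserialize(topic, message);
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        // Schema-aware type information lets Flink use its Avro serializer
        // instead of falling back to Kryo.
        Schema schema = new Schema.Parser().parse(schemaJson);
        return new GenericRecordAvroTypeInfo(schema);
    }

    private void checkInitialized() {
        if (inner == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            SchemaRegistryClient client = new CachedSchemaRegistryClient(
                    registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            inner = new KafkaAvroDeserializer(client, props);
        }
    }
}
```

In main, the schema text could be read from a file bundled with the job, or fetched once from the registry, for example with client.getLatestSchemaMetadata(subject).getSchema(), where subject is the registry subject of the topic's value schema (often "<topic>-value" under the default naming strategy; check this against your own setup).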

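If you do not want to extract it statically, you need to tell Flink how to handle arbitrary GenericRecords. You could implement your own serializer, which writes GenericRecords to byte[] and reads them back.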

Note that I would still recommend simply bundling the schema with your Flink application rather than reinventing the wheel.
