如何制作Kryo序列化器

问题描述 投票:0回答:1

我试图通过提供Kryo序列化程序来解决以下问题,但仍然无法正常工作。它无法识别ModelCom的序列化器。此外,任何通过打印功能显示的消息都不会显示。

我使用了Apache Flink 1.9.0和Apache Jena 3.10.0

我在Kotlin中的代码:

val serializer = object : Serializer<Model>(){
            override fun write(kryo: Kryo, output: Output?, obj : Model?) {
                print("write")
                kryo.writeClassAndObject(output, obj)
            }

            override fun read(kryo: Kryo, input: Input?, type: Class<Model>?): Model {
                print("read")
                val m = kryo.readObject(input, Model::class.java)
                return m
            }

        }


ExecutionContext.see.config.registerTypeWithKryoSerializer(ModelCom::class.java, serializer::class.java)

错误

Exception in thread "main" org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot serialize operator object class org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory.
    at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:222)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:460)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:272)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:243)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:243)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:207)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:159)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:94)
    at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:737)
    at org.apache.flink.optimizer.plan.StreamingPlan.getJobGraph(StreamingPlan.java:40)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:86)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
    at core.EgressEngine.start(EgressEngine.kt:187)
    at core.EgressEngineKt.main(EgressEngine.kt:45)
Caused by: java.io.NotSerializableException: org.apache.jena.rdf.model.impl.ModelCom
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
    at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:515)
    at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:219)
    ... 14 more
kotlin apache-flink jena
1个回答
0
投票

Jena模型不可序列化,因此这种方法行不通。相反,您可以做的是发送足够多的序列化数据,以便每个需要模型的实例都可以实例化一个。

请参阅jena-users列表中的this thread,了解如何为Spark解决此问题;对于任何分配计算的基于JVM的框架,基本问题都是相同的。

© www.soinside.com 2019 - 2024. All rights reserved.