从avro架构生成的类的Spark问题

问题描述 投票:1回答:1

我有一段用spark编写的代码,它将HDFS中的数据加载到avro idl生成的java类中。在以这种方式创建的RDD上,我正在执行简单的操作,结果取决于我是否在它之前缓存RDD,即如果我在下面运行代码

val loadedData = loadFromHDFS[Data](path,...)
println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) // 200000

程序将打印200000,另一方面执行下一个代码

val loadedData = loadFromHDFS[Data](path,...).cache()
println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) // 1

导致1打印到标准输出。

看起来,当我在读取缓存数据后检查字段的值时

我很确定所述问题的根本原因是avro idl生成的类的序列化问题,但我不知道如何解决它。我尝试使用Kryo,注册生成的类(Data),从chill_avro注册给定类(SpecificRecordSerializer,SpecificRecordBinarySerializer等)的不同序列化程序,但这些想法都没有帮助我。

我怎么能解决这个问题?

Link以最小,完整和可验证的例子。

serialization apache-spark avro
1个回答
0
投票

尝试下面的代码 -

val loadedData = loadFromHDFS[Data](path,...)
println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()).cache()
© www.soinside.com 2019 - 2024. All rights reserved.