我是 pyspark 的新手,所以希望有人能提供帮助。我正在尝试读取存储在 GCP 存储桶上的镶木地板文件。文件按日期分区,例如
bucket-name/year={}/month={}/day={}
对于给定的文件,我们有以下模式描述:
据我所知,pyspark 在评估 float 和 double 数据类型是兼容的数据类型时没有问题。 (我在网上找到的类似错误示例与数据类型不兼容有关,例如字符串和浮点数) 然而,我们面临着这个奇怪的问题,如果我们尝试读入该文件的所有可用数据:
#i.e. read all the data we have ever received for this file
path = 'bucket-name/year=*/month=*/day=*'
df = spark.read.format('parquet').load(path)
df.cache().count()
我们收到以下错误。 (请注意,如果我们这样做
df.count()
,我们不会收到此错误,只有在我们先缓存时才会遇到)
此外,spark.read 生成的模式提到列 x 的数据类型为 float。因此,从模式角度来看,spark 很乐意读取数据并表示数据类型是 float。然而,如果我们缓存,事情就会变得糟糕。
希望情况细节足够清楚:)
An error occurred while calling o923.count. :
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 15 in stage 41.0 failed 4 times, most recent failure: Lost task
15.3 in stage 41.0 (TID 13228, avroconversion-validation-w-1.c.vf-gned-nwp-live.internal, executor
47): java.lang.UnsupportedOperationException:
org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
at
org.apache.parquet.column.Dictionary.decodeToFloat(Dictionary.java:53)
at
org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToFloat(ParquetDictionary.java:41)
at
org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getFloat(OnHeapColumnVector.java:423)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source) at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
at
org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$1.hasNext(InMemoryRelation.scala:125)
at
org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at
org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
at
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1165)
at
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
at
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
at
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) at
org.apache.spark.rdd.RDD.iterator(RDD.scala:308) at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:359)
at
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1165)
at
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
at
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
at
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) at
org.apache.spark.rdd.RDD.iterator(RDD.scala:308) at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123) at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
根据文档:
cache()方法是使用默认存储级别的简写, 这是 StorageLevel.MEMORY_ONLY (将反序列化的对象存储在 记忆)
cache()
是一个惰性操作,如果您查看 MEMORY_ONLY
部分,您会注意到 cache()
尝试将 RDD/DataFrame 作为反序列化的 Java 对象存储在 JVM 中[一旦您对缓存的 RDD/DataFrame 调用操作],因此您可以RDD/DataFrame 中对象的反序列化存在问题。
我建议尝试执行一些转换,例如 map()
来检查序列化/反序列化是否正常工作
如果您在
df.count()
中调用 df
而没有进行任何转换,则 Spark 不会反序列化您的对象