Spark SQL: joining multiple datasets fails


I have about 18 DataSets, each containing 10 different columns and 1k~10k rows, and I have to left join them one by one on a common column.

When I do:

val b = a.join(A, Seq("one column"), "left_outer").distinct()
val c = b.join(B, Seq("one column"), "left_outer").distinct()
val d = c.join(C, Seq("one column"), "left_outer").distinct()
...
val n = m.join(M, Seq("one column"), "left_outer").distinct()
n.write()

it works.
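
For context, here is a minimal, self-contained Scala sketch of the same chained left-outer-join pattern; the local SparkSession, the join column "id", and the toy datasets are placeholders of mine, not from the original post:

import org.apache.spark.sql.SparkSession

object ChainedJoins {
  def main(args: Array[String]): Unit = {
    // Local session just for this sketch; the real job presumably runs on a cluster.
    val spark = SparkSession.builder()
      .appName("chained-left-joins")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Toy stand-ins for the ~18 real DataSets, all sharing the join column "id".
    val a = Seq((1, "a1"), (2, "a2")).toDF("id", "colA")
    val A = Seq((1, "A1"), (3, "A3")).toDF("id", "colB")
    val B = Seq((2, "B2"), (4, "B4")).toDF("id", "colC")

    // One join per dataset, always keeping every row of the left side.
    val b = a.join(A, Seq("id"), "left_outer").distinct()
    val c = b.join(B, Seq("id"), "left_outer").distinct()

    c.show()
    spark.stop()
  }
}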

But then I wanted to optimize the code like this:

val data = List(a, A, B, C, ..., N, M)
val joined = data.reduce((left, right) => left.join(right, Seq("one column"), "left_outer").distinct())
val result = joined.distinct()  // this one works
result.write()  // this one doesn't work



Instead, the write fails with the following errors:
18/02/28 09:54:35 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_45_90 !
18/02/28 09:54:35 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_108_191 !
[Stage 63:>                                                       (1 + 4) / 200]18/02/28 09:54:38 WARN TaskSetManager: Lost task 4.1 in stage 63.3 (TID 12686, 127.0.0.1, executor 8): FetchFailed(null, shuffleId=11, mapId=-1, reduceId=4, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 11
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:693)
    at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:147)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
    at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:165)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1329)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1729)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:186)
... 44 more
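
For what it's worth, a MetadataFetchFailedException: Missing an output location for shuffle usually means the executors holding that shuffle output were lost (often under memory pressure) while the single, very deep plan produced by reduce was being computed or recomputed. Below is a minimal sketch of one common mitigation, shortening the lineage by persisting and checkpointing each intermediate join; the helper name, checkpoint directory, and join column are placeholders of mine and are not taken from the post or its answers:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

object JoinAllWithCheckpoints {
  // Left-joins the DataFrames pairwise, materialising each intermediate result so
  // that a lost executor does not force recomputation of the whole join chain.
  def joinAll(spark: SparkSession, data: List[DataFrame], joinCol: String): DataFrame = {
    spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  // placeholder path

    data.reduce { (left, right) =>
      left.join(right, Seq(joinCol), "left_outer")
        .distinct()
        .persist(StorageLevel.MEMORY_AND_DISK)
        .checkpoint()  // eager by default: cuts the lineage at each step
    }
  }
}

Raising spark.sql.shuffle.partitions and giving executors more memory are the other knobs commonly tried alongside this.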
Tags: scala, apache-spark, left-join