使用数据集交叉加入Apache Spark非常慢

问题描述 投票:0回答:1

我已经在spark用户论坛上发布了这个问题,但没有收到任何回复,所以再次在这里询问。

我们有一个用例,我们需要进行笛卡尔连接,出于某种原因,我们无法使用数据集API。

我们有两个数据集:

  • 一个带有2个字符串列的数据集表示c1,c2。它是一个包含约100万条记录的小型数据集。这两列都是32个字符的字符串,所以应该小于500 MB。 我们广播这个数据集
  • 其他数据集大得多,约有1000万条记录
val ds1 = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2")
ds1.count
val ds2 = spark.read.format("csv").load(<s3-location>).toDF("c11", "c12", "c13", "c14", "c15", "ts")
ds2.count
ds2.crossJoin(broadcast(ds1)).filter($"c1" <= $"c11" && $"c11" <= $"c2").count

如果我使用RDD api实现它,我在ds1中广播数据,然后在ds2中过滤数据,它工作正常。

我已经确认广播成功了。

2019-02-14 23:11:55 INFO CodeGenerator:54 - 代码生成于10.469136 ms 2019-02-14 23:11:55 INFO TorrentBroadcast:54 - 开始阅读广播变量29 2019-02-14 23:11:55 INFO TorrentBroadcast:54 - 读取广播变量29花了6毫秒2019-02-14 23:11:56 INFO CodeGenerator:54 - 代码生成于11.280087 ms

查询计划:

==物理计划== BroadcastNestedLoopJoin BuildRight,Cross,((c1#68 <= c11#13)&&(c11#13 <= c2#69)) : - * Project [] :+ - * Filter isnotnull(_c0#0) :+ - * FileScan csv [_c0#0,_c1#1,_c2#2,_c3#3,_c4#4,_c5#5]批处理:false,格式:CSV,位置:InMemoryFileIndex [],PartitionFilters:[], PushedFilters:[IsNotNull(_c0)],ReadSchema:struct <_c0:string,_c1:string,_c2:string,_c3:string,_c4:string,_c5:string> + - BroadcastExchange IdentityBroadcastMode + - *项目[c1#68,c2#69] + - *过滤器(isnotnull(c1#68)&& isnotnull(c2#69)) + - * FileScan csv [c1#68,c2#69]批处理:false,格式:CSV,位置:InMemoryFileIndex [],PartitionFilters:[],PushedFilters:[IsNotNull(c1),IsNotNull(c2)],ReadSchema:struct

然后舞台不进步。

我更新了代码以使用广播ds1,然后在ds2的mapPartitions中进行了连接。

val ranges = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2").collect
val rangesBC = sc.broadcast(ranges)

然后在mapPartitions方法中使用此rangeBC来识别ds2中每行所属的范围,此作业在3小时内完成,而另一个作业即使在24小时后也未完成。这种情况意味着查询优化器没有做我想做的事情。

我究竟做错了什么?任何指针都会有所帮助。谢谢!

apache-spark join apache-spark-dataset cross-join
1个回答
1
投票

我不知道您是使用裸机还是采用现货或按需或专用的AWS,或者使用AZURE等的虚拟机。我的看法:

  • 欣赏10M x 1M是很多工作,即使.filter适用于结果交叉连接。这需要一些时间。你有什么期望?
  • Spark通常都是以线性方式进行缩放。
  • 具有VM的数据中心没有专用,因此没有最快的性能。

然后:

  • 我使用.86核心和6GB驱动社区版的模拟设置运行Databricks 10M x 100K。那是17分钟。
  • 我在你的例子中在4节点AWS EMR非专用集群上运行了10M x 1M(有一些EMR奇怪,比如在一个有价值的实例上保留驱动程序!)部分完成需要3个小时。见下图。

enter image description here

那么,回答你的问题: - 你没有做错任何事。

  • 只是需要更多资源才能实现更多并行化。
  • 我确实添加了一些显式分区,你可以看到。
© www.soinside.com 2019 - 2024. All rights reserved.