当我调用rdd.join(rdd)时发生了什么

问题描述 投票:1回答:1

我正在开发一个应用程序,我需要在RDD中使用相同的键对每对行执行计算,这里是RDD结构:

List<Tuple2<String, Tuple2<Integer, Integer>>> dat2 = new ArrayList<>();
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Alice", new Tuple2<Integer, Integer>(1, 1)));
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Alice", new Tuple2<Integer, Integer>(2, 5)));
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Alice", new Tuple2<Integer, Integer>(3, 78)));
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Bob", new Tuple2<Integer, Integer>(1, 6)));
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Bob", new Tuple2<Integer, Integer>(2, 11)));
JavaRDD<Tuple2<String, Tuple2<Integer, Integer>>> y2 = sc.parallelize(dat2);

现在,每个人的数据都可以被视为:(时间戳,价值)。我希望知道每一行在+ -1时间戳中发生的值的数量。 (我知道这看起来像滑动窗口,但我想要事件级粒度)

y2.join(y2);
resultOfJoin.filter(t -> t._2()._1()._1() - t._2()._2()._1() <= 1 && t._2()._1()._1() - t._2()._2()._1() >= -1)

在这种情况下,我遇到的最佳解决方案是将RDD与自身连接,为每个人创建k^2行,其中k是与此人相关联的行数。

现在,我知道这是一场彻底的灾难。我明白这会导致洗牌(并且洗牌很糟糕)但是我不能带来更好的东西。

我有3个问题:

  1. 由于我在连接后立即过滤,它会影响连接引起的压力(换句话说,会有任何优化)吗?
  2. 网络上传递的行数是多少? (我知道在最坏的情况下,结果RDD将有n ^ 2行)将在网络上发送的行是#workersn(仅发送一个副本并在worker上复制)或#workersn ^ 2(每行发送一行)结果工人的组合)?
  3. 如果我想与Dataset合作,我可以加入过滤器。我理解数据集对计算图有额外的优化。如果我转移到数据集,我应该期待多少改进?
java apache-spark join apache-spark-sql spark-dataframe
1个回答
1
投票

由于我在连接后立即过滤,它会影响连接引起的压力(换句话说,会有任何优化)吗?

不,没有优化。

网络上传递的行数是多少?

O(N)(特别是每个记录将被洗牌两次,每个父记录一次)您按键加入,因此每个项目转到一个,只有一个分区。

如果我愿意使用数据集,我可以加入过滤器。我理解数据集对计算图有额外的优化。如果我转移到数据集,我应该期待多少改进?

随机过程更好地进行了优化,但是否则您不能指望任何特定情况的优化。

希望知道每一行在+ -1时间戳中发生的值的数量。

尝试窗口功能:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val w = Window.partitionBy("id").ordetBy("timestamp")

rdd.toDF("id", "data")
  .select($"id", $"data._1" as "timestamp", $"data._2" as "value"))
  .withColumn("lead", lead($"value", 1).over(w))
  .withColumn("lag", lag($"value", 1).over(w))
© www.soinside.com 2019 - 2024. All rights reserved.