Spark Left Outer Join 会在不应该生成Optional.empty 时生成Optional.empty

问题描述 投票:0回答:1

我有一个包含节点对的 RDD,我需要为它们分配唯一的 id。

但是我遇到了 NPE,但我不知道如何解决它。

我基本上将所有节点放入一个不同的列表中,然后为它们分配 uniqueId。之后,我将原始对与这个新作业合并。

代码如下所示:

JavaPairRDD<Node, Node> pairs = // ... assigned previously
JavaPairRDD<Node, Long> index = pairs
        .flatMap(tuple -> Arrays.asList(tuple._1(), tuple._2()).iterator())
        .distinct()
        .zipWithUniqueId();

pairs.leftOuterJoin(index)
        .mapToPair(new MergeJoinResult())
        .mapToPair(Tuple2::swap)
        .leftOuterJoin(index)
        .mapToPair(new MergeJoinResult())
        .mapToPair(Tuple2::swap);

/*
 * Given a tuple like (node1, (node2, node1Index))
 * Creates a new tuple (node1, node2) where node1 is initialized with its index
 */
static class MergeJoinResult implements
        PairFunction<Tuple2<Node, Tuple2<Node, Optional<Long>>>, Node, Node>,
        Serializable {
    @Override
    public Tuple2<Node, Node> call(Tuple2<Node, Tuple2<Node, Optional<Long>>> row) throws Exception {
        return Tuple2.apply(new Node(row._1(), row._2()._2().get()), row._2()._1());
    }
}

我遇到的问题是

row._2()._2().get()
返回Optional.empty并且我得到了NPE。

但这应该是不可能的,因为我是从 RDD 对中导出索引 RDD 的。因此它们之间的 leftOuterJoin 应该总是产生匹配。

作为健全性检查,我添加了代码将整个 RDD 转储到 S3 以查看

pairs
index
的内容。数据就在那里,所有边以及正确的索引条目及其唯一 ID。我使用
toString()
进行了转储。

然后我认为问题出在

equals
实现上,我对代码执行了 delombok 并添加了打印语句来验证比较是否正确 对象之间返回
false
。在我的日志中,比较总是返回
true
,所以我不知道为什么我会在那里得到
Optional.empty

我注意到的另一个奇怪的事情是,当我对索引中的对象进行 stringfy 并执行分组依据时,我发现其中有重复项:

index
  .groupBy(t -> t._1().toString())
  .filter(t -> {
      int size = 0;
      for (Tuple2<Node, Long> value : t._2()) {
          size++;
          if (size >= 2) return true;
      }
      return false;
  });

如果我执行

pairs.cogroup(index)
,也会发生同样的情况。我得到多个具有相同 K 的条目。

我尝试在按字符串表示形式对这些对象进行分组后对它们进行比较,但它们的

equals
hashCode
返回相同的结果。我正在使用 Lombok 实现这些。

我还尝试将此代码之前的 RDD 序列化为 JSON 并加载我的机器中的所有内容,但执行此操作后,我的机器中没有出现 NPE。

我有点迷失在这里。

我的下一个猜测是问题出在序列化上(我正在使用 Kryo)。我要尝试的另一个选择是为 RDD 设置不同的分区器。

对于我在这里可以做什么有什么建议吗?我在 AWS Glue 中使用 Spark 3.3.1。

编辑:我更改了一些内容,将所有 Node 对象序列化为 JSON 字符串,并将其用作连接键,这很有效。这进一步暗示了序列化问题,我认为它与 equals 或 hashCode 无关,因为它们是由 Lombok 实现的。

java scala apache-spark left-join rdd
1个回答
0
投票

如果使用

LEFT_TABLE
RIGHT_TABLE
连接到
LEFT OUTER JOIN
并且
LEFT_TABLE
中存在一个实体,而
RIGHT_TABLE
中没有相应的匹配项,那么您将得到 NULL,是的。

从这个意义上说,JOIN 中的

OUTER
规范就是问题所在。很难从给出的内容中看出,但似乎您更喜欢这里的
INNER JOIN

请参阅 INNER JOIN、LEFT JOIN、RIGHT JOIN 和 FULL JOIN 之间有什么区别?了解有关区别的更多信息

© www.soinside.com 2019 - 2024. All rights reserved.