Apache Spark 的主键

问题描述 投票:0回答:4

我与 Apache Spark 和 PostgreSQL 建立了 JDBC 连接,并且想要将一些数据插入到我的数据库中。当我使用

append
模式时,我需要为每个
id
指定
DataFrame.Row
。 Spark有没有办法创建主键?

database postgresql hadoop apache-spark
4个回答
50
投票

斯卡拉

如果您需要的只是唯一的数字,您可以使用

zipWithUniqueId
并重新创建 DataFrame。首先是一些导入和虚拟数据:

import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

提取架构以供进一步使用:

val schema = df.schema

添加 ID 字段:

val rows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

创建数据框:

val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))

Python 中也有同样的事情:

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

如果您喜欢连续数字,可以将

zipWithUniqueId
替换为
zipWithIndex
,但价格稍贵一些。

直接使用

DataFrame
API:

(通用 Scala、Python、Java、R,语法几乎相同)

以前我错过了

monotonicallyIncreasingId
功能,只要您不需要连续的数字,它就应该可以正常工作:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+

虽然有用,但

monotonicallyIncreasingId
是不确定的。不仅执行之间的 id 可能不同,而且当后续操作包含过滤器时,如果没有额外的技巧,就无法使用它来识别行。

也可以使用

rowNumber
窗函数:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()

不幸的是:

警告窗口:没有为窗口操作定义分区!将所有数据移动到单个分区,这可能会导致严重的性能下降。

因此,除非您有一种自然的方法来分区数据并确保唯一性,否则目前并不是特别有用。


14
投票
from pyspark.sql.functions import monotonically_increasing_id

df.withColumn("id", monotonically_increasing_id()).show()

请注意 df.withColumn 的第二个参数是 monotonically_increasing_id() 而不是 monotonically_increasing_id 。


3
投票

我发现以下解决方案对于 zipWithIndex() 是所需行为的情况相对简单,即对于那些需要连续整数的情况。

在本例中,我们使用 pyspark 并依靠字典理解将原始行对象映射到适合新架构(包括唯一索引)的新字典。

# read the initial dataframe without index
dfNoIndex = sqlContext.read.parquet(dataframePath)
# Need to zip together with a unique integer

# First create a new schema with uuid field appended
newSchema = StructType([StructField("uuid", IntegerType(), False)]
                       + dfNoIndex.schema.fields)
# zip with the index, map it to a dictionary which includes new field
df = dfNoIndex.rdd.zipWithIndex()\
                      .map(lambda (row, id): {k:v
                                              for k, v
                                              in row.asDict().items() + [("uuid", id)]})\
                      .toDF(newSchema)

0
投票

对于任何不需要整数类型的人来说,连接多个列的值(其组合在数据中是唯一的)可能是一个简单的替代方案。您必须处理空值,因为 concat/concat_ws 不会为您执行此操作。如果连接的值很长,您还可以对输出进行哈希处理:

import pyspark.sql.functions as sf

unique_id_sub_cols = ["a", "b", "c"]
df = df.withColumn(
        "UniqueId",
        sf.md5(
            sf.concat_ws(
                "-",
                *[
                    sf.when(sf.col(sub_col).isNull(), sf.lit("Missing")).otherwise(
                        sf.col(sub_col)
                    )
                    for sub_col in unique_id_sub_cols
                ]
            )
        ),
    )
© www.soinside.com 2019 - 2024. All rights reserved.