Spark:相当于数据帧中的zipwithindex

问题描述 投票:5回答:1

假设我有以下数据帧:

dummy_data = [('a',1),('b',25),('c',3),('d',8),('e',1)]
df = sc.parallelize(dummy_data).toDF(['letter','number'])

我想创建以下数据帧:

[('a',0),('b',2),('c',1),('d',3),('e',0)]

我所做的是将其转换为rdd并使用zipWithIndex函数并在加入结果后:

convertDF = (df.select('number')
              .distinct()
              .rdd
              .zipWithIndex()
              .map(lambda x:(x[0].number,x[1]))
              .toDF(['old','new']))


finalDF = (df
            .join(convertDF,df.number == convertDF.old)
            .select(df.letter,convertDF.new))

如果在数据帧中有与zipWIthIndex类似的功能吗?还有另一种更有效的方法来完成这项任务吗?

python apache-spark pyspark spark-dataframe
1个回答
0
投票

请检查https://issues.apache.org/jira/browse/SPARK-23074在数据帧中的这种直接功能奇偶校验。如果你有兴趣在Spark中看到这个,那就请注意jira。

这是PySpark中的一个解决方法:

def dfZipWithIndex (df, offset=1, colName="rowId"):
    '''
        Enumerates dataframe rows is native order, like rdd.ZipWithIndex(), but on a dataframe 
        and preserves a schema

        :param df: source dataframe
        :param offset: adjustment to zipWithIndex()'s index
        :param colName: name of the index column
    '''

    new_schema = StructType(
                    [StructField(colName,LongType(),True)]        # new added field in front
                    + df.schema.fields                            # previous schema
                )

    zipped_rdd = df.rdd.zipWithIndex()

    new_rdd = zipped_rdd.map(lambda (row,rowId): ([rowId +offset] + list(row)))

    return spark.createDataFrame(new_rdd, new_schema)

这也可以在abalon包中找到。

© www.soinside.com 2019 - 2024. All rights reserved.