添加列和作为PySpark数据帧新列

问题描述 投票:20回答:4

我使用PySpark和我有一堆数字列的火花数据帧。我想增加一列,是所有其他列的总和。

假设我有数据框中列“A”,“B”和“C”。我知道我能做到这一点:

df.withColumn('total_col', df.a + df.b + df.c)

问题是,我不想单独打出来的每一列,并将它们添加,特别是如果我有很多列。我希望能够自动或通过指定我想补充列名的列表,做到这一点。是否有另一种方式做到这一点?

python apache-spark pyspark spark-dataframe
4个回答
37
投票

这不明显。我看到火花Dataframes API中定义的列没有基于行的总和。

Version 2

这可以在一个相当简单的方式来完成:

newdf = df.withColumn('total', sum(df[col] for col in df.columns))

df.columns由pyspark为字符串给所有列名的星火据帧列表提供。对于不同的金额,你可以提供列名的任何其他列表,而不是。

我没有尝试这是我的第一个解决方案,因为我不能肯定它会如何表现。但是,它的工作原理。

Version 1

这是过于复杂,但工作也是如此。

你可以这样做:

  1. 使用df.columns获得列名称的列表
  2. 使用该名称列表,使列的列表
  3. 通过该列表的东西,将调用列重载的add函数在fold-type functional manner

与Python的reduce,如何操作符重载的作品有一定的了解,并为列pyspark代码here变成:

def column_add(a,b):
     return  a.__add__(b)

newdf = df.withColumn('total_col', 
         reduce(column_add, ( df[col] for col in df.columns ) ))

请注意,这是一个Python减少而不是火花RDD降低,并在第二个参数,以降低括号术语要求括号,因为它是一个列表生成表达。

经测试,工程!

$ pyspark
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache()
>>> df
DataFrame[a: bigint, b: bigint, c: bigint]
>>> df.columns
['a', 'b', 'c']
>>> def column_add(a,b):
...     return a.__add__(b)
...
>>> df.withColumn('total', reduce(column_add, ( df[col] for col in df.columns ) )).collect()
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]

3
投票

解决方案

newdf = df.withColumn('total', sum(df[col] for col in df.columns))

发表@保罗的作品。不过,我得到的错误,许多其他的,我所看到的,

TypeError: 'Column' object is not callable

一段时间后,我发现(在我的情况下,至少)的问题。问题是,我以前进口的一些pyspark功能与线

from pyspark.sql.functions import udf, col, count, sum, when, avg, mean, min

所以线进口sum pyspark命令而df.withColumn('total', sum(df[col] for col in df.columns))应该使用正常蟒sum功能。

您可以删除与del sum的pyspark功能的参考。

否则,在我的情况,我改变了进口

import pyspark.sql.functions as F

然后引用的功能F.sum


2
投票

这样做的最直接的方式是使用expr功能

from pyspark.sql.functions import *
data = data.withColumn('total', expr("col1 + col2 + col3 + col4"))

0
投票

我的问题是相似的为我不得不添加连续列款项在PySpark数据帧新列以上(有点复杂)。这种方法使用的代码从上面保罗的版本1:

import pyspark
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName('addColAsCumulativeSUM').getOrCreate()
df=spark.createDataFrame(data=[(1,2,3),(4,5,6),(3,2,1)\
                              ,(6,1,-4),(0,2,-2),(6,4,1)\
                              ,(4,5,2),(5,-3,-5),(6,4,-1)]\
                              ,schema=['x1','x2','x3'])
df.show()

+---+---+---+
| x1| x2| x3|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  3|  2|  1|
|  6|  1| -4|
|  0|  2| -2|
|  6|  4|  1|
|  4|  5|  2|
|  5| -3| -5|
|  6|  4| -1|
+---+---+---+

colnames=df.columns

补充一点,是累积的​​总和(连续)新的列:

for i in range(0,len(colnames)):
    colnameLst= colnames[0:i+1]
    colname = 'cm'+ str(i+1)
    df = df.withColumn(colname, sum(df[col] for col in colnameLst))

地方.show()

+---+---+---+---+---+---+
| x1| x2| x3|cm1|cm2|cm3|
+---+---+---+---+---+---+
|  1|  2|  3|  1|  3|  6|
|  4|  5|  6|  4|  9| 15|
|  3|  2|  1|  3|  5|  6|
|  6|  1| -4|  6|  7|  3|
|  0|  2| -2|  0|  2|  0|
|  6|  4|  1|  6| 10| 11|
|  4|  5|  2|  4|  9| 11|
|  5| -3| -5|  5|  2| -3|
|  6|  4| -1|  6| 10|  9|
+---+---+---+---+---+---+

是增加了“累加和”列如下所示:

cm1 = x1
cm2 = x1 + x2
cm3 = x1 + x2 + x3
© www.soinside.com 2019 - 2024. All rights reserved.