Pyspark 减去数据框中分组行的最佳方法

问题描述 投票:0回答:1

在如下表所示的数据框中,我需要从另一行中减去两行的总和,所有行均按 CORP 和 BRANCH 类型分组(其中第一行已包含其他两行)。

公司 分行 市场 金额
C0 B1 欧洲 569
C0 B1 美国 4595
C0 B1 南美 221
C0 B1 没有美国 545
C0 B1 亚洲 989
C0 B2 欧洲 65
C0 B2 美国 6547
C0 B2 南美 669
C0 B2 没有美国 2258
C0 B2 亚洲 54
C1 B3 欧洲 12222
C1 B3 美国 8453
C1 B3 南美 22
C1 B3 没有美国 545
C1 B3 亚洲 99

例如,行“C0 B1 AMERICA”将等于 4595 - (221 + 545) = 3829。

我想出了这两个解决方案。

explain
函数似乎表明它们具有相同的成本,但哪个更好?

df_res1 = df_res1.withColumn('america', F.when(F.col('MARKET') == 'AMERICA', F.col('AMOUNT')).otherwise(F.lit(0)))\
                 .withColumn('no_so_america', F.when(F.col('MARKET').isin(['NO_AMERICA', 'SO_AMERICA']), F.col('AMOUNT'))\
                                               .otherwise(F.lit(0)))

win_perc = W.partitionBy('CORP', 'BRANCH')

df_res1 = df_res1.withColumn('tmp', F.col('america')-F.sum('no_so_america').over(win_perc))\
                 .withColumn('AMOUNT',\
                            F.when(F.col('tmp') > F.lit(0), F.col('tmp'))
                             .otherwise(F.col('AMOUNT')))\
                 .drop('america', 'no_so_america', 'tmp') 

df_res1.explain(mode='cost')

No.2

df_sum_no_so = df_res2.where(F.col('MARKET').isin(['NO_AMERICA', 'SO_AMERICA']))\
                      .groupBy('CORP', 'BRANCH')\
                      .agg(F.sum('AMOUNT').alias('sum_no_so_america'))

df_res2 = df_res2.join(df_sum_no_so, ['CORP', 'BRANCH'], 'left')

df_res2 = df_res2.withColumn('AMOUNT',\
                            F.when(F.col('MARKET') == 'AMERICA', F.col('AMOUNT') - F.col('sum_no_so_america'))\
                            .otherwise(F.col('AMOUNT')))

df_res2.explain(mode='cost')
python pyspark
1个回答
0
投票

您可以使用

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
对一组进行运算,根据整组数据的计算输出一行或多行。

我提供了一个虚拟行添加作为示例,只是为了展示如何操作。

在您的情况下,您可以将这个新表与您自己的源表连接起来。

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.pandas.functions import PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import pandas as pd

spark = SparkSession.builder.appName("example").getOrCreate()
data = [
    ("C0", "B1", "EUROPE", 569),
    ("C0", "B1", "AMERICA", 4595),
    ("C0", "B1", "SO-AMERICA", 221),
    ("C0", "B1", "NO-AMERICA", 545),
    ("C0", "B1", "ASIA", 989),
    ("C0", "B2", "EUROPE", 65),
    ("C0", "B2", "AMERICA", 6547),
    ("C0", "B2", "SO-AMERICA", 669),
    ("C0", "B2", "NO-AMERICA", 2258),
    ("C0", "B2", "ASIA", 54),
    ("C1", "B3", "EUROPE", 12222),
    ("C1", "B3", "AMERICA", 8453),
    ("C1", "B3", "SO-AMERICA", 22),
    ("C1", "B3", "NO-AMERICA", 545),
    ("C1", "B3", "ASIA", 99)
]

df = spark.createDataFrame(data, ["CORP", "BRANCH", "MARKET", "AMOUNT"])

schema = StructType([
    StructField("CORP", StringType()),
    StructField("BRANCH", StringType()),
    StructField("MARKET", StringType()),
    StructField("AMOUNT", IntegerType())
])


@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def adjust_america(pdf):
    america = pdf[pdf['MARKET'] == 'AMERICA']['AMOUNT'].sum()
    so_america = pdf[pdf['MARKET'] == 'SO-AMERICA']['AMOUNT'].sum()
    no_america = pdf[pdf['MARKET'] == 'NO-AMERICA']['AMOUNT'].sum()
    america_adjusted = america - (so_america + no_america)
    return pd.DataFrame({
        'CORP': [pdf['CORP'].iloc[0], "dummy_C"],
        'BRANCH': [pdf['BRANCH'].iloc[0], "dummy_B"],
        'MARKET': ["AMERICA_ADJUSTED", "dummy_market"],
        'AMOUNT': [america_adjusted, -9232]

    })


result_df = df.groupBy("CORP", "BRANCH").apply(adjust_america)
result_df.show()

输出:

+-------+-------+----------------+------+
|   CORP| BRANCH|          MARKET|AMOUNT|
+-------+-------+----------------+------+
|     C0|     B1|AMERICA_ADJUSTED|  3829|
|dummy_C|dummy_B|    dummy_market| -9232|
|     C0|     B2|AMERICA_ADJUSTED|  3620|
|dummy_C|dummy_B|    dummy_market| -9232|
|     C1|     B3|AMERICA_ADJUSTED|  7886|
|dummy_C|dummy_B|    dummy_market| -9232|
+-------+-------+----------------+------+
© www.soinside.com 2019 - 2024. All rights reserved.