在如下表所示的数据框中,我需要从另一行中减去两行的总和,所有行均按 CORP 和 BRANCH 类型分组(其中第一行已包含其他两行)。
公司 | 分行 | 市场 | 金额 |
---|---|---|---|
C0 | B1 | 欧洲 | 569 |
C0 | B1 | 美国 | 4595 |
C0 | B1 | 南美 | 221 |
C0 | B1 | 没有美国 | 545 |
C0 | B1 | 亚洲 | 989 |
C0 | B2 | 欧洲 | 65 |
C0 | B2 | 美国 | 6547 |
C0 | B2 | 南美 | 669 |
C0 | B2 | 没有美国 | 2258 |
C0 | B2 | 亚洲 | 54 |
C1 | B3 | 欧洲 | 12222 |
C1 | B3 | 美国 | 8453 |
C1 | B3 | 南美 | 22 |
C1 | B3 | 没有美国 | 545 |
C1 | B3 | 亚洲 | 99 |
例如,行“C0 B1 AMERICA”将等于 4595 - (221 + 545) = 3829。
我想出了这两个解决方案。
explain
函数似乎表明它们具有相同的成本,但哪个更好?
df_res1 = df_res1.withColumn('america', F.when(F.col('MARKET') == 'AMERICA', F.col('AMOUNT')).otherwise(F.lit(0)))\
.withColumn('no_so_america', F.when(F.col('MARKET').isin(['NO_AMERICA', 'SO_AMERICA']), F.col('AMOUNT'))\
.otherwise(F.lit(0)))
win_perc = W.partitionBy('CORP', 'BRANCH')
df_res1 = df_res1.withColumn('tmp', F.col('america')-F.sum('no_so_america').over(win_perc))\
.withColumn('AMOUNT',\
F.when(F.col('tmp') > F.lit(0), F.col('tmp'))
.otherwise(F.col('AMOUNT')))\
.drop('america', 'no_so_america', 'tmp')
df_res1.explain(mode='cost')
No.2
df_sum_no_so = df_res2.where(F.col('MARKET').isin(['NO_AMERICA', 'SO_AMERICA']))\
.groupBy('CORP', 'BRANCH')\
.agg(F.sum('AMOUNT').alias('sum_no_so_america'))
df_res2 = df_res2.join(df_sum_no_so, ['CORP', 'BRANCH'], 'left')
df_res2 = df_res2.withColumn('AMOUNT',\
F.when(F.col('MARKET') == 'AMERICA', F.col('AMOUNT') - F.col('sum_no_so_america'))\
.otherwise(F.col('AMOUNT')))
df_res2.explain(mode='cost')
您可以使用
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
对一组进行运算,根据整组数据的计算输出一行或多行。
我提供了一个虚拟行添加作为示例,只是为了展示如何操作。
在您的情况下,您可以将这个新表与您自己的源表连接起来。
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.pandas.functions import PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import pandas as pd
spark = SparkSession.builder.appName("example").getOrCreate()
data = [
("C0", "B1", "EUROPE", 569),
("C0", "B1", "AMERICA", 4595),
("C0", "B1", "SO-AMERICA", 221),
("C0", "B1", "NO-AMERICA", 545),
("C0", "B1", "ASIA", 989),
("C0", "B2", "EUROPE", 65),
("C0", "B2", "AMERICA", 6547),
("C0", "B2", "SO-AMERICA", 669),
("C0", "B2", "NO-AMERICA", 2258),
("C0", "B2", "ASIA", 54),
("C1", "B3", "EUROPE", 12222),
("C1", "B3", "AMERICA", 8453),
("C1", "B3", "SO-AMERICA", 22),
("C1", "B3", "NO-AMERICA", 545),
("C1", "B3", "ASIA", 99)
]
df = spark.createDataFrame(data, ["CORP", "BRANCH", "MARKET", "AMOUNT"])
schema = StructType([
StructField("CORP", StringType()),
StructField("BRANCH", StringType()),
StructField("MARKET", StringType()),
StructField("AMOUNT", IntegerType())
])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def adjust_america(pdf):
america = pdf[pdf['MARKET'] == 'AMERICA']['AMOUNT'].sum()
so_america = pdf[pdf['MARKET'] == 'SO-AMERICA']['AMOUNT'].sum()
no_america = pdf[pdf['MARKET'] == 'NO-AMERICA']['AMOUNT'].sum()
america_adjusted = america - (so_america + no_america)
return pd.DataFrame({
'CORP': [pdf['CORP'].iloc[0], "dummy_C"],
'BRANCH': [pdf['BRANCH'].iloc[0], "dummy_B"],
'MARKET': ["AMERICA_ADJUSTED", "dummy_market"],
'AMOUNT': [america_adjusted, -9232]
})
result_df = df.groupBy("CORP", "BRANCH").apply(adjust_america)
result_df.show()
输出:
+-------+-------+----------------+------+
| CORP| BRANCH| MARKET|AMOUNT|
+-------+-------+----------------+------+
| C0| B1|AMERICA_ADJUSTED| 3829|
|dummy_C|dummy_B| dummy_market| -9232|
| C0| B2|AMERICA_ADJUSTED| 3620|
|dummy_C|dummy_B| dummy_market| -9232|
| C1| B3|AMERICA_ADJUSTED| 7886|
|dummy_C|dummy_B| dummy_market| -9232|
+-------+-------+----------------+------+