Spark DataFrame aggregation based on a conditional column in Scala

Problem description    Votes: 0    Answers: 1

I have CSV data in the following format.

I need to find the top 2 vendors whose 2017 turnover is greater than 100.

Turnover = sum(invoices whose status is Paid-in-Full) - sum(invoices whose status is "Exception" or "Rejected")

I have loaded the data from the CSV in a Databricks Scala notebook as follows:

val invoices_data = spark.read.format(file_type) // file_type is presumably defined earlier in the notebook (e.g. "csv")
  .option("header", "true")
  .option("dateFormat", "M/d/yy")
  .option("inferSchema", "true")
  .load("invoice.csv")

Then I tried to group by vendor name:

val avg_invoice_by_vendor = invoices_data.groupBy("VendorName")

But now I am not sure how to proceed.

Here is the sample CSV data:

Id   InvoiceDate   Status         Invoice   VendorName
2    2/23/17       Exception      23        V1
3    11/23/17      Paid-in-Full   56        V1
1    12/20/17      Paid-in-Full   12        V1
5    8/4/19        Paid-in-Full   123       V2
6    2/6/17        Paid-in-Full   237       V2
9    3/9/17        Rejected       234       V2
7    4/23/17       Paid-in-Full   78        V3
8    5/23/17       Exception      345       V4
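
For illustration with the sample rows above: V1's 2017 turnover would be 56 + 12 - 23 = 45, and V2's would be 237 - 234 = 3 (the 123 invoice is from 2019 and is excluded).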
scala dataframe apache-spark-sql azure-databricks
1 Answer
0 votes

You can use a UDF to sign the invoice amount depending on its status, and then group the DataFrame and aggregate with the sum function:

import org.apache.spark.sql.functions._

// Give each invoice a sign based on its status: paid invoices count
// positively, exceptions and rejections count negatively.
val signInvoice: (String, Int) => Int = (status: String, invoice: Int) => {
  status match {
    case "Exception" | "Rejected" => -invoice
    case "Paid-in-Full"           => invoice
    case _                        => throw new IllegalStateException("wrong status")
  }
}

val signInvoiceUdf = spark.udf.register("signInvoice", signInvoice)

invoices_data
  .withColumn("Invoice", col("Invoice").cast("int"))
  .withColumn("signed_invoice", signInvoiceUdf(col("Status"), col("Invoice")))
  // parse the M/d/yy strings into a proper date so year() can be applied
  .withColumn("InvoiceDate", to_date(col("InvoiceDate"), "M/d/yy"))
  .filter(year(col("InvoiceDate")) === lit(2017))
  .groupBy("VendorName")
  .agg(sum("signed_invoice").as("sum_invoice"))
  .filter(col("sum_invoice") > 100)