I have CSV data in the following format.
I need to find the top 2 vendors whose turnover in 2017 is greater than 100.
turnover = sum(invoices whose status is "Paid-in-Full") - sum(invoices whose status is "Exception" or "Rejected")
I have loaded the data from the CSV in a Databricks Scala notebook as follows:
val invoices_data = spark.read.format(file_type)
.option("header", "true")
.option("dateFormat", "M/d/yy")
.option("inferSchema", "true")
.load("invoice.csv")
Then I tried grouping by vendor name:
val avg_invoice_by_vendor = invoices_data.groupBy("VendorName")
but now I don't know how to proceed.
Here is the sample CSV data:
Id  InvoiceDate  Status        Invoice  VendorName
2   2/23/17      Exception     23       V1
3   11/23/17     Paid-in-Full  56       V1
1   12/20/17     Paid-in-Full  12       V1
5   8/4/19       Paid-in-Full  123      V2
6   2/6/17       Paid-in-Full  237      V2
9   3/9/17       Rejected      234      V2
7   4/23/17      Paid-in-Full  78       V3
8   5/23/17      Exception     345      V4
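To make the turnover definition concrete, here is the calculation for V1's 2017 rows from the sample above, done with plain Scala collections (no Spark needed; the names `v1Rows2017` and `v1Turnover` are illustrative only):

```scala
// V1's 2017 invoices from the sample: (status, amount) pairs
val v1Rows2017 = Seq(("Exception", 23), ("Paid-in-Full", 56), ("Paid-in-Full", 12))

val v1Turnover = v1Rows2017.map {
  case ("Paid-in-Full", amount) => amount   // paid invoices count positively
  case (_, amount)              => -amount  // Exception/Rejected are subtracted
}.sum
// v1Turnover = 56 + 12 - 23 = 45, so V1 stays below the 100 threshold
```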
You can use a UDF to sign each invoice amount according to its status, then group the DataFrame and aggregate with the sum function:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DateType
def signInvoice: (String, Int) => Int = (status: String, invoice: Int) => {
  status match {
    case "Paid-in-Full"           => invoice   // counted positively
    case "Exception" | "Rejected" => -invoice  // subtracted from the turnover
    case other => throw new IllegalStateException(s"unexpected status: $other")
  }
}
val signInvoiceUdf = spark.udf.register("signInvoice", signInvoice)
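As a quick sanity check of the signing logic, the same match can be exercised as a plain Scala function, outside Spark. Note that "Rejected" must be negated too, since it appears in the sample data and the turnover formula subtracts it:

```scala
// Plain-Scala re-implementation of the signing logic, for checking only
def signInvoice(status: String, invoice: Int): Int = status match {
  case "Paid-in-Full"           => invoice   // counted positively
  case "Exception" | "Rejected" => -invoice  // subtracted from the turnover
  case other => throw new IllegalStateException(s"unexpected status: $other")
}

println(signInvoice("Paid-in-Full", 56)) // 56
println(signInvoice("Rejected", 234))    // -234
```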
invoices_data
  .withColumn("Invoice", col("Invoice").cast("int"))
  .withColumn("signed_invoice", signInvoiceUdf('Status, 'Invoice))
  .withColumn("InvoiceDate", to_date(col("InvoiceDate"), "M/d/yy")) // parse "2/23/17"-style strings
  .filter(year(col("InvoiceDate")) === 2017)
  .groupBy("VendorName")
  .agg(sum("signed_invoice").as("sum_invoice")) // sum the signed amounts, not the raw ones
  .filter(col("sum_invoice") > 100)
  .orderBy(col("sum_invoice").desc)             // rank vendors by turnover
  .limit(2)                                     // keep the top 2
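The whole pipeline (2017 filter, signed sum, threshold, top 2) can also be sanity-checked against the sample rows with plain Scala collections, with the dates reduced to their year; the names `Inv`, `rows`, and `top2` are illustrative only:

```scala
// End-to-end check of the pipeline logic on the sample data, without Spark
case class Inv(year: Int, status: String, invoice: Int, vendor: String)

val rows = Seq(
  Inv(2017, "Exception", 23, "V1"), Inv(2017, "Paid-in-Full", 56, "V1"),
  Inv(2017, "Paid-in-Full", 12, "V1"), Inv(2019, "Paid-in-Full", 123, "V2"),
  Inv(2017, "Paid-in-Full", 237, "V2"), Inv(2017, "Rejected", 234, "V2"),
  Inv(2017, "Paid-in-Full", 78, "V3"), Inv(2017, "Exception", 345, "V4")
)

val top2 = rows
  .filter(_.year == 2017)                           // keep 2017 invoices only
  .groupBy(_.vendor)
  .map { case (vendor, rs) =>
    vendor -> rs.map(r =>
      if (r.status == "Paid-in-Full") r.invoice else -r.invoice
    ).sum                                           // signed sum per vendor
  }
  .filter { case (_, turnover) => turnover > 100 }  // turnover must exceed 100
  .toSeq.sortBy { case (_, turnover) => -turnover }
  .take(2)                                          // keep the top 2
// On this particular sample (V1 = 45, V2 = 3, V3 = 78, V4 = -345),
// no 2017 turnover exceeds 100, so top2 is empty
```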