I have a case class CustomerData and a case class AccountData, as follows:
case class CustomerData(
customerId: String,
forename: String,
surname: String
)
case class AccountData(
customerId: String,
accountId: String,
balance: Long
)
I need to join the two so that they form the following case class:
case class CustomerAccountOutput(
customerId: String,
forename: String,
surname: String,
//Accounts for this customer
accounts: Seq[AccountData],
//Statistics of the accounts
numberAccounts: Int,
totalBalance: Long,
averageBalance: Double
)
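I need to show that if accountId or balance is null, then the number of accounts is 0, the total balance is null, and the average balance is also null. Replacing null with 0 is also acceptable.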
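To make the rule above concrete, here is a minimal sketch on plain Scala collections, with no Spark involved. It takes the "replace null with 0" option, and it treats a null row or a row with a null accountId as "no account". AccountStats, AccountStatsSketch and summarize are hypothetical names introduced only for this illustration.

```scala
// AccountData mirrors the case class from the question.
final case class AccountData(customerId: String, accountId: String, balance: Long)

// Hypothetical holder for the three statistics (not part of the original post).
final case class AccountStats(numberAccounts: Int, totalBalance: Long, averageBalance: Double)

object AccountStatsSketch {
  // Summarizes one customer's accounts.
  // Assumption: a null row or a null accountId means "no account",
  // and null statistics are replaced with 0.
  def summarize(accounts: Seq[AccountData]): AccountStats = {
    val valid = accounts.filter(a => a != null && a.accountId != null)
    val total = valid.map(_.balance).sum
    val average = if (valid.isEmpty) 0.0 else total.toDouble / valid.size
    AccountStats(valid.size, total, average)
  }
}
```

For the two IND0055 accounts in the expected output (balances 137 and 148), summarize gives 2 accounts, a total of 285 and an average of 142.5.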
The final result should look like this:
+----------+-----------+--------+---------------------------------------------------------------------+--------------+------------+-----------------+
|customerId|forename |surname |accounts |numberAccounts|totalBalance|averageBalance |
+----------+-----------+--------+---------------------------------------------------------------------+--------------+------------+-----------------+
|IND0113 |Leonard |Ball |[[IND0113,ACC0577,531]] |1 |531 |531.0 |
|IND0277 |Victoria |Hodges |[[IND0277,null,null]] |0 |null |null |
|IND0055 |Ella |Taylor |[[IND0055,ACC0156,137], [IND0055,ACC0117,148]] |2 |285 |142.5 |
|IND0129 |Christopher|Young |[[IND0129,null,null]] |0 |null
I have been given the Datasets of the two case classes to join. This is the code:
val customerDS = customerDF.as[CustomerData]
val accountDS = accountDF.withColumn("balance",'balance.cast("long")).as[AccountData]
//END GIVEN CODE
val customerAccountsDS = customerDS.joinWith(accountDS,customerDS("customerID") === accountDS("customerID"),"leftouter")
How can I get the result above? I am not allowed to use the "spark.sql.function._" library.
You should be able to do this using the concat_ws and collect_list functions in Spark.
//Creating sample data
case class CustomerData(
customerId: String,
forename: String,
surname: String
)
case class AccountData(
customerId: String,
accountId: String,
balance: Long
)
import org.apache.spark.sql.functions._
val customercolumns = Seq("customerId","forename","surname")
val acccolumns = Seq("customerId","accountId","balance")
val custdata = Seq(("IND0113", "Leonard","Ball"), ("IND0277", "Victoria","Hodges"), ("IND0055", "Ella","Taylor"),("IND0129","Christopher","Young")).toDF(customercolumns:_*).as[CustomerData]
val acctdata = Seq(("IND0113","ACC0577",531),("IND0055","ACC0156",137),("IND0055","ACC0117",148)).toDF(acccolumns:_*).as[AccountData]
val customerAccountsDS = custdata.join(acctdata,custdata("customerID") === acctdata("customerID"),"leftouter").drop(acctdata.col("customerId"))
// get the total balance, number of accounts, avg balance by aggregating based on group by columns.
val totalBalanaceDF = customerAccountsDS.groupBy("customerId","forename","surname").agg(sum("balance").alias("totalBalance"), avg("balance").alias("averageBalance"), count("accountId").alias("numberAccounts"))
val result = customerAccountsDS.withColumn("accounts", concat_ws(",", $"customerId", $"accountId",$"balance"))
// Create an accounts array column by aggregating over the group-by columns.
val finalresult = result.groupBy("customerId","forename","surname").agg(collect_list($"accounts").alias("accounts"))
// Join finalresult and totalBalanaceDF on the shared key columns to get the actual output needed.
val finaldf = finalresult.join(totalBalanaceDF, Seq("customerId","forename","surname"), "inner")
display(finaldf) // display() is a Databricks notebook helper; elsewhere use finaldf.show(false)
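The code above imports org.apache.spark.sql.functions._, which the question rules out. A minimal sketch of an alternative that stays within the typed Dataset API (joinWith, groupByKey, mapGroups) might look like the following. It assumes that customers without a match come back from the left outer joinWith with a null AccountData, and it replaces null statistics with 0, which the question allows. TypedJoinSketch and joinCustomers are hypothetical names, not part of the original code.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Case classes as given in the question.
case class CustomerData(customerId: String, forename: String, surname: String)
case class AccountData(customerId: String, accountId: String, balance: Long)
case class CustomerAccountOutput(
  customerId: String,
  forename: String,
  surname: String,
  accounts: Seq[AccountData],
  numberAccounts: Int,
  totalBalance: Long,
  averageBalance: Double
)

object TypedJoinSketch {
  // Hypothetical helper: joins and aggregates without org.apache.spark.sql.functions._
  def joinCustomers(
      customerDS: Dataset[CustomerData],
      accountDS: Dataset[AccountData],
      spark: SparkSession): Dataset[CustomerAccountOutput] = {
    import spark.implicits._ // encoders for the case classes

    customerDS
      // Dataset[(CustomerData, AccountData)]; the right side is assumed null when unmatched.
      .joinWith(accountDS, customerDS("customerId") === accountDS("customerId"), "left_outer")
      // Group every joined pair under its customer.
      .groupByKey(_._1)
      .mapGroups { (customer, pairs) =>
        // Materialize eagerly: the iterator is only valid inside this function.
        val accounts = pairs.map(_._2).filter(a => a != null && a.accountId != null).toList
        val total = accounts.map(_.balance).sum
        val average = if (accounts.isEmpty) 0.0 else total.toDouble / accounts.size
        CustomerAccountOutput(
          customer.customerId, customer.forename, customer.surname,
          accounts, accounts.size, total, average)
      }
  }
}
```

With the sample data above, this could be called as TypedJoinSketch.joinCustomers(custdata, acctdata, spark).show(false). Unlike the concat_ws version, the accounts column here keeps each account as a struct rather than a comma-joined string, and customers without accounts get an empty list.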