Mapping customer and account data to case classes with Spark/Scala

Question · votes: 0 · answers: 1

So I have a case class CustomerData and a case class AccountData, as follows:

case class CustomerData(
  customerId: String,
  forename: String,
  surname: String
)

case class AccountData(
  customerId: String,
  accountId: String,
  balance: Long
)

I need to join these two so that they form the following case class:

case class CustomerAccountOutput(
  customerId: String,
  forename: String,
  surname: String,
  //Accounts for this customer
  accounts: Seq[AccountData],
  //Statistics of the accounts
  numberAccounts: Int,
  totalBalance: Long,
  averageBalance: Double
)

I need it so that if accountId or balance is null, then numberAccounts is 0 and both totalBalance and averageBalance are null. Replacing the nulls with 0 is also acceptable.
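This null rule can be sketched in plain Scala, with no Spark needed. Note this is only an illustration of the requirement: `accountStats` is a hypothetical helper name, and `balance` is widened to `java.lang.Long` here purely so the null case is representable outside Spark.

```scala
// Hypothetical helper illustrating the required statistics for one customer.
// An account row whose accountId/balance are null (i.e. the customer matched
// nothing in the left outer join) must yield numberAccounts = 0 and
// null (here None) for both balance statistics.
case class AccountData(customerId: String, accountId: String, balance: java.lang.Long)

def accountStats(accounts: Seq[AccountData]): (Int, Option[Long], Option[Double]) = {
  // Keep only rows that represent a real account.
  val real = accounts.filter(a => a.accountId != null && a.balance != null)
  if (real.isEmpty) (0, None, None)
  else {
    val total = real.map(_.balance.longValue).sum
    (real.size, Some(total), Some(total.toDouble / real.size))
  }
}
```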

The final result should look like this:

+----------+-----------+--------+---------------------------------------------------------------------+--------------+------------+-----------------+
|customerId|forename   |surname |accounts                                                             |numberAccounts|totalBalance|averageBalance   |
+----------+-----------+--------+---------------------------------------------------------------------+--------------+------------+-----------------+
|IND0113   |Leonard    |Ball    |[[IND0113,ACC0577,531]]                                              |1             |531         |531.0            |
|IND0277   |Victoria   |Hodges  |[[IND0277,null,null]]                                                |0             |null        |null             |
|IND0055   |Ella       |Taylor  |[[IND0055,ACC0156,137], [IND0055,ACC0117,148]]                       |2             |285         |142.5            |
|IND0129   |Christopher|Young   |[[IND0129,null,null]]                                                |0             |null        |null             |
+----------+-----------+--------+---------------------------------------------------------------------+--------------+------------+-----------------+

I have already been given the two case classes to join; here is the code:

val customerDS = customerDF.as[CustomerData]
val accountDS = accountDF.withColumn("balance", 'balance.cast("long")).as[AccountData]
//END GIVEN CODE

val customerAccountsDS = customerDS.joinWith(accountDS, customerDS("customerId") === accountDS("customerId"), "leftouter")

How can I get the result above? I am not allowed to use the "spark.sql.functions._" library.

scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0
1 Answer

1 vote

You should be able to accomplish this by using Spark's `concat_ws` and `collect_list` functions.

//Creating sample data
case class CustomerData(
  customerId: String,
  forename: String,
  surname: String
)

case class AccountData(
  customerId: String,
  accountId: String,
  balance: Long
)

import org.apache.spark.sql.functions._
val customercolumns = Seq("customerId", "forename", "surname")
val acccolumns = Seq("customerId", "accountId", "balance")
val custdata = Seq(("IND0113", "Leonard", "Ball"), ("IND0277", "Victoria", "Hodges"), ("IND0055", "Ella", "Taylor"), ("IND0129", "Christopher", "Young")).toDF(customercolumns: _*).as[CustomerData]
val acctdata = Seq(("IND0113", "ACC0577", 531L), ("IND0055", "ACC0156", 137L), ("IND0055", "ACC0117", 148L)).toDF(acccolumns: _*).as[AccountData]
val customerAccountsDS = custdata.join(acctdata, custdata("customerId") === acctdata("customerId"), "leftouter").drop(acctdata.col("customerId"))
// Get the total balance, number of accounts and average balance by aggregating on the group-by columns.
val totalBalanceDF = customerAccountsDS.groupBy("customerId", "forename", "surname").agg(sum("balance").alias("totalBalance"), avg("balance").alias("averageBalance"), count("accountId").alias("numberAccounts"))

val result = customerAccountsDS.withColumn("accounts", concat_ws(",", $"customerId", $"accountId", $"balance"))
// Create an accounts array column by aggregating on the group-by columns.
val finalresult = result.groupBy("customerId", "forename", "surname").agg(collect_list($"accounts").alias("accounts"))
// Join finalresult and totalBalanceDF on the shared key columns to get the actual output needed.
val finaldf = finalresult.join(totalBalanceDF, Seq("customerId", "forename", "surname"), "inner")
finaldf.show(false) // or display(finaldf) on Databricks

You should see output similar to the expected result above.
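As an aside, the question says `spark.sql.functions._` is off limits; with the typed Dataset API the same shape could instead be produced via `joinWith`, `groupByKey` and `mapGroups`. Below is a minimal sketch of that per-group logic in plain Scala collections, so it runs without a Spark session. `buildOutput` is a hypothetical name, `balance` is widened to `java.lang.Long` so nulls are representable, and null totals are replaced with 0, which the question explicitly allows.

```scala
case class CustomerData(customerId: String, forename: String, surname: String)
case class AccountData(customerId: String, accountId: String, balance: java.lang.Long)
case class CustomerAccountOutput(
  customerId: String, forename: String, surname: String,
  accounts: Seq[AccountData], numberAccounts: Int,
  totalBalance: Long, averageBalance: Double)

// Plain-collections stand-in for joinWith(..., "leftouter") + groupByKey + mapGroups.
def buildOutput(customers: Seq[CustomerData], accounts: Seq[AccountData]): Seq[CustomerAccountOutput] =
  customers.map { c =>
    val accts = accounts.filter(_.customerId == c.customerId)
    val balances = accts.flatMap(a => Option(a.balance).map(_.longValue))
    val total = balances.sum
    // A customer with no matching accounts gets numberAccounts = 0 and,
    // per the question's relaxation, 0 instead of null for the totals.
    CustomerAccountOutput(c.customerId, c.forename, c.surname, accts,
      accts.size, total, if (accts.isEmpty) 0.0 else total.toDouble / accts.size)
  }
```

In the real Spark version, the same body would live inside `mapGroups` after `joinWith`, keeping everything in the typed API and avoiding `spark.sql.functions._` entirely.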
