来自关系数据模型的Spark中的树/嵌套结构

问题描述 投票:2回答:2

如果我理解正确,我可以将spark数据集视为T类型的对象列表。如何以父项包含子项列表的方式连接两个数据集?但是孩子也会有自己孩子的名单......

一种方法是根据键做一个孩子的groupBy,但collect_list只返回一列,我想有更好的方法来做到这一点。

想要的结果基本上是Customer类型的数据集(客户对象列表?),但增加了:

  • 每个客户都有一张发票清单。
  • 每张发票都有自己的属性,但也有一个项目列表......
  • ......这可以继续(一棵树)

最终的结果就像是

case class Customer(customer_id: Int, name: String, address: String, age: Int, invoices: List[Invoices])
case class Invoice(invoice_id: Int, customer_id: Int, invoice_num:String, date: Int, invoice_type: String, items: List[Items])

对于这个结果,我需要来自以下输入:

case class Customer(customer_id: Int, name: String, address: String, age: Int)
case class Invoice(invoice_id: Int, customer_id: Int, invoice_num:String, date: Int, invoice_type: String)
case class InvoiceItem(item_id: Int, invoice_id: Int, num_of_items: Int, price: Double, total: Double)

    val customers_df = Seq(
       (11,"customer1", "address1", 10, "F")
      ,(12,"customer2", "address2", 20, "M")
      ,(13,"customer3", "address3", 30, "F")
    ).toDF("customer_id", "name", "address", "age", "sex")
    val customers_ds = customers_df.as[Customer].as("c")

    customers_ds.show

    val invoices_df = Seq(
       (21,11, "10101/1", 20181105, "manual")
      ,(22,11, "10101/2", 20181105, "manual")
      ,(23,11, "10101/3", 20181105, "manual")
      ,(24,12, "10101/4", 20181105, "generated")
      ,(25,12, "10101/5", 20181105, "pos")
    ).toDF("invoice_id", "customer_id", "invoice_num", "date", "invoice_type")
    val invoices_ds = invoices_df.as[Invoice].as("i")

    invoices_ds.show

    val invoice_items_df = Seq(
       (31, 21, 5, 10.0, 50.0)
      ,(32, 21, 3, 15.0, 45.0)
      ,(33, 22, 6, 11.0, 66.0)
      ,(34, 22, 7, 2.0, 14.0)
      ,(35, 23, 1, 100.0, 100.0)
      ,(36, 24, 4, 4.0, 16.0)
    ).toDF("item_id", "invoice_id", "num_of_items", "price", "total")
    val invoice_items_ds = invoice_items_df.as[InvoiceItem].as("ii")

    invoice_items_ds.show

在表格中它看起来像这样:

+-----------+---------+--------+---+---+
|customer_id|     name| address|age|sex|
+-----------+---------+--------+---+---+
|         11|customer1|address1| 10|  F|
|         12|customer2|address2| 20|  M|
|         13|customer3|address3| 30|  F|
+-----------+---------+--------+---+---+

+----------+-----------+-----------+--------+------------+
|invoice_id|customer_id|invoice_num|    date|invoice_type|
+----------+-----------+-----------+--------+------------+
|        21|         11|    10101/1|20181105|      manual|
|        22|         11|    10101/2|20181105|      manual|
|        23|         11|    10101/3|20181105|      manual|
|        24|         12|    10101/4|20181105|   generated|
|        25|         12|    10101/5|20181105|         pos|
+----------+-----------+-----------+--------+------------+

+-------+----------+------------+-----+-----+
|item_id|invoice_id|num_of_items|price|total|
+-------+----------+------------+-----+-----+
|     31|        21|           5| 10.0| 50.0|
|     32|        21|           3| 15.0| 45.0|
|     33|        22|           6| 11.0| 66.0|
|     34|        22|           7|  2.0| 14.0|
|     35|        23|           1|100.0|100.0|
|     36|        24|           4|  4.0| 16.0|
+-------+----------+------------+-----+-----+
apache-spark apache-spark-dataset
2个回答
1
投票

您似乎正在尝试将规范化数据读入Scala对象树。你当然可以用Spark做到这一点,但Spark可能不是最好的工具。如果数据足够小以适应内存,我认为这是你的问题,对象关系映射(ORM)库可能更适合这项工作。

如果您仍然想使用Spark,那么您就可以使用groupBycollect_list。你缺少的是struct()功能。

case class Customer(id: Int)
case class Invoice(id: Int, customer_id: Int)

val customers = spark.createDataset(Seq(Customer(1))).as("customers")
val invoices = spark.createDataset(Seq(Invoice(1, 1), Invoice(2, 1)))

case class CombinedCustomer(id: Int, invoices: Option[Seq[Invoice]])

customers
  .join(
    invoices
      .groupBy('customer_id)
      .agg(collect_list(struct('*)).as("invoices"))
      .withColumnRenamed("customer_id", "id"), 
    Seq("id"), "left_outer")
  .as[CombinedCustomer]
  .show

struct('*)从整行开始构建一个StructType列。您也可以选择任何列,例如struct('x.as("colA"), 'colB)

这产生了

+---+----------------+
| id|        invoices|
+---+----------------+
|  1|[[1, 1], [2, 1]]|
+---+----------------+

现在,在预期客户数据不适合内存的情况下,即使用简单的collect不是一种选择,您可以采取许多不同的策略。

最简单的,您应该考虑的而不是收集到驱动程序,要求对每个客户的数据进行独立处理是可以接受的。在这种情况下,请尝试使用map并将每个客户的处理逻辑分发给工作人员。

如果客户无法接受独立处理,则一般策略如下:

  1. 使用上述方法根据需要将数据聚合到结构化行中。
  2. 重新分区数据以确保处理所需的所有内容都在一个分区中。
  3. (可选)sortWithinPartitions以确保分区中的数据按您的需要排序。
  4. 使用mapPartitions

0
投票

您可以使用Spark-SQL,并为客户,发票和项目分别提供一个数据集。然后,您可以简单地在这些数据集之间使用连接和聚合函数来获得所需的输出。

Spark SQL在sql风格和编程方式之间具有非常微不足道的性能差异。

© www.soinside.com 2019 - 2024. All rights reserved.