Spark: convert multiple rows into a single row with multiple collections


I'm looking for ideas on how to handle the following scenario. My use case is in Java Spark, but since I've run out of ideas, I'm open to approaches in any language.

I have unstructured data like this:

98480|PERSON|TOM|GREER|1982|12|27
98480|PHONE|CELL|732|201|6789
98480|PHONE|HOME|732|123|9876
98480|ADDR|RES|102|JFK BLVD|PISCATAWAY|NJ|08854
98480|ADDR|OFF|211|EXCHANGE PL|JERSEY CITY|NJ|07302
98481|PERSON|LIN|JASSOY|1976|09|15
98481|PHONE|CELL|908|398|3389
98481|PHONE|HOME|917|363|2647
98481|ADDR|RES|111|JOURNAL SQ|JERSEY CITY|NJ|07704
98481|ADDR|OFF|365|DOWNTOWN NEWYORK|NEWYORK CITY|NY|10001
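Each record type has a different number of fields (PERSON has 7, PHONE 6, ADDR 8), so there is no single schema to read the file with directly. A minimal plain-Java sketch of the first step, assuming the first field is always the personId and the second is the record type; the class and method names are hypothetical:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: splits one pipe-delimited line into its fields.
// Assumes field 0 is the personId and field 1 the record type.
class RecordParser {
    static List<String> parse(String line) {
        // String.split takes a regex, so the pipe has to be escaped;
        // a limit of -1 keeps trailing empty fields instead of dropping them
        return Arrays.asList(line.split("\\|", -1));
    }

    static String personId(List<String> fields) {
        return fields.get(0);
    }

    static String recordType(List<String> fields) {
        return fields.get(1);
    }
}
```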

I'm trying to convert them into rows that hold the person data plus a set of phones and a set of addresses, as shown below; basically one row per personId:

+--------+------+---------+--------+----+-----+---+--------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+
|personId|type  |firstName|lastName|year|month|day|Phone                                                         |addr                                                                                                               |
+--------+------+---------+--------+----+-----+---+--------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+
|98481   |PERSON|LIN      |JASSOY  |1976|09   |15 |[[PHONE, HOME, 917, 363, 2647], [PHONE, CELL, 908, 398, 3389]]|[[ADDR, OFF, 365, DOWNTOWN NEWYORK, NEWYORK CITY, NY, 10001], [ADDR, RES, 111, JOURNAL SQ, JERSEY CITY, NJ, 07704]]|
|98480   |PERSON|TOM      |GREER   |1982|12   |27 |[[PHONE, HOME, 732, 123, 9876], [PHONE, CELL, 732, 201, 6789]]|[[ADDR, RES, 102, JFK BLVD, PISCATAWAY, NJ, 08854], [ADDR, OFF, 211, EXCHANGE PL, JERSEY CITY, NJ, 07302]]         |
+--------+------+---------+--------+----+-----+---+--------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+
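In plain Java, the target shape could be modeled as below. This is a sketch: the record names are hypothetical, the field names are taken from the column names used in the Spark code in this question, and the PHONE/ADDR type tag is dropped because it is implied by which list a record sits in. Every field is a String so values such as month "09" and zip code "08854" keep their leading zeros.

```java
import java.util.List;

// Hypothetical types for the target row: one PersonRow per personId,
// with PHONE and ADDR records nested as lists.
record Phone(String phoneType, String areaCode, String phoneMiddle, String ext) {}

record Addr(String addrType, String addr1, String addr2,
            String city, String state, String zipCode) {}

record PersonRow(String personId, String firstName, String lastName,
                 String year, String month, String day,
                 List<Phone> phones, List<Addr> addrs) {}
```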

I tried the following code:

// Assumes `import static org.apache.spark.sql.functions.*;` and that groupedDataset,
// phoneRow and addressRow are defined elsewhere.
// The two "type" columns get distinct aliases so the struct has no duplicate field names.
Dataset<Row> dataset = groupedDataset
        .agg(collect_set(struct(
                phoneRow.col("type").as("phColType"), phoneRow.col("phoneType").as("phoneType"),
                phoneRow.col("areaCode").as("areaCode"), phoneRow.col("phoneMiddle").as("phoneMiddle"),
                phoneRow.col("ext").as("ext"), addressRow.col("type").as("addrColType"),
                addressRow.col("addrType").as("addrType"), addressRow.col("addr1").as("addr1"),
                addressRow.col("addr2").as("addr2"), addressRow.col("city").as("city"),
                addressRow.col("state").as("state"), addressRow.col("zipCode").as("zipCode")))
                .as("addrPhone"));

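The output is below, but it is not the format I want: each element pairs one phone with one address, so every phone/address combination appears (four per person) instead of two separate lists.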

+--------+------+---------+--------+----+-----+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|personId|type  |firstName|lastName|year|month|day|addrPhone                                                                                                                                                                                                                                                                                                                                                 |
+--------+------+---------+--------+----+-----+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|98481   |PERSON|LIN      |JASSOY  |1976|09   |15 |[[PHONE, HOME, 917, 363, 2647, ADDR, OFF, 365, DOWNTOWN NEWYORK, NEWYORK CITY, NY, 10001], [PHONE, HOME, 917, 363, 2647, ADDR, RES, 111, JOURNAL SQ, JERSEY CITY, NJ, 07704], [PHONE, CELL, 908, 398, 3389, ADDR, RES, 111, JOURNAL SQ, JERSEY CITY, NJ, 07704], [PHONE, CELL, 908, 398, 3389, ADDR, OFF, 365, DOWNTOWN NEWYORK, NEWYORK CITY, NY, 10001]]|
|98480   |PERSON|TOM      |GREER   |1982|12   |27 |[[PHONE, HOME, 732, 123, 9876, ADDR, RES, 102, JFK BLVD, PISCATAWAY, NJ, 08854], [PHONE, CELL, 732, 201, 6789, ADDR, RES, 102, JFK BLVD, PISCATAWAY, NJ, 08854], [PHONE, CELL, 732, 201, 6789, ADDR, OFF, 211, EXCHANGE PL, JERSEY CITY, NJ, 07302], [PHONE, HOME, 732, 123, 9876, ADDR, OFF, 211, EXCHANGE PL, JERSEY CITY, NJ, 07302]]                  |
+--------+------+---------+--------+----+-----+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
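A likely explanation, assuming groupedDataset was built by joining the phone and address rows on personId (the snippet does not show how it was created): that join pairs every phone with every address, so a person with 2 phones and 2 addresses contributes 2 × 2 = 4 joined rows, and a single struct holding both phone and address columns yields one element per combination. This plain-Java sketch (hypothetical names, not Spark code) reproduces the effect:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration (not Spark): an equi-join on personId pairs every
// phone row with every address row of the same person, so one combined
// struct per joined row yields phones.size() * addrs.size() elements.
class CrossProductDemo {
    static List<String> combinedStructs(List<String> phones, List<String> addrs) {
        List<String> out = new ArrayList<>();
        for (String phone : phones) {
            for (String addr : addrs) {
                // one joined row -> one struct holding both phone and address fields
                out.add("[" + phone + ", " + addr + "]");
            }
        }
        return out;
    }
}
```

In general, n phones and m addresses give n × m elements, whereas collecting phones and addresses into separate columns gives n and m.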

I'm looking for ideas on how to solve this.

Update:

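I was able to get the expected output, but I'm not sure how efficient it is, and it looks like a lot of boilerplate with many joins and DataFrames. This is sample data I'm using to learn Spark; the real data I'll be working with needs many complex transformations, and this code looks inefficient.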

Here is the updated code:

// Assumes `import static org.apache.spark.sql.functions.*;` and that groupedDataset,
// phoneRow and addressRow are defined elsewhere.
// Aggregate phones and addresses into two separate collections per person.
Dataset<Row> groupedPhoneDataSet = groupedDataset
        .agg(collect_set(struct(phoneRow.col("type").as("phColType"),
                phoneRow.col("phoneType").as("phoneType"), phoneRow.col("areaCode").as("areaCode"),
                phoneRow.col("phoneMiddle").as("phoneMiddle"), phoneRow.col("ext").as("ext"))).as("phoneRec"));

Dataset<Row> groupedAddrDataSet = groupedDataset
        .agg(collect_set(struct(addressRow.col("type").as("addrColType"),
                addressRow.col("addrType").as("addrType"), addressRow.col("addr1").as("addr1"),
                addressRow.col("addr2").as("addr2"), addressRow.col("city").as("city"),
                addressRow.col("state").as("state"), addressRow.col("zipCode").as("zipCode"))).as("addrRec"));

// Join the two aggregates back together on personId.
Dataset<Row> finalDataSet = groupedAddrDataSet
        .join(groupedPhoneDataSet,
                groupedAddrDataSet.col("personId").equalTo(groupedPhoneDataSet.col("personId")))
        .select(groupedPhoneDataSet.col("personId"), groupedPhoneDataSet.col("type"),
                groupedPhoneDataSet.col("firstName"), groupedPhoneDataSet.col("lastName"),
                groupedPhoneDataSet.col("year"), groupedPhoneDataSet.col("month"),
                groupedPhoneDataSet.col("day"), col("phoneRec"), col("addrRec"));

Here is the output I get:

+--------+------+---------+--------+----+-----+---+--------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+
|personId|type  |firstName|lastName|year|month|day|phoneRec                                                      |addrRec                                                                                                            |
+--------+------+---------+--------+----+-----+---+--------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+
|98481   |PERSON|LIN      |JASSOY  |1976|09   |15 |[[PHONE, CELL, 908, 398, 3389], [PHONE, HOME, 917, 363, 2647]]|[[ADDR, RES, 111, JOURNAL SQ, JERSEY CITY, NJ, 07704], [ADDR, OFF, 365, DOWNTOWN NEWYORK, NEWYORK CITY, NY, 10001]]|
|98480   |PERSON|TOM      |GREER   |1982|12   |27 |[[PHONE, CELL, 732, 201, 6789], [PHONE, HOME, 732, 123, 9876]]|[[ADDR, OFF, 211, EXCHANGE PL, JERSEY CITY, NJ, 07302], [ADDR, RES, 102, JFK BLVD, PISCATAWAY, NJ, 08854]]         |
+--------+------+---------+--------+----+-----+---+--------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+

Is there a way to do this without creating so many DataFrames?
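One commonly used way to avoid the extra DataFrames in Spark is conditional aggregation over a single long-format dataset (one row per input line): put one collect_list(when(col("type").equalTo("PHONE"), struct(...))) per record type inside a single agg. Because when(...) without otherwise(...) yields null for non-matching rows, and collect_list/collect_set skip nulls, each column only collects its own record type. The plain-Java sketch below shows only the same shape, under that assumption: group by personId once, then bucket each person's lines by record type (the class name is hypothetical).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java analogue of a single-pass grouping: group every line by personId
// once, then bucket each person's lines by record type (PERSON/PHONE/ADDR).
class OnePassGrouping {
    static Map<String, Map<String, List<List<String>>>> group(List<String> lines) {
        return lines.stream()
                .map(line -> Arrays.asList(line.split("\\|", -1)))
                .collect(Collectors.groupingBy(
                        (List<String> f) -> f.get(0),              // personId
                        Collectors.groupingBy(
                                (List<String> f) -> f.get(1),      // record type
                                // keep only the payload after the id and type
                                Collectors.mapping(
                                        (List<String> f) -> f.subList(2, f.size()),
                                        Collectors.toList()))));
    }
}
```

For the PERSON columns in the Spark version, first(when(col("type").equalTo("PERSON"), ...), true), which ignores nulls, inside the same agg is one option.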


dataframe apache-spark pyspark apache-spark-sql pyspark-sql
2 answers

0 votes

If you can create multiple DataFrames: split each record type into its own DataFrame, group each by personId, and then join all three DataFrames on personId.
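A plain-Java sketch of that split/group/join idea, with hypothetical names: one map per record type keyed by personId plays the role of each grouped DataFrame, and looking the other two maps up by the PERSON key plays the role of the join. getOrDefault makes it behave like a left join, so a person without phones or addresses is kept with an empty list; Spark's default inner join would drop such a person.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical sketch of "split by type, group by personId, join on personId".
class SplitGroupJoin {
    // One map per record type, keyed by personId: the "grouped DataFrame".
    static Map<String, List<List<String>>> byId(List<List<String>> rows, String type) {
        return rows.stream()
                .filter(f -> f.get(1).equals(type))
                .collect(Collectors.groupingBy((List<String> f) -> f.get(0)));
    }

    // Join the PHONE and ADDR maps onto the PERSON map by personId.
    static List<String> join(List<String> lines) {
        List<List<String>> rows = lines.stream()
                .map(line -> Arrays.asList(line.split("\\|", -1)))
                .collect(Collectors.toList());
        Map<String, List<List<String>>> persons = byId(rows, "PERSON");
        Map<String, List<List<String>>> phones = byId(rows, "PHONE");
        Map<String, List<List<String>>> addrs = byId(rows, "ADDR");

        List<String> out = new ArrayList<>();
        // TreeSet gives a stable, sorted personId order
        for (String id : new TreeSet<>(persons.keySet())) {
            // getOrDefault keeps people with no phones/addresses (left-join behaviour)
            int nPhones = phones.getOrDefault(id, List.of()).size();
            int nAddrs = addrs.getOrDefault(id, List.of()).size();
            out.add(id + ": " + nPhones + " phones, " + nAddrs + " addrs");
        }
        return out;
    }
}
```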

The code I tried is below; let me know whether it solves your problem.


0 votes

IIUC, you can read the data in line mode (as plain text), do some processing, and then use collect_list.
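In Spark Java, reading in line mode usually means spark.read().text(path), which yields a single string column named value that can be split with functions.split(col("value"), "\\|") before grouping; the path is an assumption. The same idea in plain Java, with hypothetical names: key each raw line by the text before its first pipe and collect the rest of the line per personId, as collect_list would.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of the "read as lines, then collect_list" idea:
// key each raw line by the text before its first '|' (the personId).
class LineModeGrouping {
    static Map<String, List<String>> collectLines(List<String> lines) {
        return lines.stream()
                // skip blank or malformed lines that have no personId prefix
                .filter(line -> line.indexOf('|') > 0)
                .collect(Collectors.groupingBy(
                        (String line) -> line.substring(0, line.indexOf('|')),
                        // collect every remaining line per key, as collect_list does
                        // (this plain-Java version also keeps input order)
                        Collectors.mapping(
                                (String line) -> line.substring(line.indexOf('|') + 1),
                                Collectors.toList())));
    }
}
```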

© www.soinside.com 2019 - 2024. All rights reserved.