Convert columns holding arrays of key-value pairs in a Dataset into separate rows


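I have data in a DataFrame that I read from Azure Event Hubs. I treat each message body as JSON and extract the fields I need into a Dataset, as shown below.

Code used to read the data from Event Hubs and store it in a DataFrame: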

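import java.time.{Duration, Instant}
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf, EventPosition}
import org.apache.spark.sql.functions.{col, get_json_object}
import spark.implicits._

// Values in angle brackets are placeholders for the real Event Hubs settings
val connectionString = ConnectionStringBuilder("<ENDPOINT URL>")
    .setEventHubName("<EVENTHUB NAME>")
    .build

// Batch-read the events enqueued during the last 30 minutes
val currTime = Instant.now
val ehConf = EventHubsConf(connectionString)
    .setConsumerGroup("<CONSUMER GRP>")
    .setStartingPosition(EventPosition.fromEnqueuedTime(currTime.minus(Duration.ofMinutes(30))))
    .setEndingPosition(EventPosition.fromEnqueuedTime(currTime))

val reader = spark.read.format("eventhubs").options(ehConf.toMap).load()

// The payload arrives as binary: cast it to a string once, then pull out the JSON fields.
// Note that get_json_object always returns strings, so SIG1..SIG4 hold JSON text here.
val body = $"body".cast("string")
val SIGNALS = reader.select(
    get_json_object(body, "$.NUM").alias("NUM"),
    get_json_object(body, "$.SIG1").alias("SIG1"),
    get_json_object(body, "$.SIG2").alias("SIG2"),
    get_json_object(body, "$.SIG3").alias("SIG3"),
    get_json_object(body, "$.SIG4").alias("SIG4"))

// Keep only events that contain all four signals
val SIGNALSFiltered = SIGNALS.filter(
    col("SIG1").isNotNull && col("SIG2").isNotNull &&
    col("SIG3").isNotNull && col("SIG4").isNotNull)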

The data in SIGNALSFiltered looks like this:

+-----------------+--------------------+--------------------+--------------------+--------------------+
|              NUM|                SIG1|                SIG2|                SIG3|                SIG4|
+-----------------+--------------------+--------------------+--------------------+--------------------+
|XXXXX01|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX02|[{"TIME":15695604780...|[{"TIME":15695604780...|[{"TIME":15695604780...|[{"TIME":15695604780...|
|XXXXX03|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX04|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX05|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX06|[{"TIME":15695605340...|[{"TIME":15695605340...|[{"TIME":15695605340...|[{"TIME":15695605340...|
|XXXXX07|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX08|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|

The complete data for a single row looks like this:

|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825},{"TIME":1569560475000,"VALUE":3.7812},{"TIME":1569560483000,"VALUE":1.7812},{"TIME":1569560491000,"VALUE":7.7875}]|
    [{"TIME":1569560537000,"VALUE":3.7825},{"TIME":1569560481000,"VALUE":9.7825},{"TIME":1569560489000,"VALUE":5.7825},{"TIME":1569560497000,"VALUE":34.7825}]|
    [{"TIME":1569560505000,"VALUE":34.7825},{"TIME":1569560513000,"VALUE":9.7825},{"TIME":1569560521000,"VALUE":34.7825},{"TIME":1569560527000,"VALUE":4.7825}]|
    [{"TIME":1569560535000,"VALUE":7.7825},{"TIME":1569560479000,"VALUE":35.7825},{"TIME":1569560487000,"VALUE":3.7825}]
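Because get_json_object returns strings, each SIG column above is JSON text rather than a real array, and explode only accepts array or map columns. A minimal sketch of parsing the strings with from_json, assuming every element carries exactly a numeric TIME (epoch milliseconds) and VALUE field as in the sample row; the schema, the object and helper names and the sample data are illustrative assumptions:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

object ParseSignals {
  // Assumed element schema, read off the sample row: TIME is epoch millis, VALUE is a double.
  val signalSchema: ArrayType = ArrayType(
    StructType(Seq(
      StructField("TIME", LongType),
      StructField("VALUE", DoubleType)
    ))
  )

  // Replaces each JSON-string column with a typed array<struct<TIME, VALUE>> column.
  def parseSignals(df: DataFrame, signalCols: Seq[String]): DataFrame =
    signalCols.foldLeft(df) { (acc, c) =>
      acc.withColumn(c, from_json(col(c), signalSchema))
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("parse-signals").getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for SIGNALSFiltered: SIG1 is JSON text, as get_json_object returns it.
    val raw = Seq(
      ("XXXXX01", """[{"TIME":1569560531000,"VALUE":3.7825},{"TIME":1569560475000,"VALUE":3.7812}]""")
    ).toDF("NUM", "SIG1")

    val parsed = parseSignals(raw, Seq("SIG1"))
    parsed.printSchema() // SIG1 is now an array of structs with TIME (bigint) and VALUE (double)
    parsed.show(false)
    spark.stop()
  }
}
```

On the real data this would be called as `ParseSignals.parseSignals(SIGNALSFiltered, Seq("SIG1", "SIG2", "SIG3", "SIG4"))`; alternatively, from_json could replace get_json_object directly in the original select.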

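I want to turn each TIME/VALUE pair in every signal column into a new row.

Is there a way to transform the base dataset as shown below? Each element of the arrays should become a new row.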

+-------+-------------+-----------+-------------+-------------+-------------+-------------+-------------+-------------+
|    NUM|    SIG1 TIME| SIG1 VALUE|    SIG2 TIME|   SIG2 VALUE|    SIG3 TIME|   SIG3 VALUE|    SIG4 TIME|   SIG4 VALUE|
+-------+-------------+-----------+-------------+-------------+-------------+-------------+-------------+-------------+
|XXXXX01|1569560531000|     3.7825|1569560531000|       4.7825|1569560531000|       8.7825|1569560531000|       2.7825|
|XXXXX01|1569560531000|     1.7825|1569560531000|       1.7825|         null|         null|1569560531000|       2.7825|
|XXXXX01|1569560531000|     3.7825|1569560531000|       4.7825|1569560531000|       8.7825|1569560531000|       7.7825|
|XXXXX02|1569560531000|     7.7825|1569560531000|       4.7825|1569560531000|       8.7825|1569560531000|       2.7825|
|XXXXX02|         null|       null|1569560531000|       5.7825|1569560531000|       7.7825|1569560531000|       5.7825|
|XXXXX02|1569560531000|     3.7825|1569560531000|       4.7825|1569560531000|       8.7825|1569560531000|       2.7825|
|XXXXX02|1569560531000|     5.7825|1569560531000|       7.7825|1569560531000|       9.7825|1569560531000|       2.7825|
+-------+-------------+-----------+-------------+-------------+-------------+-------------+-------------+-------------+

Any hints or help appreciated! Thanks in advance.

dataframe apache-spark apache-spark-sql apache-spark-dataset
1 Answer (0 votes)

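You can do this with the explode function. It generates a new row for each element of the array, and you can then access the fields TIME and VALUE with dot syntax (field access on a struct). This requires SIG1 to be an array of structs; since get_json_object returns the array as a JSON string, parse it first (for example with from_json and an explicit schema). Here is a simple example for the first column: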

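import org.apache.spark.sql.functions.explode
import spark.implicits._

// data: a DataFrame whose SIG1 column is array<struct<TIME, VALUE>>
data
  .withColumn("sig1_obj", explode($"SIG1"))    // one output row per array element
  .withColumn("sig1_time", $"sig1_obj.TIME")   // struct field access via dot syntax
  .withColumn("sig1_value", $"sig1_obj.VALUE")
  .show()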

+--------------------+--------------------+-------------+----------+
|                SIG1|            sig1_obj|    sig1_time|sig1_value|
+--------------------+--------------------+-------------+----------+
|[[1569560531000, ...|[1569560531000, 3...|1569560531000|    3.7825|
|[[1569560531000, ...|[1569560475000, 3...|1569560475000|    3.7812|
|[[1569560531000, ...|[1569560483000, 1...|1569560483000|    1.7812|
|[[1569560531000, ...|[1569560491000, 7...|1569560491000|    7.7875|
+--------------------+--------------------+-------------+----------+

You can handle the other columns the same way.
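Exploding the columns one after another, as in the snippet above, pairs every element of one array with every element of the next. If the arrays are meant to line up by position instead, as in the desired output in the question, one alternative (a different technique from the one in this answer) is Spark's arrays_zip function, available from Spark 2.4, which combines the arrays element by element so that a single explode yields one row per position. A minimal sketch, assuming the SIG columns have already been parsed into array<struct<TIME, VALUE>>; the SignalPoint case class, the helper name and the sample data are illustrative:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{arrays_zip, col, explode}

// Hypothetical element type used only to build sample data.
case class SignalPoint(TIME: Long, VALUE: Double)

object ZipSignals {
  // Zips the signal arrays by position and explodes once (arrays_zip needs Spark 2.4+).
  // Where an array is shorter than the others, its struct is null at the missing positions.
  def zipAndExplode(df: DataFrame, signalCols: Seq[String]): DataFrame = {
    val zipped = df
      .select(col("NUM"), explode(arrays_zip(signalCols.map(c => col(c)): _*)).as("z"))
      .select(col("NUM"), col("z.*"))
      // Rename positionally: the zipped struct's field names differ between Spark versions.
      .toDF(("NUM" +: signalCols): _*)

    // Flatten each struct into "<SIG> TIME" and "<SIG> VALUE" columns.
    val flat = col("NUM") +: signalCols.flatMap { c =>
      Seq(col(s"$c.TIME").as(s"$c TIME"), col(s"$c.VALUE").as(s"$c VALUE"))
    }
    zipped.select(flat: _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("zip-signals").getOrCreate()
    import spark.implicits._

    // Sample data: SIG2 has one element fewer than SIG1.
    val df = Seq(
      ("XXXXX01",
        Seq(SignalPoint(1569560531000L, 3.7825), SignalPoint(1569560475000L, 3.7812)),
        Seq(SignalPoint(1569560537000L, 3.7825)))
    ).toDF("NUM", "SIG1", "SIG2")

    zipAndExplode(df, Seq("SIG1", "SIG2")).show(false)
    spark.stop()
  }
}
```

Because arrays_zip pads shorter arrays with nulls, the row count per NUM equals the length of its longest array rather than the product of all lengths. If a whole SIG column can be null, explode drops that row; explode_outer keeps it.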

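Also note that this technique multiplies the data: after exploding the second column you get n*m rows, where n is the number of elements in the SIG1 array and m the number of elements in the SIG2 array, and so on for further columns. If you don't want that, explode each column in a separate DataFrame and then full-outer-join those DataFrames on some key fields (for example, add a row_number within each NUM and join on the NUM column and the row number).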
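A sketch of the separate-explode-and-join approach just described. It swaps posexplode in for row_number: posexplode returns each element's 0-based index in the array, which gives a natural join key without having to choose a sort order for row_number. It assumes the SIG columns are already array<struct<TIME, VALUE>>; the SignalPoint case class, the helper names and the sample data are illustrative:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, posexplode}

// Hypothetical element type used only to build sample data.
case class SignalPoint(TIME: Long, VALUE: Double)

object JoinSignals {
  // One row per array element: NUM, the element's 0-based index pos, and its TIME/VALUE.
  def explodeOne(df: DataFrame, sig: String): DataFrame =
    df.select(col("NUM"), posexplode(col(sig)).as(Seq("pos", "elem")))
      .select(
        col("NUM"),
        col("pos"),
        col("elem.TIME").as(s"$sig TIME"),
        col("elem.VALUE").as(s"$sig VALUE"))

  // Full outer join of the per-signal frames on (NUM, pos); positions missing
  // from a shorter array come back as nulls for that signal.
  def explodeAndJoin(df: DataFrame, signalCols: Seq[String]): DataFrame =
    signalCols
      .map(sig => explodeOne(df, sig))
      .reduce((left, right) => left.join(right, Seq("NUM", "pos"), "full_outer"))
      .orderBy("NUM", "pos")
      .drop("pos") // the index was only needed as a join key

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("join-signals").getOrCreate()
    import spark.implicits._

    // Sample data: SIG2 has one element fewer than SIG1.
    val df = Seq(
      ("XXXXX01",
        Seq(SignalPoint(1569560531000L, 3.7825), SignalPoint(1569560475000L, 3.7812)),
        Seq(SignalPoint(1569560537000L, 3.7825)))
    ).toDF("NUM", "SIG1", "SIG2")

    explodeAndJoin(df, Seq("SIG1", "SIG2")).show(false)
    spark.stop()
  }
}
```

Each additional signal adds one more shuffle join, so for the four SIG columns this costs three joins; the result has one row per (NUM, position) pair, matching the layout in the question.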
