How can I transform this streaming DataFrame in PySpark,
+--------------------+------+----------------------------------------------+
|           timestamp|offset|stringdecode(value, UTF-8)                    |
+--------------------+------+----------------------------------------------+
|2023-03-03 17:21:...|    10|"[{"num":55,"cor":32},{"num":14,"cor":54}]"   |
|2023-03-03 17:35:...|    11|"[{"num":55,"cor":98},{"num":32,"cor":77}]"   |
+--------------------+------+----------------------------------------------+
into this:
+--------------------+------+---+---+
|           timestamp|offset|num|cor|
+--------------------+------+---+---+
|2023-03-03 17:21:...|    10| 55| 32|
|2023-03-03 17:21:...|    10| 14| 54|
|2023-03-03 17:35:...|    11| 55| 98|
|2023-03-03 17:35:...|    11| 32| 77|
+--------------------+------+---+---+
Just use from_json, then explode the resulting array column.
This works:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType

# Schema of the JSON array held in the string column
sch = ArrayType(StructType([
    StructField("num", IntegerType()),
    StructField("cor", IntegerType())
]))

df1.withColumn("asArray", F.from_json("dict", sch)) \
   .withColumn("asStruct", F.explode("asArray")) \
   .select(*[col for col in df1.schema.names if col != "dict"], "asStruct.*") \
   .show()
Input:
+-------------------+------+-----------------------------------------+
|timestamp |offset|dict |
+-------------------+------+-----------------------------------------+
|2023-03-03 00:00:00|10 |[{"num":55,"cor":32},{"num":14,"cor":54}]|
+-------------------+------+-----------------------------------------+
Schema:
root
|-- timestamp: string (nullable = true)
|-- offset: string (nullable = true)
|-- dict: string (nullable = true)
Output:
+-------------------+------+---+---+
| timestamp|offset|num|cor|
+-------------------+------+---+---+
|2023-03-03 00:00:00| 10| 55| 32|
|2023-03-03 00:00:00| 10| 14| 54|
+-------------------+------+---+---+
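The core transformation (one output row per element of the JSON array, with the other columns repeated) can be sketched in plain Python, which is handy for unit-testing the parsing logic outside Spark. The function name `flatten_row` is just an illustration, not part of any Spark API:

```python
import json

def flatten_row(timestamp, offset, dict_str):
    """Parse the JSON array in dict_str and emit one
    (timestamp, offset, num, cor) tuple per element,
    mirroring what from_json + explode produce per row."""
    return [(timestamp, offset, d["num"], d["cor"])
            for d in json.loads(dict_str)]

rows = flatten_row("2023-03-03 00:00:00", 10,
                   '[{"num":55,"cor":32},{"num":14,"cor":54}]')
# rows == [("2023-03-03 00:00:00", 10, 55, 32),
#          ("2023-03-03 00:00:00", 10, 14, 54)]
```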
Note that .show() only works on a batch DataFrame; since yours is streaming, write the result out with writeStream instead. Let me know if you run into any issues.