I need a Databricks SQL query to explode an array column and then pivot it into a dynamic number of columns based on the number of values in the array

Problem description (votes: 0, answers: 2)

I have JSON data where location is exported as an array column with values such as:

["USA", "China", "India", "UK"]

["Nepal", "China", "India", "UK", "Japan"]

I need a simple SQL query to explode the array column and then pivot it into a dynamic number of columns based on the number of values in the array. Something like pivot(explode(from_json(jsondata:export.location, 'array'))) as loc_

SELECT from_json(jsondata:export.location, 'array') AS Location,
       pivot(explode(from_json(jsondata:export.location, 'array'))) AS loc_
FROM my_table
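For reference, here is a minimal PySpark sketch of just the parsing step I am attempting (location_json is a placeholder column name holding the raw JSON string); note that from_json appears to need a full DDL schema such as 'array<string>' rather than just 'array':

from pyspark.sql import functions as F

# Hypothetical sample data; the important part is the 'array<string>' schema
df = spark.createDataFrame([('["USA","China","India","UK"]',)], ['location_json'])
df = df.withColumn('Location', F.from_json('location_json', 'array<string>'))
df.show(truncate=False)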

Input

| Location |
| -------- |
| ["China", "India", "UK"] |
| ["China", "India", "UK", "Japan"] |

Output

| Location | loc_1 | loc_2 | loc_3 | loc_4 |
| -------- | ----- | ----- | ----- | ----- |
| ["China", "India", "UK"] | "China" | "India" | "UK" | |
| ["China", "India", "UK", "Japan"] | "China" | "India" | "UK" | "Japan" |
sql apache-spark-sql pivot-table databricks databricks-sql
2 Answers

0 votes

You can try applying posexplode together with a window function to create a unique identifier that ties the exploded elements of each array row back together.

import sys

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, ArrayType, StringType

# Define the data
data = [
    (["China", "India", "UK"],),
    (["China", "India", "UK", "Japan"],)
]

# Define the schema
schema = StructType([
    StructField("Location", ArrayType(StringType()))
])

# Create a DataFrame
df = spark.createDataFrame(data, schema=schema)
df.show(df.count(), False)

# Explode the 'Location' column
print("Explode the 'Location' column")
df = df.select(F.posexplode(F.col('Location')).alias('loc_', 'Location'))
df.show(df.count(), False)

# Create a column that identifies each list of locations using a pseudo-unique identifier
df = df.withColumn('id', F.when(F.col('loc_') == 0, F.monotonically_increasing_id()).otherwise(None))

# Forward fill the null values in the 'id' column using an unbounded-preceding row frame
# (this relies on the exploded rows keeping their original order)
window = Window.rowsBetween(-sys.maxsize, 0)
df = df.withColumn('id', F.last('id', ignorenulls=True).over(window))

# Recreate the 'Location' column
df = df.groupBy('id').agg(F.collect_list('Location').alias('Location'), F.max('loc_').alias('max_loc_'))

# Explode the 'Location' column and filter the DataFrame to give the values that have the location index 
# less than or equal to the maximum location index
df = df.select('id', 'Location', F.posexplode('Location').alias('loc_', 'exploded_Location'))
df = df.filter(F.col('loc_') <= F.col('max_loc_'))

# Pivot the DataFrame
df_pivot = df.groupBy('id', 'Location').pivot('loc_').agg(F.first('exploded_Location'))

# Show the pivoted DataFrame
df_pivot.show(df_pivot.count(), False)

This gives the following result:

+---+-------------------------+-----+-----+---+-----+
|id |Location                 |0    |1    |2  |3    |
+---+-------------------------+-----+-----+---+-----+
|0  |[China, India, UK]       |China|India|UK |null |
|1  |[China, India, UK, Japan]|China|India|UK |Japan|
+---+-------------------------+-----+-----+---+-----+
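If you want the pivoted columns to be named loc_1 through loc_4 as in the desired output, a small follow-up sketch (reusing df_pivot from above, and assuming the pivot column names are the string positions '0'..'3' shown in the result) could rename them:

# Rename the positional pivot columns 0..3 to loc_1..loc_4
pivot_cols = [c for c in df_pivot.columns if c not in ('id', 'Location')]
df_named = df_pivot.select(
    'Location',
    *[df_pivot[c].alias(f'loc_{int(c) + 1}') for c in pivot_cols]
)
df_named.show(df_named.count(), False)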

0 votes

You can do something like this to split the array column into individual columns:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructField, StringType, ArrayType, StructType

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Split Array Column Example") \
    .master('local') \
    .getOrCreate()

# Define the data
data = [
    (["China", "India", "UK"],),
    (["China", "India", "UK", "Japan"],)
]

# Define the schema
schema = StructType([
    StructField("Location", ArrayType(StringType()))
])

# Create a DataFrame
df = spark.createDataFrame(data, schema=schema)

# Determine the maximum array length
max_length = df.selectExpr("max(size(Location))").collect()[0][0]

# Split the array column into individual columns
df_split = df.select('Location', *[
    col("Location")[i].alias(f"Location_{i+1}") 
    for i in range(max_length)
])

# Show the DataFrame with split columns
df_split.show()
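Since the question asks specifically for Databricks SQL, the same idea can also be expressed as a query. This is only a sketch: it assumes a table or view named my_table with an array<string> column Location, a known upper bound of 4 elements, and a runtime that supports try_element_at (Spark 3.3+ / a recent Databricks Runtime). A pure-SQL PIVOT cannot produce a truly dynamic number of columns, so the upper bound has to be chosen up front (for example from the max(size(Location)) computed above).

# Register the example DataFrame as a view so it can be queried with SQL
df.createOrReplaceTempView("my_table")

# try_element_at is 1-based and returns NULL when the index is out of range
spark.sql("""
    SELECT Location,
           try_element_at(Location, 1) AS loc_1,
           try_element_at(Location, 2) AS loc_2,
           try_element_at(Location, 3) AS loc_3,
           try_element_at(Location, 4) AS loc_4
    FROM my_table
""").show(truncate=False)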