How to generate a merged list of values from JSON in Scala?


Input data in the JSON file:

[{
    "Orders": {
        "orderid": {
            "path": "order_id",
            "type": "CHAR",
            "is_explode_required": "N"
        },
        "customerId": {
            "path": "customers.customerId",
            "type": "CHAR",
            "is_explode_required": "N"
        },
        "offerid": {
            "path": "Offers.Offerid",
            "type": "LIST",
            "is_explode_required": "Y"
        }
    },
    "products": {
        "productid": {
            "path": "product_id",
            "type": "CHAR",
            "is_explode_required": "N"
        },
        "productName": {
            "path": "products.productname",
            "type": "CHAR",
            "is_explode_required": "N"
        }
    }
}]

In the output, for Orders, I want two columns: the first with all the path values joined by commas, and the second with only the path values where is_explode_required is Y.

output| exlode_output
order_id,customers.customerId,Offers.Offerid|Offers.Offerid
scala apache-spark
1 Answer

Check the code below.
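The answer assumes the config JSON has already been loaded into a DataFrame df. A minimal sketch of that step, assuming a local SparkSession and a placeholder file path (neither is part of the original answer):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// The config is a JSON array spread over multiple lines, so multiLine must be enabled.
// "/path/to/config.json" is a placeholder path.
val df = spark.read.option("multiLine", "true").json("/path/to/config.json")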

df.show(false)
+---------------------------------------------------------------------------------+--------------------------------------------------------+
|Orders                                                                           |products                                                |
+---------------------------------------------------------------------------------+--------------------------------------------------------+
|{{N, customers.customerId, CHAR}, {Y, Offers.Offerid, LIST}, {N, order_id, CHAR}}|{{N, products.productname, CHAR}, {N, product_id, CHAR}}|
+---------------------------------------------------------------------------------+--------------------------------------------------------+
val columnName = "Orders".toLowerCase

// Joins every path value in the map into one comma-separated string.
val outputExpr = s"""
    concat_ws(
        ',',
        transform(
            map_entries(${columnName}),
            e -> e.value.path
        )
    ) AS output
    """

// Keeps only the entries where is_explode_required = 'Y' before joining.
val explodeOutputExpr = s"""
    concat_ws(
        ',',
        transform(
            map_entries(
                map_filter(
                    ${columnName},
                    (k, v) -> v.is_explode_required == 'Y'
                )
            ),
            e -> e.value.path
        )
    ) AS exlode_output
    """
// The JSON config columns are read as structs; converting them to a map of
// struct values makes it possible to use map_entries / map_filter above.
val schema = "map<string, struct<is_explode_required:string,path:string,type:string>>"

import org.apache.spark.sql.functions.expr

df
  .withColumn(
    columnName,
    // to_json + from_json re-parses the struct column using the map schema.
    expr(s"from_json(to_json(${columnName}), '${schema}')")
  )
  .selectExpr(outputExpr, explodeOutputExpr)
  .show(false)
+--------------------------------------------+--------------+
|output                                      |exlode_output |
+--------------------------------------------+--------------+
|customers.customerId,Offers.Offerid,order_id|Offers.Offerid|
+--------------------------------------------+--------------+
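The same pattern can be reused for the other config column by swapping the column name. A hedged sketch, assuming the same df and schema as above; the productsCol and expression names here are illustrative only and not part of the original answer:

// Hypothetical extension: apply the same expressions to the "products" column.
val productsCol = "products"

val productsOutput = s"""
    concat_ws(',', transform(map_entries(${productsCol}), e -> e.value.path)) AS output
    """
val productsExplodeOutput = s"""
    concat_ws(
        ',',
        transform(
            map_entries(map_filter(${productsCol}, (k, v) -> v.is_explode_required == 'Y')),
            e -> e.value.path
        )
    ) AS exlode_output
    """

df
  .withColumn(productsCol, expr(s"from_json(to_json(${productsCol}), '${schema}')"))
  .selectExpr(productsOutput, productsExplodeOutput)
  .show(false)
// For products no entry has is_explode_required = 'Y', so exlode_output comes back empty.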