当 JSON 源将所需值作为键时,Azure 数据工厂接收器作为镶木地板

问题描述 投票:0回答:1

我正在 Azure 数据工厂中使用复制活动。 Source 调用 API 请求并返回 JSON 响应。不同API调用的用户id和用户数量不同。

JSON 响应示例:

{
    "rates": {
        "account-default": "0.00",
        "project-default": "0.00",
        "users": {
            "5405": {
                "defaultRate": "0.00",
                "rate": "0.00",
                "source": "direct"
            },
            "5378": {
                "defaultRate": "33.00",
                "rate": "33.00",
                "source": "direct"
            },
            "5454": {
                "defaultRate": "0.00",
                "rate": "0.00",
                "source": "direct"
            },
            "5447": {
                "defaultRate": "0.00",
                "rate": "0.00",
                "source": "direct"
            }
        }
    },
    "STATUS": "OK"
}

我想将其直接作为 parquet 保存“user_id”和“rate”信息。这里的问题是用户 ID(5405、5378、5454、5447)是 JSON 响应中的键,这妨碍了我在映射中使用集合引用。

screenshot of imported schema in ADF

这里的核心问题是我不想作为 JSON 接收然后使用 JSON 或稍后将其转换为镶木地板。我想直接把它沉成实木复合地板。

有人有什么想法吗?

我尝试在映射中使用集合引用,但无法实现。

json azure-data-factory parquet data-engineering
1个回答
0
投票

由于 JSON 中的键是动态的,使用复制活动可能无法实现您的要求。

您可以尝试使用 ADF Dataflow 从上述 JSON 中获取所需的输出。

使用您的 API 请求设置源,导入投影并按照以下步骤操作。

在源之后,进行

Select
转换,并使用基于规则的映射将
keys
作为列从
users
中提取,如下所示。

enter image description here

然后,进行

derivedColumn
转换并使用具有以下表达式的列模式。

toDecimal(split(split(replace(toString($$),' ',''),'"rate":"')[2],'"')[1])

enter image description here

这会将动态键作为列,并将其各自的

rate
作为行值。

enter image description here

现在,从该转换中取出一个分支,并使用

衍生列
转换和以下表达式创建一个 cols_arr 数组。

map(columnNames(),toInteger(#item))

enter image description here

other 分支中,使用以下表达式的

illary
转换创建一个 rates_arr 数组。

map(split(concatWS(',', toString(columns())),','),toDecimal(#item))

enter image description here

对这些使用

Select
转换以仅获取所需的数组列。

enter image description here

rates_arr列的

其他分支
执行相同操作。

然后,对这些使用

flatten
变换来展平数组列。

enter image description here

同样,对rates_arr列的

其他分支
进行同样的展平。

现在,两者都将具有所需的列。因此,我们需要组合这些列。为此,请对两者进行

SurrogateKey
转换,并为两者创建一个
key
列(键名称应该相同)。

enter image description here

也为另一个分支添加

SurrogateKey
转换。

然后,使用两个key

列连接这两个转换
,如下所示。

enter image description herez

使用选择转换删除之后额外的

key
列,它将给出如下所示的所需输出。

enter image description here

此后,使用目标镶木地板数据集添加接收器转换并从管道执行数据流。预期数据将被复制到 parquet 文件中。

这是数据流代码供您参考:

source(output(
        rates as ({account-default} as string, {project-default} as string, users as ({5378} as (defaultRate as string, rate as string, source as string), {5405} as (defaultRate as string, rate as string, source as string), {5447} as (defaultRate as string, rate as string, source as string), {5454} as (defaultRate as string, rate as string, source as string))),
        STATUS as string
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    ignoreNoFilesFound: false,
    documentForm: 'singleDocument') ~> source1
source1 select(mapColumn(
        each(rates.users,match(true()))
    ),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> select1
select1 derive(each(match(true()), $$ = toDecimal(split(split(replace(toString($$),' ',''),'"rate":"')[2],'"')[1]))) ~> GetColsAndRates
GetColsAndRates derive(rates_arr = map(split(concatWS(',', toString(columns())),','),toDecimal(#item))) ~> RatesArray
GetColsAndRates derive(cols_arr = map(columnNames(),toInteger(#item))) ~> ColumnsArray
ColumnsArray select(mapColumn(
        each(match(name=='cols_arr'))
    ),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> selectColsArr
RatesArray select(mapColumn(
        each(match(name=='rates_arr'))
    ),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> selectRatesArr
flatten1 keyGenerate(output(key as long),
    startAt: 1L,
    stepValue: 1L) ~> surrogateKey1
selectColsArr foldDown(unroll(cols_arr),
    mapColumn(
        cols_arr
    ),
    skipDuplicateMapInputs: false,
    skipDuplicateMapOutputs: false) ~> flatten1
selectRatesArr foldDown(unroll(rates_arr),
    mapColumn(
        rates_arr
    ),
    skipDuplicateMapInputs: false,
    skipDuplicateMapOutputs: false) ~> flatten2
flatten2 keyGenerate(output(key as long),
    startAt: 1L,
    stepValue: 1L) ~> surrogateKey2
surrogateKey1, surrogateKey2 join(surrogateKey1@key == surrogateKey2@key,
    joinType:'inner',
    matchType:'exact',
    ignoreSpaces: false,
    broadcast: 'auto')~> join1
join1 select(mapColumn(
        cols_arr,
        rates_arr
    ),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> select2
© www.soinside.com 2019 - 2024. All rights reserved.