我正在 Azure 数据工厂中使用复制活动。 Source 调用 API 请求并返回 JSON 响应。不同API调用的用户id和用户数量不同。
JSON 响应示例:
{
"rates": {
"account-default": "0.00",
"project-default": "0.00",
"users": {
"5405": {
"defaultRate": "0.00",
"rate": "0.00",
"source": "direct"
},
"5378": {
"defaultRate": "33.00",
"rate": "33.00",
"source": "direct"
},
"5454": {
"defaultRate": "0.00",
"rate": "0.00",
"source": "direct"
},
"5447": {
"defaultRate": "0.00",
"rate": "0.00",
"source": "direct"
}
}
},
"STATUS": "OK"
}
我想将其直接作为 parquet 保存“user_id”和“rate”信息。这里的问题是用户 ID(5405、5378、5454、5447)是 JSON 响应中的键,这妨碍了我在映射中使用集合引用。
这里的核心问题是我不想作为 JSON 接收然后使用 JSON 或稍后将其转换为镶木地板。我想直接把它沉成实木复合地板。
有人有什么想法吗?
我尝试在映射中使用集合引用,但无法实现。
由于 JSON 中的键是动态的,使用复制活动可能无法实现您的要求。
您可以尝试使用 ADF Dataflow 从上述 JSON 中获取所需的输出。
使用您的 API 请求设置源,导入投影并按照以下步骤操作。
在源之后,进行
Select
转换,并使用基于规则的映射将 keys
作为列从 users
中提取,如下所示。
然后,进行
derivedColumn
转换并使用具有以下表达式的列模式。
toDecimal(split(split(replace(toString($$),' ',''),'"rate":"')[2],'"')[1])
这会将动态键作为列,并将其各自的
rate
作为行值。
现在,从该转换中取出一个分支,并使用
衍生列转换和以下表达式创建一个
cols_arr
数组。
map(columnNames(),toInteger(#item))
在 other 分支中,使用以下表达式的
illary转换创建一个
rates_arr
数组。
map(split(concatWS(',', toString(columns())),','),toDecimal(#item))
对这些使用
Select
转换以仅获取所需的数组列。
对rates_arr
列的
其他分支执行相同操作。
然后,对这些使用
flatten
变换来展平数组列。
同样,对rates_arr
列的
其他分支进行同样的展平。
现在,两者都将具有所需的列。因此,我们需要组合这些列。为此,请对两者进行
SurrogateKey
转换,并为两者创建一个 key
列(键名称应该相同)。
也为另一个分支添加
SurrogateKey
转换。
然后,使用两个key
列连接这两个转换,如下所示。
z
使用选择转换删除之后额外的
key
列,它将给出如下所示的所需输出。
此后,使用目标镶木地板数据集添加接收器转换并从管道执行数据流。预期数据将被复制到 parquet 文件中。
这是数据流代码供您参考:
source(output(
rates as ({account-default} as string, {project-default} as string, users as ({5378} as (defaultRate as string, rate as string, source as string), {5405} as (defaultRate as string, rate as string, source as string), {5447} as (defaultRate as string, rate as string, source as string), {5454} as (defaultRate as string, rate as string, source as string))),
STATUS as string
),
allowSchemaDrift: true,
validateSchema: false,
ignoreNoFilesFound: false,
documentForm: 'singleDocument') ~> source1
source1 select(mapColumn(
each(rates.users,match(true()))
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> select1
select1 derive(each(match(true()), $$ = toDecimal(split(split(replace(toString($$),' ',''),'"rate":"')[2],'"')[1]))) ~> GetColsAndRates
GetColsAndRates derive(rates_arr = map(split(concatWS(',', toString(columns())),','),toDecimal(#item))) ~> RatesArray
GetColsAndRates derive(cols_arr = map(columnNames(),toInteger(#item))) ~> ColumnsArray
ColumnsArray select(mapColumn(
each(match(name=='cols_arr'))
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> selectColsArr
RatesArray select(mapColumn(
each(match(name=='rates_arr'))
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> selectRatesArr
flatten1 keyGenerate(output(key as long),
startAt: 1L,
stepValue: 1L) ~> surrogateKey1
selectColsArr foldDown(unroll(cols_arr),
mapColumn(
cols_arr
),
skipDuplicateMapInputs: false,
skipDuplicateMapOutputs: false) ~> flatten1
selectRatesArr foldDown(unroll(rates_arr),
mapColumn(
rates_arr
),
skipDuplicateMapInputs: false,
skipDuplicateMapOutputs: false) ~> flatten2
flatten2 keyGenerate(output(key as long),
startAt: 1L,
stepValue: 1L) ~> surrogateKey2
surrogateKey1, surrogateKey2 join(surrogateKey1@key == surrogateKey2@key,
joinType:'inner',
matchType:'exact',
ignoreSpaces: false,
broadcast: 'auto')~> join1
join1 select(mapColumn(
cols_arr,
rates_arr
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> select2