Convert columns into a JSON object in another column with an Azure Data Factory data flow


I have data in the format below, and I am using a data flow to format each record as JSON and store the result in another column of the data.

Input

I want to use the data flow to convert it to the following format:

Output Format Required

I have not found any way to do this conversion with a data flow.

azure azure-data-factory google-cloud-dataflow
1 Answer
  • You can do this with the derived column transformation. My source has the columns TRANS_ID, CUST_ID_A, SCORE A, CUST_ID_B, and SCORE B.

  • Next, use the associate function in a derived column transformation to build one key-value pair per customer, creating 2 new columns:
A: associate(CUST_ID_A,{SCORE A})
B: associate(CUST_ID_B,{SCORE B})

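  • Then, in a second derived column transformation, build an array from the two newly created columns and store it in a new column, cust_conf: array(A,B)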

  • In the sink, I selected a JSON file as the sink dataset and mapped only the required columns, TRANS_ID and cust_conf.

  • The final data preview then shows, for each record, TRANS_ID together with cust_conf, the array holding the two key-value pairs.

  • Here is the complete data flow JSON:
{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "DelimitedText1",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "Json1",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "derivedColumn1"
                },
                {
                    "name": "derivedColumn2"
                }
            ],
            "scriptLines": [
                "source(output(",
                "          TRANS_ID as string,",
                "          CUST_ID_A as string,",
                "          {SCORE A} as string,",
                "          CUST_ID_B as string,",
                "          {SCORE B} as string",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false) ~> source1",
                "source1 derive(A = associate(CUST_ID_A,{SCORE A}),",
                "          B = associate(CUST_ID_B,{SCORE B})) ~> derivedColumn1",
                "derivedColumn1 derive(cust_conf = array(A,B)) ~> derivedColumn2",
                "derivedColumn2 sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     partitionFileNames:['op.json'],",
                "     umask: 0022,",
                "     preCommands: [],",
                "     postCommands: [],",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     mapColumn(",
                "          TRANS_ID,",
                "          cust_conf",
                "     ),",
                "     partitionBy('hash', 1)) ~> sink1"
            ]
        }
    }
}
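
To make the shape of the result easier to picture, the two derived column steps and the sink mapping above can be mimicked in plain Python. This is only a rough sketch, not data flow code: the associate helper below is a stand-in written for illustration, it assumes a map is written to the JSON sink as an object with a single key, and the sample row values are hypothetical.

```python
import json


def associate(*args):
    """Illustrative stand-in for the data flow associate() function:
    pairs up alternating key/value arguments into a map (dict)."""
    if len(args) % 2 != 0:
        raise ValueError("associate expects an even number of arguments")
    return dict(zip(args[0::2], args[1::2]))


def transform(row):
    # derivedColumn1: one key-value pair per customer
    a = associate(row["CUST_ID_A"], row["SCORE A"])
    b = associate(row["CUST_ID_B"], row["SCORE B"])
    # derivedColumn2: combine both maps into an array
    cust_conf = [a, b]
    # sink1 mapping: keep only TRANS_ID and cust_conf
    return {"TRANS_ID": row["TRANS_ID"], "cust_conf": cust_conf}


# Hypothetical sample row; the real input data is not shown in the question.
row = {
    "TRANS_ID": "T1",
    "CUST_ID_A": "C100",
    "SCORE A": "0.9",
    "CUST_ID_B": "C200",
    "SCORE B": "0.7",
}

result = transform(row)
print(json.dumps(result))
# {"TRANS_ID": "T1", "cust_conf": [{"C100": "0.9"}, {"C200": "0.7"}]}
```

The scores stay strings here because the source schema in the script declares every column as string; cast them in the derived column if numeric values are needed in the output.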