What is the simplest way to convert a TableRow to a JSON-formatted string in Dataflow 2.x?

Question · Votes: 0 · Answers: 3

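Without writing my own function to do this, what is the simplest way to convert a TableRow object inside a Dataflow 2.x pipeline into a JSON-formatted string?

I thought the code below would work, but it does not put quotes around the keys and values correctly, especially where there are nested fields.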

public static class TableRowToString extends DoFn<TableRow, String> {
  private static final long serialVersionUID = 1L;

  @ProcessElement
  public void processElement(ProcessContext c) {
    // toString() is what produces the incorrectly quoted output described above.
    c.output(c.element().toString());
  }
}
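The missing quotes are what the default java.util.AbstractMap formatting would produce. The sketch below uses a plain LinkedHashMap as a stand-in for a TableRow with one nested record; it assumes that TableRow.toString() falls back to this Map-style formatting, which the question does not confirm. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class MapToStringDemo {
  /** Builds a small nested map standing in for a TableRow with a nested record. */
  static Map<String, Object> sampleRow() {
    Map<String, Object> address = new LinkedHashMap<>();
    address.put("city", "Paris");

    Map<String, Object> row = new LinkedHashMap<>();
    row.put("name", "Ann");
    row.put("address", address);
    return row;
  }

  public static void main(String[] args) {
    // AbstractMap.toString() joins entries as key=value without any quoting,
    // so the result is not valid JSON.
    System.out.println(sampleRow());
    // prints {name=Ann, address={city=Paris}}
  }
}
```

A JSON serializer would instead have to emit {"name":"Ann","address":{"city":"Paris"}}, which is why a dedicated library is needed rather than toString().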
java json apache-beam dataflow
3 Answers

4 votes

Use Gson and call gson.toJson(yourTableRow). Details here.
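As a rough sketch of how that call might fit into the DoFn from the question: the class name TableRowToJsonFn and the toJson helper are hypothetical, and the example assumes the Gson and Beam Java SDK artifacts are on the classpath. It relies on TableRow being a Map, which Gson serializes as a JSON object.

```java
import com.google.api.services.bigquery.model.TableRow;
import com.google.gson.Gson;
import org.apache.beam.sdk.transforms.DoFn;

/** Hypothetical DoFn that emits one JSON string per TableRow. */
class TableRowToJsonFn extends DoFn<TableRow, String> {
  private static final long serialVersionUID = 1L;

  // Gson is not Serializable, so it is created on the worker in setup()
  // instead of being shipped with the serialized DoFn.
  private transient Gson gson;

  @Setup
  public void setup() {
    gson = new Gson();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(toJson(gson, c.element()));
  }

  /** Kept separate so the conversion can be tried without running a pipeline. */
  static String toJson(Gson gson, TableRow row) {
    // TableRow is a Map, so Gson writes it as a JSON object with quoted keys.
    return gson.toJson(row);
  }
}
```

It would be applied like any other DoFn, for example with ParDo.of(new TableRowToJsonFn()) on a PCollection of TableRow.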


4 votes

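I ran into the same problem and solved it by using org.apache.beam.sdk.extensions.jackson.AsJsons.

With it, there is no need to create a new transform; it can be applied directly to the pipeline.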

import org.apache.beam.sdk.extensions.jackson.AsJsons;

Pipeline p = Pipeline.create(options);

// The first apply(...) is a placeholder for whatever transform produces a PCollection<TableRow>.
p.apply("The transform that returns a PCollection of TableRow")
    .apply("JSon Transform", AsJsons.of(TableRow.class));

If you manage the project with Maven, you can add the following inside <dependencies> in your pom.xml file:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-extensions-json-jackson</artifactId>
  <version>2.5.0</version>
  <scope>compile</scope>
</dependency>

0 votes

I am trying to use a similar setup to convert a Beam Row instead of a TableRow, as follows:

PCollection<String> jsonStrings = result.apply("RowToJSON", AsJsons.of(Row.class));

But what I get is the entire JSON object, including the schema with its fields:

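`{
"schema": {
"encodingPositions": {
  "drug_name": 0,
  "eid": 1,
  "generic_id": 9,
  "patientid": 4,
  "docid": 2,
  "therapeutic_class": 7,
  "generic_name": 6,
  "prid": 3,
  "created_at": 10,
  "source": 11,
  "updated_ekaid": 5,
  "disease_name": 8
},
"encodingPositionsOverridden": false,
"fields": [
  {
    "name": "drug_name",
    "description": "",
    "type": {
      "typeName": "STRING",
      "nullable": false,
      "logicalType": null,
      "collectionElementType": null,
      "mapKeyType": null,
      "mapValueType": null,
      "rowSchema": null,
      "allMetadata": {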

      }
    },
    "options": {
      "optionNames": [
        
      ]
    }
  },
  {
    "name": "eid",
    "description": "",
    "type": {
      "typeName": "STRING",
      "nullable": false,
      "logicalType": null,
      "collectionElementType": null,
      "mapKeyType": null,
      "mapValueType": null,
      "rowSchema": null,
      "allMetadata": {
        
      }
    },
    "options": {
      "optionNames": [
        
      ]
    }
  },
  {
    "name": "docid",
    "description": "",
    "type": {
      "typeName": "STRING",
      "nullable": false,
      "logicalType": null,
      "collectionElementType": null,
      "mapKeyType": null,
      "mapValueType": null,
      "rowSchema": null,
      "allMetadata": {
        
      }
    },
    "options": {
      "optionNames": [
        
      ]
    }
  },
  {
    "name": "prid",
    "description": "",
    "type": {
      "typeName": "STRING",
      "nullable": false,
      "logicalType": null,
      "collectionElementType": null,
      "mapKeyType": null,
      "mapValueType": null,
      "rowSchema": null,
      "allMetadata": {
        
      }
    },
    "options": {
      "optionNames": [
        
      ]
    }
  },
  {
    "name": "patientid",
    "description": "",
    "type": {
      "typeName": "STRING",
      "nullable": false,
      "logicalType": null,
      "collectionElementType": null,
      "mapKeyType": null,
      "mapValueType": null,
      "rowSchema": null,
      "allMetadata": {
        
      }
    },
    "options": {
      "optionNames": [
        
      ]
    }
  },
  {
    "name": "updated_ekaid",
    "description": "",
    "type": {
      "typeName": "STRING",
      "nullable": true,
      "logicalType": null,
      "collectionElementType": null,
      "mapKeyType": null,
      "mapValueType": null,
      "rowSchema": null,
      "allMetadata": {
        
      }
    },
    "options": {
      "optionNames": [
        
      ]
    }
  },
  {
    "name": "generic_name",
    "description": "",
    "type": {
      "typeName": "STRING",
      "nullable": true,
      "logicalType": null,
      "collectionElementType": null,
      "mapKeyType": null,
      "mapValueType": null,
      "rowSchema": null,
      "allMetadata": {
        
      }
    },
    "options": {
      "optionNames": [
        
      ]
    }
  },
  {
    "name": "therapeutic_class",
    "description": "",
    "type": {
      "typeName": "STRING",
      "nullable": true,
      "logicalType": null,
      "collectionElementType": null,
      "mapKeyType": null,
      "mapValueType": null,
      "rowSchema": null,
      "allMetadata": {
        
      }
    },
    "options": {
      "optionNames": [
        
      ]
    }
  },
  {
    "name": "disease_name",
    "description": "",
    "type": {
      "typeName": "STRING",
      "nullable": true,
      "logicalType": null,
      "collectionElementType": null,
      "mapKeyType": null,
      "mapValueType": null,
      "rowSchema": null,
      "allMetadata": {
        
      }
    },
    "options": {
      "optionNames": [
        
      ]
    }
  },
  {
    "name": "generic_id",
    "description": "",
    "type": {
      "typeName": "STRING",
      "nullable": true,
      "logicalType": null,
      "collectionElementType": null,
      "mapKeyType": null,
      "mapValueType": null,
      "rowSchema": null,
      "allMetadata": {
        
      }
    },
    "options": {
      "optionNames": [
        
      ]
    }
  },
  {
    "name": "created_at",
    "description": "",
    "type": {
      "typeName": "STRING",
      "nullable": false,
      "logicalType": null,
      "collectionElementType": null,
      "mapKeyType": null,
      "mapValueType": null,
      "rowSchema": null,
      "allMetadata": {
        
      }
    },
    "options": {
      "optionNames": [
        
      ]
    }
  },
  {
    "name": "source",
    "description": "",
    "type": {
      "typeName": "LOGICAL_TYPE",
      "nullable": false,
      "logicalType": {
        "identifier": "SqlCharType",
        "argumentType": {
          "typeName": "STRING",
          "nullable": false,
          "logicalType": null,
          "collectionElementType": null,
          "mapKeyType": null,
          "mapValueType": null,
          "rowSchema": null,
          "allMetadata": {
            
          }
        },
        "argument": "",
        "baseType": {
          "typeName": "STRING",
          "nullable": false,
          "logicalType": null,
          "collectionElementType": null,
          "mapKeyType": null,
          "mapValueType": null,
          "rowSchema": null,
          "allMetadata": {
            
          }
        }
      },
      "collectionElementType": null,
      "mapKeyType": null,
      "mapValueType": null,
      "rowSchema": null,
      "allMetadata": {
        
      }
    },
    "options": {
      "optionNames": [
        
      ]
    }
  }
],
"uuid": null,
"options": {
  "optionNames": [
    
  ]
},
"fieldNames": [
  "drug_name",
  "eid",
  "docid",
  "prid",
  "patientid",
  "updated_ekaid",
  "generic_name",
  "therapeutic_class",
  "disease_name",
  "generic_id",
  "created_at",
  "source"
],
"fieldCount": 12

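},
"values": [
  "drug_name",
  "mid",
  "did",
  "prid",
  "pid",
  "bid",
  "medicineclassname",
  "medicine class",
  "['disease name']",
  "genericid",
  "2023-10-13",
  "current_medications"
],
"fieldCount": 12,
"baseValues": [
  "drug_name",
  "mid",
  "did",
  "prid",
  "pid",
  "bid",
  "medicineclassname",
  "medicine class",
  "['disease name']",
  "genericid",
  "2023-10-13",
  "current_medications"
]
}`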

whereas I was expecting:

{"drug_name": "drug_name", "eid": "mid", "docid": "did", "prid": "prid", "patientid": "pid", "updated_ekaid": "bid", "generic_name": "medicineclassname", "therapeutic_class": "medicine class", "disease_name": "[\'disease name\']", "generic_id": "genericid", "created_at": "2023-10-13", "source": "current_medications"}

How can I get just the field values as JSON, in the form shown above?
