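What is the simplest way to convert a TableRow object into a JSON-formatted string inside a Dataflow 2.x pipeline, without writing my own function to do it?
I thought the code below would work, but it does not put quotes around keys and values correctly, especially where there are nested fields.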
public static class TableRowToString extends DoFn<TableRow, String> {
    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(ProcessContext c) {
        // toString() does not quote keys and values the way JSON requires.
        c.output(c.element().toString());
    }
}
I ran into the same problem and solved it with org.apache.beam.sdk.extensions.jackson.AsJsons.
There is no need to create a new transform to use it; you can apply it directly in the pipeline.
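import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.jackson.AsJsons;

Pipeline p = Pipeline.create(options);
// Placeholder: replace the first apply with a transform that returns a PCollection<TableRow>.
p.apply("The transform that returns a PCollection of TableRow")
    .apply("JSon Transform", AsJsons.of(TableRow.class));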
If you manage the project with Maven, you can add this to the <dependencies> section of your pom.xml file:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-json-jackson</artifactId>
<version>2.5.0</version>
<scope>compile</scope>
</dependency>
I am trying a similar setup to convert a Beam Row instead of a TableRow, as follows:
PCollection<String> jsonStrings = result.apply("RowToJSON", AsJsons.of(Row.class));
But what I get instead is the full JSON object, including the schema with its fields:
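`{ "schema": { "encodingPositions": { "drug_name": 0, "eid": 1, "generic_id": 9, "patientid": 4, "docid": 2, "therapeutic_class": 7, "generic_name": 6, "prid": 3, "created_at": 10, "source": 11, "updated_ekaid": 5, "disease_name": 8 }, "encodingPositionsOverridden": false, "fields": [ { "name": "drug_name", "description": "", "type": { "typeName": "STRING", "nullable": false, "logicalType": null, "collectionElementType": null, "mapKeyType": null, "mapValueType": null, "rowSchema": null, "allMetadata": {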
}
},
"options": {
"optionNames": [
]
}
},
{
"name": "eid",
"description": "",
"type": {
"typeName": "STRING",
"nullable": false,
"logicalType": null,
"collectionElementType": null,
"mapKeyType": null,
"mapValueType": null,
"rowSchema": null,
"allMetadata": {
}
},
"options": {
"optionNames": [
]
}
},
{
"name": "docid",
"description": "",
"type": {
"typeName": "STRING",
"nullable": false,
"logicalType": null,
"collectionElementType": null,
"mapKeyType": null,
"mapValueType": null,
"rowSchema": null,
"allMetadata": {
}
},
"options": {
"optionNames": [
]
}
},
{
"name": "prid",
"description": "",
"type": {
"typeName": "STRING",
"nullable": false,
"logicalType": null,
"collectionElementType": null,
"mapKeyType": null,
"mapValueType": null,
"rowSchema": null,
"allMetadata": {
}
},
"options": {
"optionNames": [
]
}
},
{
"name": "patientid",
"description": "",
"type": {
"typeName": "STRING",
"nullable": false,
"logicalType": null,
"collectionElementType": null,
"mapKeyType": null,
"mapValueType": null,
"rowSchema": null,
"allMetadata": {
}
},
"options": {
"optionNames": [
]
}
},
{
"name": "updated_ekaid",
"description": "",
"type": {
"typeName": "STRING",
"nullable": true,
"logicalType": null,
"collectionElementType": null,
"mapKeyType": null,
"mapValueType": null,
"rowSchema": null,
"allMetadata": {
}
},
"options": {
"optionNames": [
]
}
},
{
"name": "generic_name",
"description": "",
"type": {
"typeName": "STRING",
"nullable": true,
"logicalType": null,
"collectionElementType": null,
"mapKeyType": null,
"mapValueType": null,
"rowSchema": null,
"allMetadata": {
}
},
"options": {
"optionNames": [
]
}
},
{
"name": "therapeutic_class",
"description": "",
"type": {
"typeName": "STRING",
"nullable": true,
"logicalType": null,
"collectionElementType": null,
"mapKeyType": null,
"mapValueType": null,
"rowSchema": null,
"allMetadata": {
}
},
"options": {
"optionNames": [
]
}
},
{
"name": "disease_name",
"description": "",
"type": {
"typeName": "STRING",
"nullable": true,
"logicalType": null,
"collectionElementType": null,
"mapKeyType": null,
"mapValueType": null,
"rowSchema": null,
"allMetadata": {
}
},
"options": {
"optionNames": [
]
}
},
{
"name": "generic_id",
"description": "",
"type": {
"typeName": "STRING",
"nullable": true,
"logicalType": null,
"collectionElementType": null,
"mapKeyType": null,
"mapValueType": null,
"rowSchema": null,
"allMetadata": {
}
},
"options": {
"optionNames": [
]
}
},
{
"name": "created_at",
"description": "",
"type": {
"typeName": "STRING",
"nullable": false,
"logicalType": null,
"collectionElementType": null,
"mapKeyType": null,
"mapValueType": null,
"rowSchema": null,
"allMetadata": {
}
},
"options": {
"optionNames": [
]
}
},
{
"name": "source",
"description": "",
"type": {
"typeName": "LOGICAL_TYPE",
"nullable": false,
"logicalType": {
"identifier": "SqlCharType",
"argumentType": {
"typeName": "STRING",
"nullable": false,
"logicalType": null,
"collectionElementType": null,
"mapKeyType": null,
"mapValueType": null,
"rowSchema": null,
"allMetadata": {
}
},
"argument": "",
"baseType": {
"typeName": "STRING",
"nullable": false,
"logicalType": null,
"collectionElementType": null,
"mapKeyType": null,
"mapValueType": null,
"rowSchema": null,
"allMetadata": {
}
}
},
"collectionElementType": null,
"mapKeyType": null,
"mapValueType": null,
"rowSchema": null,
"allMetadata": {
}
},
"options": {
"optionNames": [
]
}
}
],
"uuid": null,
"options": {
"optionNames": [
]
},
"fieldNames": [
"drug_name",
"eid",
"docid",
"prid",
"patientid",
"updated_ekaid",
"generic_name",
"therapeutic_class",
"disease_name",
"generic_id",
"created_at",
"source"
],
"fieldCount": 12
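}, "values": [ "drug_name", "mid", "did", "prid", "pid", "bid", "medicineclassname", "medicine class", "['disease name']", "genericid", "2023-10-13", "current_medications" ], "fieldCount": 12, "baseValues": [ "drug_name", "mid", "did", "prid", "pid", "bid", "medicineclassname", "medicine class", "['disease name']", "genericid", "2023-10-13", "current_medications" ] }`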
whereas I was expecting:
{"drug_name": "drug_name", "eid": "mid", "docid": "did", "prid": "prid", "patientid": "pid", "updated_ekaid": "bid", "generic_name": "medicineclassname", "therapeutic_class": "medicine class", "disease_name": "['disease name']", "generic_id": "genericid", "created_at": "2023-10-13", "source": "current_medications"}
How can I get just the field values as a flat JSON object?
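Not a definitive answer, but a sketch of one way to think about it. The output above looks like what Jackson produces when it treats Row as a plain bean and serializes its getters (schema, values, fieldCount, baseValues), so AsJsons is probably not the right tool for Row. What you want is each name in "fieldNames" paired with the value at the same position in "values". The class below shows only that pairing step in plain Java, with no Beam or Jackson dependency. RowJsonSketch, toJson and escape are hypothetical names made up for this illustration, and it handles flat scalar fields only (no nested rows, arrays or maps). In a pipeline you would presumably feed it row.getSchema().getFieldNames() and row.getValues() from inside a MapElements or DoFn.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: pairs field names with values and emits one flat JSON object. */
final class RowJsonSketch {

    /** Escapes the characters that must be escaped inside a JSON string. */
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char ch = s.charAt(i);
            switch (ch) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (ch < 0x20) {
                        // Remaining control characters become 4-digit hex escapes.
                        sb.append(String.format("\\u%04x", (int) ch));
                    } else {
                        sb.append(ch);
                    }
            }
        }
        return sb.toString();
    }

    /** Builds {"name":value,...}, keeping the order of fieldNames. */
    static String toJson(List<String> fieldNames, List<?> values) {
        if (fieldNames.size() != values.size()) {
            throw new IllegalArgumentException("fieldNames and values differ in length");
        }
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < fieldNames.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append('"').append(escape(fieldNames.get(i))).append("\":");
            Object v = values.get(i);
            if (v == null) {
                sb.append("null");
            } else if (v instanceof Number || v instanceof Boolean) {
                // Assumes finite numbers; NaN and Infinity are not valid JSON.
                sb.append(v);
            } else {
                sb.append('"').append(escape(v.toString())).append('"');
            }
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("drug_name", "eid", "source");
        List<Object> values = Arrays.asList("drug_name", "mid", "current_medications");
        // Prints {"drug_name":"drug_name","eid":"mid","source":"current_medications"}
        System.out.println(toJson(names, values));
    }
}
```

Before hand-rolling anything like this, it may be worth trying Beam's own ToJson transform (org.apache.beam.sdk.transforms.ToJson), which is meant for schema-aware PCollections such as PCollection<Row>. I have not checked how it behaves with the LOGICAL_TYPE "source" field in your schema, so treat that as something to test rather than a guaranteed fix.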