MySQL JSON_OBJECT 不会生成写入 BigQyery 所需的换行符分隔的 json

问题描述 投票:0回答:1

我有一个数据流管道,它从 MySQL 读取数据并将其写入 BigQuery。管道失败,问题是从 MySQL 读取的数据格式不是 NEWLINE DELIMITED JSON。

    MySQL rows - using various ways to convert to JSON

1) JSON_OBJECT in Sql query gives below o/p
    [('{"emp_id": "1001", "dept_nm": "marketing"}',), ('{"emp_id": "1002", "dept_nm": "consulting"}',), ('{"emp_id": "1003", "dept_nm": "delivery"}',)]

2) json.dumps(json_object) gives output as {"emp_id": ["1001", "marketing"], "dept_nm": ["1002", "consulting"]} 
    
    BigQuery expects data in the below NEWLINE DELIMITED JSON format - 

    {"emp_id": "1001", "dept_nm": "marketing"}
    {"emp_id": "1002", "dept_nm": "consulting"}
    {"emp_id": "1003", "dept_nm": "delivery"}

我不知道如何更改格式以加载到 BQ 中。

google-cloud-dataflow
1个回答
0
投票

此解决方法可能对您有帮助。

您可以使用Python中的

json.loads
函数将JSON的字符串表示形式转换为Python字典,然后使用
json.dumps
将它们序列化为所需的格式(NDJSON)。 join 方法用于将格式化的行与中间的换行符连接起来。

然后,您可以使用生成的 ndjson_data 加载到 BigQuery 中。根据您的特定管道,您可以将此数据写入文件或直接在管道中使用它来加载到 BigQuery 中。

这是我的Python示例代码:

import json

# Example data from MySQL
mysql_rows = [('{"emp_id": "1001", "dept_nm": "marketing"}',),
              ('{"emp_id": "1002", "dept_nm": "consulting"}',),
              ('{"emp_id": "1003", "dept_nm": "delivery"}',)]

# Convert MySQL rows to BigQuery format
bq_rows = [json.loads(row[0]) for row in mysql_rows]

# Serialize each dictionary into NEWLINE DELIMITED JSON format
ndjson_data = '\n'.join([json.dumps(row) for row in bq_rows])

# Print or use ndjson_data for further processing, such as loading into BigQuery
print(ndjson_data)
© www.soinside.com 2019 - 2024. All rights reserved.