我有一个数据流管道,它从 MySQL 读取数据并将其写入 BigQuery。管道失败,问题是从 MySQL 读取的数据格式不是 NEWLINE DELIMITED JSON。
MySQL rows - using various ways to convert to JSON
1) JSON_OBJECT in Sql query gives below o/p
[('{"emp_id": "1001", "dept_nm": "marketing"}',), ('{"emp_id": "1002", "dept_nm": "consulting"}',), ('{"emp_id": "1003", "dept_nm": "delivery"}',)]
2) json.dumps(json_object) gives output as {"emp_id": ["1001", "marketing"], "dept_nm": ["1002", "consulting"]}
BigQuery expects data in the below NEWLINE DELIMITED JSON format -
{"emp_id": "1001", "dept_nm": "marketing"}
{"emp_id": "1002", "dept_nm": "consulting"}
{"emp_id": "1003", "dept_nm": "delivery"}
我不知道如何更改格式以加载到 BQ 中。
此解决方法可能对您有帮助。
json.loads
函数将JSON的字符串表示形式转换为Python字典,然后使用json.dumps
将它们序列化为所需的格式(NDJSON)。 join 方法用于将格式化的行与中间的换行符连接起来。
然后,您可以使用生成的 ndjson_data 加载到 BigQuery 中。根据您的特定管道,您可以将此数据写入文件或直接在管道中使用它来加载到 BigQuery 中。
这是我的Python示例代码:
import json
# Example data from MySQL
mysql_rows = [('{"emp_id": "1001", "dept_nm": "marketing"}',),
('{"emp_id": "1002", "dept_nm": "consulting"}',),
('{"emp_id": "1003", "dept_nm": "delivery"}',)]
# Convert MySQL rows to BigQuery format
bq_rows = [json.loads(row[0]) for row in mysql_rows]
# Serialize each dictionary into NEWLINE DELIMITED JSON format
ndjson_data = '\n'.join([json.dumps(row) for row in bq_rows])
# Print or use ndjson_data for further processing, such as loading into BigQuery
print(ndjson_data)