I know I'm supposed to have some code by now, but I don't really have anything useful yet.

I have a ~300GB JSON file on GCS at gs://path/listings_all.json. Ultimately I'm trying to import it into BigQuery, but it has the wrong data structure (I got it from MongoDB's mongoexport):
invalid field name "$date". Fields must contain only letters, numbers, and underscores, start with a letter or underscore, and be at most 128 characters long
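(For context: mongoexport writes dates in MongoDB Extended JSON, so I assume the offending records contain something along these lines; this is an illustrative line, not one taken from the actual file.)

{"name": "Some listing", "zipcode": "12345", "last_scraped": {"$date": "2017-05-01T12:00:00.000Z"}}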
So right now my approach is to somehow read the source file from GCS line by line, process each line, and upload each processed line to BigQuery using the Python API.

Below is the simple reader I've put together so far, which I'm testing against a 100-line sample from the original huge file:
import json
from pprint import pprint

with open('schema_in_10.json') as f:
    for line in f:
        j_content = json.loads(line)

        # print(j_content['id'], j_content['city'], j_content['country'], j_content['state'], j_content['country_code'], j_content['smart_location'], j_content['address'], j_content['market'], j_content['neighborhood'])
        # // geo { lat, lng}'])
        print('------')
        pprint(j_content['is_location_exact'])
        pprint(j_content['zipcode'])
        pprint(j_content['name'])
Could you please help me figure out how to read or stream this huge JSON file line by line from Google Cloud Storage with Python 3?
Parsing a JSON file line by line with the built-in json parser won't work (unless it actually is a "json lines" document), so you want a streaming parser.

But while that will solve the memory-usage issue, it won't fix the invalid JSON, so your best bet is to first fix the invalid JSON source as a plain text file, either in Python or with sed or a similar tool, and then use an incremental parser to parse your content:
def fixfile(sourcepath, destpath):
    with open(sourcepath) as source, open(destpath, "w") as dest:
        for line in source:
            # you may want to use a regexp if this simple solution
            # breaks something else
            line = line.replace("$date", "date")
            dest.write(line)
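If you'd rather not pull the whole 300GB file down to disk first, the same fix can be streamed straight from GCS. Here is a minimal sketch (untested against your data) using the google-cloud-storage client, assuming a version recent enough to support Blob.open(); the bucket and object names are placeholders:

from google.cloud import storage

def fix_gcs_file(bucket_name, source_blob_name, dest_blob_name):
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    # Blob.open() streams the object, so the ~300GB file never has to fit in memory
    with bucket.blob(source_blob_name).open("rt") as src, \
         bucket.blob(dest_blob_name).open("wt") as dst:
        for line in src:
            # same naive "$date" -> "date" replacement as above;
            # switch to a regexp if the plain replace breaks something else
            dst.write(line.replace("$date", "date"))

# e.g. fix_gcs_file("my-bucket", "listings_all.json", "listings_all_fixed.json")

Once the field names are valid, you may not even need to go row by row: a newline-delimited JSON file sitting in GCS can be loaded into BigQuery directly with a load job.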
Here is a sample implementation in GCP Dataflow of the first suggestion from the accepted answer. You need to implement the JSON correction in the json_processor function. You can run this code in a Datalab notebook.
# Datalab might need an older version of pip
# !pip install pip==9.0.3
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
project_id = 'my-project'
bigquery_dataset_name = 'testdataset' # needs to exist
table_name = 'testtable'
bucket_name = 'my-bucket'
json_file_gcs_path = 'gs://path/to/my/file.json'
schema = "name:STRING,zipcode:STRING"
def json_processor(row):
    import json
    d = json.loads(row)
    return {'name': d['name'], 'zipcode': d['zipcode']}
options = beam.options.pipeline_options.PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = project_id
google_cloud_options.job_name = "myjob"
google_cloud_options.staging_location = 'gs://{}/binaries'.format(bucket_name)
google_cloud_options.temp_location = 'gs://{}/temp'.format(bucket_name)
options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options.region = "europe-west1"
p = beam.Pipeline(options=options)
(p | "read_from_gcs" >> beam.io.ReadFromText(json_file_gcs_path)
| "json_processor" >> beam.Map(json_processor)
| "write_to_bq" >> beam.io.Write(beam.io.gcp.bigquery.BigQuerySink(table=table_name,
dataset=bigquery_dataset_name,
project=project_id,
schema=schema,
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_EMPTY'))
)
p.run()
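Note that p.run() returns as soon as the Dataflow job has been submitted; if you want the notebook cell to block until the job finishes, call p.run().wait_until_finish() instead.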