使用Python从Google云端存储中逐行读取大量JSON

问题描述 投票:1回答:3

我知道我应该有一个代码,但我还没有任何用处。

我的GCS gs://path/listings_all.json上有~300GB的JSON文件,最终我试图将它导入BigQuery,但它有一些错误的数据结构(我是从MongoDB的mongoexport获取的)

无效的字段名称“$ date”。字段必须仅包含字母,数字和下划线,以字母或下划线开头,最多128个字符

所以,现在我的方法是以某种方式从GCS处理它逐行读取源文件,并使用python API将每个处理过的行上传到BigQuery。

下面简单的读者我已经把原始巨大文件中的100行样本放在一起进行测试:

import json
from pprint import pprint

with open('schema_in_10.json') as f:
    for line in f:
        j_content = json.loads(line)

        # print(j_content['id'], j_content['city'], j_content['country'], j_content['state'], j_content['country_code'], j_content['smart_location'], j_content['address'], j_content['market'], j_content['neighborhood'])
        # // geo { lat, lng}'])
        print('------')
        pprint(j_content['is_location_exact'])
        pprint(j_content['zipcode'])
        pprint(j_content['name'])

您能否帮我解决一下如何从Google Cloud Storage with Python3中逐行读取或流式传输巨大的JSON?

python google-bigquery google-cloud-storage google-python-api
3个回答
3
投票

逐行读取然后尝试流式传输到BigQuery将无法在本地计算机上使用300GB扩展,并且您将很难获得这个有效的TBH。

有几个可扩展的选项:

  1. 编写一个Cloud Dataflow管道来从GCS读取您的文件(它将为您缩放并并行读取),更正字段名称,然后写入BigQuery。见here
  2. 使用CSV而不是JSON作为格式并使用未出现在数据中的分隔符将其直接加载到BigQuery中。这会将每条记录加载到一个String列中,然后您可以使用BigQuery的JSON函数来提取您需要的内容。见here

1
投票

使用内置的json解析器逐行解析json文件是行不通的(除非它实际上是"json lines" doc),所以你想要一个streaming parser

但是虽然这将解决内存使用问题,但它不会修复无效的json,所以最好的办法是首先将无效的json源修复为纯文本文件,无论是在python中还是使用sed或类似的工具,然后使用增量解析器来解析您的内容。

def fixfile(sourcepath, destpath):
    with open(sourcepath) as source, open(destpath, "w") as dest:
        for line in source:
            # you may want to use a regexp if this simple solution
            # breaks something else
            line = line.replace("$date", "date")
            dest.write(line)

1
投票

以下是GCP Dataflow中与accepted answer中的第一个建议相对应的解决方案的示例实现。您需要在函数json_processor中实现json校正。您可以在Datalab笔记本中运行此代码。

# Datalab might need an older version of pip
# !pip install pip==9.0.3

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions

project_id = 'my-project'
bigquery_dataset_name = 'testdataset' # needs to exist 
table_name = 'testtable'
bucket_name = 'my-bucket'
json_file_gcs_path = 'gs://path/to/my/file.json'
schema = "name:STRING,zipcode:STRING"

def json_processor(row):
    import json
    d = json.loads(row)
    return {'name': d['name'], 'zipcode': d['zipcode']}

options = beam.options.pipeline_options.PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = project_id
google_cloud_options.job_name = "myjob"
google_cloud_options.staging_location = 'gs://{}/binaries'.format(bucket_name)
google_cloud_options.temp_location = 'gs://{}/temp'.format(bucket_name)
options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options.region = "europe-west1"

p = beam.Pipeline(options=options)

(p | "read_from_gcs" >> beam.io.ReadFromText(json_file_gcs_path)
   | "json_processor" >> beam.Map(json_processor)
   | "write_to_bq" >> beam.io.Write(beam.io.gcp.bigquery.BigQuerySink(table=table_name, 
                                                       dataset=bigquery_dataset_name, 
                                                       project=project_id, 
                                                       schema=schema, 
                                                       create_disposition='CREATE_IF_NEEDED',
                                                       write_disposition='WRITE_EMPTY'))
)

p.run()
© www.soinside.com 2019 - 2024. All rights reserved.