I'm calling an API that returns a JSON object of roughly 100,000 rows. I think the best approach is to load the data in batches, since BigQuery limits each streaming-insert request to 10,000 rows, but I'm not sure how to handle this much data.
Below is the script I have, which works for fewer than 10,000 rows. How would I rewrite it to accept more?
from google.cloud import bigquery as bq
from google.oauth2 import service_account

def export_items_to_bigquery(self):
    # Credentials from the service account key file
    credentials = service_account.Credentials.from_service_account_file(
        self.filepath, scopes=['https://www.googleapis.com/auth/bigquery'])
    # Instantiate a client
    bigquery_client = bq.Client(project=credentials.project_id, credentials=credentials)
    # Prepare a reference to the table
    table_id = '{}.{}.{}'.format(self.project_id, self.dataset_id, self.table)
    table = bigquery_client.get_table(table_id)
    # new_json_list is the API data
    rows_to_insert = new_json_list
    errors = bigquery_client.insert_rows(table, rows_to_insert)  # API request
    if errors == []:
        print(rows_to_insert)
    else:
        print(errors)
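If you do stay with streaming inserts, the 10,000-row cap applies per request, so the row list can be split into chunks before calling `insert_rows`. A minimal sketch of that batching, assuming the 10,000 batch size from the question; `chunked` and `insert_in_batches` are hypothetical helpers, and the insert call is injected as a callable so the logic stays independent of the client object:

```python
def chunked(rows, batch_size):
    """Yield successive batch_size-sized slices of rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]

def insert_in_batches(insert_fn, rows, batch_size=10000):
    """Call insert_fn once per batch of at most batch_size rows.

    insert_fn would be a wrapper around the client call, e.g.
    lambda batch: bigquery_client.insert_rows(table, batch).
    Returns all errors collected across batches.
    """
    all_errors = []
    for batch in chunked(rows, batch_size):
        all_errors.extend(insert_fn(batch))
    return all_errors
```

With this shape, 100,000 rows become ten requests of 10,000 rows each, and the error-list check from the original script can be applied to the combined result.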
Instead of inserting the rows in streaming mode as shown, use a load job. A load job will read billions of rows without any problem.
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv
dataset_ref = client.dataset(dataset_id)
job_config = bigquery.LoadJobConfig()
job_config.schema = [
    bigquery.SchemaField("name", "STRING"),
    bigquery.SchemaField("post_abbr", "STRING"),
]
job_config.skip_leading_rows = 1
# The source format defaults to CSV, so the line below is optional.
job_config.source_format = bigquery.SourceFormat.CSV
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"

load_job = client.load_table_from_uri(
    uri, dataset_ref.table("us_states"), job_config=job_config
)  # API request

print("Starting job {}".format(load_job.job_id))
load_job.result()  # Waits for the table load to complete.
print("Job finished.")
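Since the API already returns JSON rows, a load job can also ingest them directly without first staging a CSV file in Cloud Storage. A minimal sketch using `load_table_from_json` (a method provided by recent versions of google-cloud-bigquery); the function name and the autodetect/append settings are my assumptions, not part of the original answer:

```python
def load_json_rows(project_id, table_id, json_rows):
    """Load a list of JSON row dicts into a BigQuery table via a load job.

    Assumes google-cloud-bigquery is installed at a version that provides
    Client.load_table_from_json; imported inside the function so the sketch
    stays self-contained.
    """
    from google.cloud import bigquery

    client = bigquery.Client(project=project_id)
    job_config = bigquery.LoadJobConfig(
        autodetect=True,  # let BigQuery infer the schema from the JSON keys
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    # load_table_from_json serializes the rows and runs a load job,
    # so the 10,000-rows-per-request streaming limit does not apply.
    job = client.load_table_from_json(json_rows, table_id, job_config=job_config)
    job.result()  # wait for the load job to finish
    return job
```

This avoids both the streaming quota and the extra Cloud Storage round trip, at the cost of load-job latency (loads are batched, not real-time).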