How can I write query results directly to a Google Cloud Storage bucket?

Question · 1 vote · 3 answers
from google.cloud import bigquery  
query = """ select * from emp where emp_name=@emp_name""" 
query_params = [bigquery.ScalarQueryParameter('emp_name', 'STRING', 'name')] 
job_config = bigquery.QueryJobConfig() 
job_config.query_parameters = query_params  
client = bigquery.Client() 
query_job = client.query(query, job_config=job_config) 
result = query_job.result()

How can I write the results to Google Cloud Storage directly, instead of first writing them to a CSV file and then uploading that file to a storage bucket?
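For context, the manual workaround the question wants to avoid looks roughly like this (a sketch; the field names, bucket, and object names are placeholders, and the upload step is left as a comment since it needs a real GCS bucket):

```python
import csv

def rows_to_csv(rows, fieldnames, path):
    """Write query result rows (dict-like, e.g. BigQuery Row objects) to a local CSV file."""
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for row in rows:
            writer.writerow({k: row[k] for k in fieldnames})

# The file would then be uploaded with the google-cloud-storage client, e.g.:
# storage.Client().bucket("YOUR_BUCKET").blob("emp.csv").upload_from_filename(path)
```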

python google-cloud-platform google-bigquery google-cloud-storage
3 Answers

5 votes

BigQuery does not support writing its query results directly to GCS. You have to write the results to a table first, and then export that table to GCS once it has been materialized. You could use Cloud Composer to orchestrate this for you.

Alternatively, you could use a Dataflow pipeline to achieve the desired result in one go. However, this is considerably more work and will cost more money. The idea would be to write a pipeline that reads from BigQuery using your SQL query and then writes the results to GCS. It would also be slower.


4 votes

Depending on your specific use case (export frequency, size of the exports, etc.), the solutions proposed by @GrahamPolley in his answer may work for you, although they would take more development and attention.

Currently, the only possibilities for writing query results are to write them to a table or to download them locally, and even downloading directly to CSV has some limitations. Therefore, writing query results directly to GCS in CSV format is not possible. However, there is a two-step solution:

  1. Write the query results to a BQ table
  2. Export the data from the BQ table to a CSV file in GCS. Note that this feature also has some limitations, but they are not as restrictive.
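Both steps need a throwaway table name and a matching GCS destination. A small stdlib-only helper (hypothetical names, just to illustrate the convention used below) that generates both:

```python
from time import gmtime, strftime

def make_export_names(bucket, prefix="result"):
    """Build a timestamped temp-table name and a wildcard GCS URI for it.

    The '*' wildcard lets BigQuery shard the export across multiple files,
    which is required for tables larger than 1 GB.
    """
    table_name = "{}_{}".format(prefix, strftime("%Y%m%d_%H%M%S", gmtime()))
    destination_uri = "gs://{}/{}_*.csv".format(bucket, table_name)
    return table_name, destination_uri
```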

The following Python code gives you an idea of how to perform that task:

from google.cloud import bigquery
client = bigquery.Client()

# Write query results to a new table
job_config = bigquery.QueryJobConfig()
table_ref = client.dataset("DATASET").table("TABLE")
job_config.destination = table_ref
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

query_job = client.query(
    'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10',
    location='US', # Location must match dataset
    job_config=job_config)
rows = list(query_job)  # Waits for the query to finish


# Export table to GCS
destination_uri = "gs://BUCKET/FILE.CSV"
dataset_ref = client.dataset("DATASET", project="PROJECT_ID")
table_ref = dataset_ref.table("TABLE")

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    location='US')
extract_job.result()  # Waits for job to complete

Note that, afterwards, you would have to delete the table (which you can also do programmatically). This may not be the best solution if the process has to be automated (in that case, it might be better to explore @GrahamPolley's solutions), but it works for a simple scenario.
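Since the temporary table should be deleted even when the extract fails, a try/finally wrapper is one way to make the cleanup reliable. A sketch (the function name is hypothetical; `client` is assumed to be any object exposing the two BigQuery client methods used):

```python
def export_and_cleanup(client, table_ref, destination_uri, location="US"):
    """Export a table to GCS, then delete it regardless of the outcome."""
    try:
        extract_job = client.extract_table(
            table_ref, destination_uri, location=location)
        extract_job.result()  # wait for the export to finish
    finally:
        client.delete_table(table_ref)  # always drop the temp table
```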


0 votes

@dsesto's answer was very useful for me. I used his code and added some extra lines to query BigQuery, write the results to a table, export the table to GCS, and import the results into a Dask DataFrame. The code is wrapped into a function.

def df_from_bq(query: str, table=None, compute=False):

    from time import gmtime, strftime
    from google.cloud import bigquery  # also import storage if you uncomment the blob-deletion lines below
    import dask.dataframe as dd
    import gcsfs

    client = bigquery.Client.from_service_account_json('YOUR_PATH')  # Authenticate to BQ with a service key
    project = 'YOUR_PROJECT'

    # Create a custom table name if none is given
    table_name = 'result_' + strftime("%Y%m%d_%H%M%S", gmtime()) if table is None else table

    job_config = bigquery.QueryJobConfig()
    table_ref = client.dataset("YOUR_DATASET").table(table_name)
    job_config.destination = table_ref
    # Create the table with the query result; overwrite it if it already exists
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

    query_job = client.query(
        query,
        location='US',
        job_config=job_config)
    query_job.result()
    print('Query results loaded to table {}'.format(table_ref.path))

    destination_uri = "gs://YOUR_BUCKET/{}".format(table_name + '_*' + '.csv')
    dataset_ref = client.dataset("YOUR_DATASET", project=project)
    table_ref = dataset_ref.table(table_name)

    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        location='US')
    extract_job.result()  # Extract the results to GCS

    print('Query results extracted to GCS: {}'.format(destination_uri))

    client.delete_table(table_ref)  # Delete the table in BQ

    print('Table {} deleted'.format(table_name))

    gcs = gcsfs.GCSFileSystem(project=project, token='cache')
    df = dd.read_csv('gcs://YOUR_BUCKET/{}'.format(table_name + '_*' + '.csv'),
                     storage_options={'token': gcs.session.credentials})

    # storage_client = storage.Client.from_service_account_json('YOUR_PATH')
    # bucket = storage_client.get_bucket('YOUR_BUCKET')
    # blob = bucket.blob(table_name + '.csv')
    # blob.delete()  # Uncomment if you need to delete the blob after the DataFrame is created
    # print('Blob {} deleted'.format(table_name + '.csv'))

    print('Results imported to a Dask DataFrame!')

    return df if not compute else df.compute().reset_index(drop=True)

Note that the table in BQ is deleted once the results have been imported into Cloud Storage.
