from google.cloud import bigquery
query = """ select * from emp where emp_name=@emp_name"""
query_params = [bigquery.ScalarQueryParameter('emp_name', 'STRING', 'name')]
job_config = bigquery.QueryJobConfig()
job_config.query_parameters = query_params
client = bigquery.Client()
query_job = client.query(query, job_config=job_config)
result = query_job.result()
如何将结果写入Google云端存储,而不是将其写入CSV并将其上传到云存储桶?
BigQuery不支持将其查询结果直接写入GCS。您必须将结果写入表中,然后在表格实现后将表导出到GCS。您可以使用Cloud Composer为您编排此功能。
或者,您可以使用Dataflow管道一次性实现所需的结果。但这是一项更多的工作,将花费更多的钱。我们的想法是使用您的SQL查询编写一个从BigQuery读取的管道,然后将结果写入GCS。它也会慢一些。
根据您的具体使用案例(出口频率,出口规模等),@ GrahamPolley在答案中提出的解决方案可能对您有用,尽管它们需要更多的开发和关注。
目前writing query results的可能性是将结果写入表格或在本地下载,甚至直接下载到CSV有一些limitations。因此,无法直接将查询结果以CSV格式写入GCS。但是,有两个步骤的解决方案包括:
以下Python代码可以让您了解如何执行该任务:
from google.cloud import bigquery
client = bigquery.Client()
# Write query results to a new table
job_config = bigquery.QueryJobConfig()
table_ref = client.dataset("DATASET").table("TABLE")
job_config.destination = table_ref
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
query_job = client.query(
'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10',
location='US', # Location must match dataset
job_config=job_config)
rows = list(query_job) # Waits for the query to finish
# Export table to GCS
destination_uri = "gs://BUCKET/FILE.CSV"
dataset_ref = client.dataset("DATASET", project="PROJECT_ID")
table_ref = dataset_ref.table("TABLE")
extract_job = client.extract_table(
table_ref,
destination_uri,
location='US')
extract_job.result() # Waits for job to complete
请注意,之后,您必须删除该表(您也可以以编程方式执行此操作)。如果你必须自动化这个过程,这可能不是最好的解决方案(如果这是你的用例,也许你应该更好地探索@ Graham的解决方案),但它可以解决一个简单的场景。
@dsesto的回答对我来说非常有用。我使用他的代码并添加了一些额外的行来查询BigQuery,将结果写入表,然后导出到GCS并将结果导入Dask DataFrame。代码被包装到一个函数中。
def df_from_bq(query:str,table=None,compute=False):
from time import gmtime, strftime
from google.cloud import bigquery#y, storage
import dask.dataframe as dd
import gcsfs
client = bigquery.Client.from_service_account_json('YOUR_PATH') #Authentication if BQ using ServiceKey
project = 'YOUR_PROJECT'
table_name = 'result_'+str(strftime("%Y%m%d_%H%M%S", gmtime())) if table==None else table #Creates custome table name if no name is defined
job_config = bigquery.QueryJobConfig()
table_ref = client.dataset("YOUR_DATASET").table(table_name)
job_config.destination = table_ref
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE #Creates the table with query result. Overwrites it if the table exists
query_job = client.query(
query,
location='US',
job_config=job_config)
query_job.result()
print('Query results loaded to table {}'.format(table_ref.path))
destination_uri = "gs://YOUR_BUCKET/{}".format(table_name+'_*'+'.csv')
dataset_ref = client.dataset("YOUR_DATASET", project=project)
table_ref = dataset_ref.table(table_name)
extract_job = client.extract_table(
table_ref,
destination_uri,
location='US')
extract_job.result() #Extracts results to the GCS
print('Query results extracted to GCS: {}'.format(destination_uri))
client.delete_table(table_ref) #Deletes table in BQ
print('Table {} deleted'.format(table_name))
gcs = gcsfs.GCSFileSystem(project=project, token='cache')
df = dd.read_csv('gcs://YOUR_BUCKET/{}'.format(table_name+'_*'+'.csv'), storage_options={'token': gcs.session.credentials})
#storage_client = storage.Client.from_service_account_json('C:\\Users\o.korshun\Documents\o.korshun.json')
#bucket = storage_client.get_bucket('plarium-analytics')
#blob = bucket.blob(table_name+'.csv')
#blob.delete() #Uncomment if you need to delete Blob after the DataFrame is created
#print('Blob {} deleted'.format(table_name+'.csv'))
print('Results imported to DD!')
return df if compute == False else df.compute().reset_index(in_place=True)
请注意,将结果导入云存储后,将删除BQ中的表。