I have been trying to compress a CSV file to .gz with a Cloud Function (Python 3.7) before uploading it to GCS, but my code only adds the .gz extension and does not actually compress the file, so the result is corrupted. Can you tell me how to fix this? Thanks.
Here is the relevant part of my code:
import gzip
import time

from google.cloud import bigquery
from google.cloud import storage

def to_gcs(request):
    job_config = bigquery.QueryJobConfig()
    gcs_filename = 'filename_{}.csv'
    bucket_name = 'bucket_gcs_name'
    subfolder = 'subfolder_name'
    client = bigquery.Client()
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    QUERY = "SELECT * FROM `bigquery-public-data.google_analytics_sample.ga_sessions_*` session, UNNEST(hits) AS hits"
    query_job = client.query(
        QUERY,
        location='US',
        job_config=job_config)
    while not query_job.done():
        time.sleep(1)
    rows_df = query_job.result().to_dataframe()
    storage_client = storage.Client()
    # problematic upload: the object is named .gz but its contents are plain CSV
    storage_client.get_bucket(bucket_name).blob(subfolder + '/' + gcs_filename + '.gz').upload_from_string(
        rows_df.to_csv(sep='|', index=False, encoding='utf-8', compression='gzip'),
        content_type='application/octet-stream')
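A minimal sketch of what is going on here: when to_csv() is called without a path it returns a plain str and the compression argument is ignored, so the uploaded object is uncompressed text with a .gz name:

import gzip
import pandas as pd

df = pd.DataFrame({'a': [1, 2]})
s = df.to_csv(index=False, compression='gzip')
print(type(s))  # <class 'str'>: plain CSV text, not gzip bytes

# compressing explicitly yields real gzip data
data = gzip.compress(s.encode('utf-8'))
print(data[:2] == b'\x1f\x8b')  # True: gzip magic number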
As suggested in the thread that @Sam Mason linked in the comments, once you have the Pandas DataFrame you should use TextIOWrapper() and BytesIO() as described in the following example, which is inspired by @ramhiser's answer in this SO thread:
import gzip
from io import BytesIO, TextIOWrapper

df = query_job.result().to_dataframe()
blob = bucket.blob(f'{subfolder}/{gcs_filename}.gz')

with BytesIO() as gz_buffer:
    # write the dataframe into the buffer as gzip-compressed CSV
    with gzip.GzipFile(mode='w', fileobj=gz_buffer) as gz_file:
        df.to_csv(TextIOWrapper(gz_file, 'utf8'), index=False)
    # rewind before uploading, or the upload starts at end-of-buffer
    gz_buffer.seek(0)
    blob.upload_from_file(gz_buffer,
                          content_type='application/octet-stream')
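Note that upload_from_file() reads from the stream's current position, which is why the buffer is rewound with seek(0) after writing; alternatively, upload_from_file() accepts rewind=True to do the same thing.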
Also note that if you expect this file to ever get larger than a couple of MB, you are probably better off using something from the tempfile module in place of BytesIO. SpooledTemporaryFile is basically designed for this use case: it uses an in-memory buffer up to a given size and only falls back to disk when the file gets large.
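A minimal sketch of that variant, assuming the same df and blob objects as in the example above (the 10 MB threshold is only an illustrative value):

import gzip
from io import TextIOWrapper
from tempfile import SpooledTemporaryFile

# keep up to ~10 MB in memory, spill over to disk beyond that
with SpooledTemporaryFile(max_size=10 * 1024 * 1024) as gz_buffer:
    with gzip.GzipFile(mode='w', fileobj=gz_buffer) as gz_file:
        df.to_csv(TextIOWrapper(gz_file, 'utf8'), index=False)
    gz_buffer.seek(0)  # rewind before uploading
    blob.upload_from_file(gz_buffer,
                          content_type='application/octet-stream')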
Hi, I tried to reproduce your use case. I created a Cloud Function using this quickstart; here is the code:
def hello_world(request):
    from google.cloud import bigquery
    from google.cloud import storage
    import pandas as pd

    client = bigquery.Client()
    storage_client = storage.Client()
    # /tmp is the writable directory in the Cloud Functions runtime
    path = '/tmp/file.gz'
    query_job = client.query("""
        SELECT
          CONCAT(
            'https://stackoverflow.com/questions/',
            CAST(id as STRING)) as url,
          view_count
        FROM `bigquery-public-data.stackoverflow.posts_questions`
        WHERE tags like '%google-bigquery%'
        ORDER BY view_count DESC
        LIMIT 10""")
    results = query_job.result().to_dataframe()
    # when to_csv() writes to a path, compression='gzip' does take effect
    results.to_csv(path, sep='|', index=False, encoding='utf-8', compression='gzip')
    bucket = storage_client.get_bucket('mybucket')
    blob = bucket.blob('file.gz')
    blob.upload_from_filename(path)
I deployed the function and checked the output.
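For reference, deploying a function like this one typically looks something like the following (the function name matches the snippet above; the flags assume a first-generation, HTTP-triggered function):

gcloud functions deploy hello_world \
    --runtime python37 \
    --trigger-http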
And this is the requirements.txt:
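A plausible version, inferred from the imports used in the function (the exact contents are an assumption):

# presumed dependencies, inferred from the function's imports
google-cloud-bigquery
google-cloud-storage
pandas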