CSV to .GZ using Cloud Functions, Python

Question — votes: 0, answers: 2

I have been trying to compress a CSV file to .gz with a Cloud Function (Python 3.7) before uploading it to GCS, but my code only adds the .gz extension and does not actually compress the file, so in the end the file is corrupted. Could you tell me how to fix this? Thanks.

Here is the relevant part of my code:

import gzip
import time

from google.cloud import bigquery, storage
def to_gcs(request):    
    job_config = bigquery.QueryJobConfig()
    gcs_filename = 'filename_{}.csv'
    bucket_name = 'bucket_gcs_name'
    subfolder = 'subfolder_name'
    client = bigquery.Client()


    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

    QUERY = "SELECT * FROM `bigquery-public-data.google_analytics_sample.ga_sessions_*` session, UNNEST(hits) AS hits"
    query_job = client.query(
        QUERY,
        location='US',
        job_config=job_config)

    while not query_job.done():
        time.sleep(1)

    rows_df = query_job.result().to_dataframe()
    storage_client = storage.Client()

    storage_client.get_bucket(bucket_name).blob(subfolder + '/' + gcs_filename + '.gz').upload_from_string(
        rows_df.to_csv(sep='|', index=False, encoding='utf-8', compression='gzip'),
        content_type='application/octet-stream')

python google-cloud-functions google-cloud-storage
2 Answers
1
vote

As suggested in the thread that @Sam Mason referenced in a comment, once you have the Pandas dataframe you should use TextIOWrapper() and BytesIO() as described in the following example.

The following example is inspired by @ramhiser's answer in this SO thread:

import gzip
from io import BytesIO, TextIOWrapper

df = query_job.result().to_dataframe()
blob = bucket.blob(f'{subfolder}/{gcs_filename}.gz')

with BytesIO() as gz_buffer:
    # Write the dataframe as gzip-compressed CSV into the in-memory buffer.
    with gzip.GzipFile(mode='w', fileobj=gz_buffer) as gz_file:
        df.to_csv(TextIOWrapper(gz_file, 'utf8'), index=False)

    # Rewind the buffer before uploading, otherwise nothing is read from it.
    gz_buffer.seek(0)
    blob.upload_from_file(gz_buffer,
        content_type='application/octet-stream')

Also note that if you expect this file to ever grow larger than a couple of MB, you are probably better off using something from the tempfile module instead of BytesIO. SpooledTemporaryFile is basically designed for this use case: it keeps an in-memory buffer up to a given size and only falls back to disk when the file gets really big.
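If you want to try that, here is a minimal sketch of the same upload using SpooledTemporaryFile instead of BytesIO (assuming the same df and blob objects as above; the 10 MB threshold is just an arbitrary example):

import gzip
from io import TextIOWrapper
from tempfile import SpooledTemporaryFile

# Keep up to ~10 MB in memory; the buffer spills to disk automatically beyond that.
with SpooledTemporaryFile(max_size=10 * 1024 * 1024) as gz_buffer:
    with gzip.GzipFile(mode='w', fileobj=gz_buffer) as gz_file:
        df.to_csv(TextIOWrapper(gz_file, 'utf8'), index=False)

    # Rewind before uploading so the whole compressed stream is read.
    gz_buffer.seek(0)
    blob.upload_from_file(gz_buffer,
        content_type='application/octet-stream')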


0
votes

Hi, I tried to reproduce your use case:

  1. I created a Cloud Function following this quickstart (here is the link):

      def hello_world(request):
          from google.cloud import bigquery
          from google.cloud import storage
          import pandas as pd

          client = bigquery.Client()
          storage_client = storage.Client()

          # Cloud Functions only allow writes under /tmp.
          path = '/tmp/file.gz'

          query_job = client.query("""
              SELECT
                CONCAT(
                  'https://stackoverflow.com/questions/',
                  CAST(id as STRING)) as url,
                view_count
              FROM `bigquery-public-data.stackoverflow.posts_questions`
              WHERE tags like '%google-bigquery%'
              ORDER BY view_count DESC
              LIMIT 10""")

          results = query_job.result().to_dataframe()
          # With a file path, compression='gzip' is honoured and the CSV is gzipped on disk.
          results.to_csv(path, sep='|', index=False, encoding='utf-8', compression='gzip')

          bucket = storage_client.get_bucket('mybucket')
          blob = bucket.blob('file.gz')
          blob.upload_from_filename(path)
      
  2. I deployed the function.

  3. I checked the output.

      requirements.txt
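The contents of that requirements.txt are not shown above; a plausible minimal set of dependencies for this function (an assumption on my part, not the original file) would be:

      # hypothetical requirements.txt — pin versions as needed
      google-cloud-bigquery   # BigQuery client
      google-cloud-storage    # GCS client
      pandas                  # needed by to_dataframe()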