GCP:Bigquery 以 CRLF 作为行终止符导出数据

问题描述 投票:0回答:1

我尝试在 BigQuery 中将数据导出为 CSV,默认情况下它会生成以 LF 作为行终止符的导出文件,

export data options(uri = 'gs://<bucket>/<file_name>.txt', format = 'CSV',overwrite = true, header = false, field_delimiter = '~') as select * from <table> order by <col>

是否可以通过在 BigQuery 中的导出数据中提供一些参数来导出以 CRLF 作为行终止的数据?欢迎分享你的想法

编辑:要求是通过 AIRFLOW 达到预期结果,如果有任何直接方法通过添加任何 BQ EXPORT 参数来实现这一点,那就太好了。如果没有,在 Airflow 中使用 PYTHON 运算符或 BASH 运算符会很有帮助

python google-cloud-platform unix google-bigquery airflow
1个回答
0
投票

有两种方法,我们可以缓解这个问题..因为我正在寻找在 Airflow 中处理它的解决方案

解决方案一 -> Python 运算符与 Bash 命令

path='gs://file_path'
file_name='file_name'
retrieve_files_command = f'gsutil ls {path}{file_name}*.txt'
process = Popen(retrieve_files_command, stdout=PIPE, shell=True)

#retrieve list of file that need to remove LF to CRLF
files, _ = process.communicate()

files = files.decode().split('\n')[:-1]

#loop through each file to remove LF with CRLF and create a new GCS file
for file in files:
    filename = file.split('/')[-1]
    dt=datetime.now().strftime("%Y%m%d%H%M%S")
    modify_files_command = f"gsutil cat {file} | sed -e 's/$/\\r/' | gsutil cp - gs://<destination_path>/file_name_{dt}.txt"
    
    try:
        subprocess.run(modify_files_command,shell=True, check=True)
    except subprocess.CalledProcessError as e:
        print("{file} is not converted due to {e.stderr.decode('utf-8')}")

解决方案二 -> 具有缓冲区大小的 Python 运算符..注意:像解决方案一一样检索文件列表

buf_size = 4096
for input_file in files:
    in_blob = bucket.blob(input_file)
    op_prefix_name='destination_path'
    dt=datetime.now().strftime("%Y%m%d%H%M%S%f")
    op_fn='file_name' + dt
    print("input is " + input_file)
    print("otp is " + op_prefix_name + op_fn)
    op_blob=bucket.blob(op_prefix_name + op_fn)
    with in_blob.open('r') as in_file, op_blob.open('wt') as opt_file:
      while True:
          chunk = in_file.read(buf_size)
          if not chunk:
              break
          opt_file.write(chunk.replace('\n','\r\n'))

就我而言,解决方案二适合我的环境,因为由于文件大小限制,Astro 可能会因 gsutil cat 命令而受到攻击,而解决方案二由于缓冲区大小而具有高性能,请根据需要调整缓冲区大小

© www.soinside.com 2019 - 2024. All rights reserved.