我尝试在 BigQuery 中将数据导出为 CSV,默认情况下它会生成以 LF 作为行终止符的导出文件,
export data options(uri = 'gs://<bucket>/<file_name>.txt', format = 'CSV',overwrite = true, header = false, field_delimiter = '~') as select * from <table> order by <col>
是否可以通过在 BigQuery 中的导出数据中提供一些参数来导出以 CRLF 作为行终止的数据?欢迎分享你的想法
编辑:要求是通过 AIRFLOW 达到预期结果,如果有任何直接方法通过添加任何 BQ EXPORT 参数来实现这一点,那就太好了。如果没有,在 Airflow 中使用 PYTHON 运算符或 BASH 运算符会很有帮助
有两种方法,我们可以缓解这个问题..因为我正在寻找在 Airflow 中处理它的解决方案
解决方案一 -> Python 运算符与 Bash 命令
path='gs://file_path'
file_name='file_name'
retrieve_files_command = f'gsutil ls {path}{file_name}*.txt'
process = Popen(retrieve_files_command, stdout=PIPE, shell=True)
#retrieve list of file that need to remove LF to CRLF
files, _ = process.communicate()
files = files.decode().split('\n')[:-1]
#loop through each file to remove LF with CRLF and create a new GCS file
for file in files:
filename = file.split('/')[-1]
dt=datetime.now().strftime("%Y%m%d%H%M%S")
modify_files_command = f"gsutil cat {file} | sed -e 's/$/\\r/' | gsutil cp - gs://<destination_path>/file_name_{dt}.txt"
try:
subprocess.run(modify_files_command,shell=True, check=True)
except subprocess.CalledProcessError as e:
print("{file} is not converted due to {e.stderr.decode('utf-8')}")
解决方案二 -> 具有缓冲区大小的 Python 运算符..注意:像解决方案一一样检索文件列表
buf_size = 4096
for input_file in files:
in_blob = bucket.blob(input_file)
op_prefix_name='destination_path'
dt=datetime.now().strftime("%Y%m%d%H%M%S%f")
op_fn='file_name' + dt
print("input is " + input_file)
print("otp is " + op_prefix_name + op_fn)
op_blob=bucket.blob(op_prefix_name + op_fn)
with in_blob.open('r') as in_file, op_blob.open('wt') as opt_file:
while True:
chunk = in_file.read(buf_size)
if not chunk:
break
opt_file.write(chunk.replace('\n','\r\n'))
就我而言,解决方案二适合我的环境,因为由于文件大小限制,Astro 可能会因 gsutil cat 命令而受到攻击,而解决方案二由于缓冲区大小而具有高性能,请根据需要调整缓冲区大小