我正在尝试编写一个 parquet 文件,其中包含一个日期列,其 parquet 中的逻辑类型为 DATE,物理类型为 INT32。我正在使用 pandas 并使用 fastparquet 作为引擎编写 parquet 文件,因为我需要从数据库流式传输数据并附加到同一个 parquet 文件。这是我的代码
import os
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import text
sql = "SELECT TO_VARCHAR(CURRENT_DATE, 'YYYYMMDD') AS "REPORT_DATE" FROM DUMMY;"
def create_stream_enabled_connection(username, password, host, port):
conn_str = f"mydb://{username}:{password}@{host}:{port}"
engine = create_engine(conn_str, connect_args={'encrypt': 'True', 'sslValidateCertificate':'False'})
connection = engine.connect().execution_options(stream_results=True)
return connection
connection = create_stream_enabled_connection(username, password, host, port)
ROWS_IN_CHUNK = 500000
# Stream chunks from database
for dataframe_chunk in pd.read_sql(text(sql), connection, chunksize=ROWS_IN_CHUNK):
if os.stat(local_path).st_size == 0: # If file is empty
# write parquet file
dataframe_chunk.to_parquet(local_path, index=False, engine='fastparquet')
else:
# write parquet file
dataframe_chunk.to_parquet(local_path, index=False, engine='fastparquet', append=True)
问题:
我无法使用
to_parquet
作为引擎,在 pandas fastparquet
函数的输出镶木地板中获得逻辑类型为 DATE 和物理类型为 INT32。
我尝试过的一些事情:
datetime64[ns]
,镶木地板中的逻辑类型为 Timestamp(isAdjustedToUTC=false, timeUnit=nanoseconds, is_from_converted_type=false, force_set_converted_type=false)
,物理类型为 INT64
。对我不起作用。dataframe_chunk['report_date'] = pd.to_datetime(dataframe_chunk['report_date'], format='%Y%m%d')
我需要 parquet 逻辑类型为
DATE
,物理类型为 INT32
,因为该 parquet 将直接加载到 bigquery,并且 REPORT_DATE 列将转到 bigquery 中的 DATE
类型列。请参阅此处的 bigquery 文档:https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet#type_conversions
任何帮助将不胜感激。
已回答https://github.com/dask/fastparquet/issues/880#issuecomment-1697436417
简短的故事是:fastparquet 目前无法做到这一点。我不知道如何说服 bigquery 接受当前的输出样式,但我认为一定有办法,因为 64 位格式是有效的 parquet。
Pyarrow 不支持附加到文件,但将列转换为 date 对象 并使用 pyarrow 引擎,我相信会写出具有正确逻辑类型的列。
根据您对流式传输/附加的具体需求,您也许可以保持文件打开并批量附加行,直到准备好上传为止。或者,您可以将一系列文件写入对象存储,然后在单个加载作业中将它们加载到 BQ 中。