在Python中使用并行处理进行数据迁移

问题描述 投票:0回答:1

我在本地环境中有一个Python脚本,它从sql server中提取数据,将其加载到pandas数据帧中,然后使用大查询客户端将数据帧写入bigquery。

问题是有问题的数据,它有 19 亿条记录,我正在考虑在 python 中使用并行处理来加快这个过程。

我尝试对数据加载部分进行分块,其中一个块有 100 万条记录,但仍然需要很多时间才能完成此操作。

有什么办法可以解决这个问题吗?或者还有其他方法可以有效迁移这些数据吗?

下面是我正在使用的脚本:

import pandas as pd
import pyodbc 
from google.cloud import bigquery

# Connection string
conn_str = f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}'
conn = pyodbc.connect(conn_str)

# Initial query to get the first batch
sql_query = f"SELECT TOP 1000000 * FROM {table_name} ORDER BY id"
last_id = 0

job_config = bigquery.LoadJobConfig(
    autodetect=True,
    write_disposition = "WRITE_APPEND",
)

while True:
    df = pd.read_sql(sql_query, conn)
    
    if df.empty:
        break
    
    # Write the DataFrame to BigQuery
    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()  # Wait for the job to complete
    
    # Update the last_id to the last seen in the DataFrame
    last_id = df['id'].iloc[-1]
    
    # Prepare the next query
    sql_query = f"SELECT TOP 1000000 * FROM {table_name} WHERE id> {last_id} ORDER BY id"

    print(f"Loaded up to ID: {last_id}")

# Close the database connection
conn.close()
python sql-server google-bigquery parallel-processing database-migration
1个回答
0
投票

我可能不建议使用 python 脚本作为这种规模的数据迁移的正确工具。相反,我会考虑参考谷歌的数据流文档并使用某种批处理管道将您的 sql server 实例连接到 bigquery,如文档中所示

© www.soinside.com 2019 - 2024. All rights reserved.