Boto 3 Redshift 客户端执行语句滞后

问题描述 投票:0回答:1

我正在尝试将一些数据从 MySQL 复制到 RS。该表有 650,000 行左右,所以我以 10,000 左右的批次进行处理,并且还使用 python 的多处理模块。前几次迭代结果还不错。该脚本提取 10,000 行并在 S3 中创建一个 csv 文件,然后将数据从该文件复制到 redshift 表。但很快脚本就开始跳过块。它仍然将批处理文件上传到 S3,但不会复制到主表。最后一次运行花费了 34 分钟,下面是实际复制到表中的记录 ID(应该从 1 到 650,000)的图表。您可以看到一大堆记录没有进入主表的跳转。

有人对此有任何解决方案吗?我目前正在本地运行它,但是当我将 docker 映像上传到 AWS 并从那里运行它时,也会发生同样的情况。

from psycopg2.extensions import register_adapter
from psycopg2.extras import Json
import MySQLdb
import json
import boto3
from io import StringIO
import botocore.session as bc
from botocore.client import Config
import multiprocessing as mp

db = 'db'
cluster_id = 'cluster'
bucket = 'bucket'
rs_secret = 'rs_secret'

session = boto3.Session(
    botocore_session=bc.get_session(),
    profile_name='profile',
    region_name='us-west-2'
)

secret_arn = session.client('secretsmanager').get_secret_value(SecretId=rs_secret)['ARN']
config = Config(connect_timeout=180, read_timeout=180)
client_redshift = session.client("redshift-data", config=config)

# ------------------------------------------------------------------------------------

def get_secret(secret_id, session):
    region_name = "us-west-2"
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )
    get_secret_value_response = client.get_secret_value(SecretId=secret_id)
    secret = get_secret_value_response['SecretString']
    return json.loads(secret)


def upload_to_S3(data, key, session):
    s3 = session.resource('s3')
    csv_buffer = StringIO()
    data.to_csv(csv_buffer, index=False)
    s3.Object(bucket, key).put(Body=csv_buffer.getvalue())


def write_to_RS(query):
    try:
        result = client_redshift.execute_statement(Database=db,
                                                   SecretArn=secret_arn,
                                                   Sql=query,
                                                   ClusterIdentifier=cluster_id)
    except Exception as e:
        raise Exception(e)

# ------------------------------------------------------------------------------------

table_name = 'form'
col = ['settings', 'fields', 'design', 'confirmation']
types = ['super', 'super', 'super', 'super']

table_name_full = f'{db}.{table_name}'

# ------------------------------------------------------------------------------------

mysql_secret = get_secret('secret', session)

register_adapter(list, Json)

mydb = MySQLdb.connect(
        host = mysql_secret['host'],
        user = mysql_secret['username'],
        password = mysql_secret['password'],
        database = 'db'
)

mycursor = mydb.cursor()

# ------------------------------------------------------------------------------------

iam_role = 'iam_role'

chunk = 10000
chunks = range(65)

def task(c):
    q = f"""
    select
        id, {', '.join(col)}
    from {table_name}
    where not ({' is null and '.join(col)} is null)
        and not ({' = "[]" and '.join(col)} = "[]")
    limit {chunk}
    offset {chunk * (c)}
    INTO OUTFILE S3 's3://{bucket}/raw/{table_name}_{c + 1}.csv'
    FORMAT CSV HEADER
    OVERWRITE ON;
    """

    mycursor.execute(q)

    key = f'raw/{table_name}_{c + 1}.csv.part_00000'

    query = f"""
        COPY {table_name_full}
        FROM 's3://{bucket}/{key}'
        iam_role '{iam_role}'
        ignoreheader 1
        CSV;
    """

    write_to_RS(query)

if __name__ == "__main__":  
    pool = mp.Pool(mp.cpu_count())
    pool.map(task, chunks)
    pool.close()
mysql parallel-processing amazon-redshift boto3 batch-processing
1个回答
0
投票

有很多地方可能会出错。 S3 文件名冲突、S3 操作乱序、S3 对象在添加数据之前打开...

首先要做的是了解所有数据是否最终都在 S3 中。您所拥有的进程应在运行后将文件保留在 S3 中。如果您将 S3 中的所有文件的单个副本运行到新表中,所有记录都存在吗?

如果不是,您正在调试代码以了解原因。

如果所有记录都在那里,那么您可能在数据写入完成之前发出 COPY 时遇到问题,或者 RS COPY 命令失败。无论哪种方式,您都应该能够检查 COPY 命令响应的消息,并查看您收到的错误或加载的行数。

我怀疑您可能超出了 Redshift 的最大连接(会话)计数,这是整个集群 500 的硬限制。

您可能还会看到 WLM 超限的问题,但这个问题应该会随着时间的推移而消失。但是,可能会出现超时导致某些 COPY 中止。

顺便说一句,您使用 RS 的方式非常低效。对于 RS 来说,一个文件中的 10,000 行相当小,并且发出单个文件 COPY 命令也并不理想。您最好先在 S3 中构建文件,然后发出单个 COPY 命令。这也很有可能解决根本问题。如果您可以将 S3 文件大小增加到至少 100MB(1GB 是理想的),这将对您的运行时间产生积极影响。

© www.soinside.com 2019 - 2024. All rights reserved.