在 Python 中将 MySQL 查询结果复制到临时文件

问题描述 投票:0回答:4

我对 SQL 世界有点陌生,但我正在学习一个名为 Optimizing pandas.read_sql for Postgres 的教程。问题是,我正在处理一个大数据集,类似于教程中的示例,我需要一种更快的方法来执行我的查询并将其转换为 DataFrame。在那里,他们使用这个功能:

def read_sql_tmpfile(query, db_engine):
    with tempfile.TemporaryFile() as tmpfile:
        copy_sql = "COPY ({query}) TO STDOUT WITH CSV {head}".format(
           query=query, head="HEADER"
        )
        conn = db_engine.raw_connection()
        cur = conn.cursor()
        cur.copy_expert(copy_sql, tmpfile)  # I want to replicate this
        tmpfile.seek(0)
        df = pandas.read_csv(tmpfile)
        return df

我试着复制它,像这样:

def read_sql_tmpfile(query, connection):
    with tempfile.TemporaryFile() as tmpfile:
        copy_sql = "COPY ({query}) TO STDOUT WITH CSV {head}".format(
           query=query, head="HEADER"
        )

        cur = connection.cursor()
        cur.copy_expert(copy_sql, tmpfile)
        tmpfile.seek(0)
        df = pandas.read_csv(tmpfile)
        return df

问题是,

cursor.copy_expert
来自 PostgreSQL 的
psycopg2
库,我找不到用
pymysql
做同样事情的方法。有什么办法吗?我应该怎么办?谢谢

python mysql postgresql temporary-files
4个回答
2
投票

假设尼克的问题是

如何从 MySQL 表在客户端创建 CSV 文件?

在命令行提示符下执行

mysql -u ... -p -h ... dbname -e '...' >localfile.csv

可执行语句是这样的

SELECT  col1, col2, col3, col4
    FROM mytable

备注:

  • 窗户:
    cmd
    ; *nix:一些“终端”应用程序。
  • 这是在客户端运行的。
  • dbname
    具有“使用 dbname;”的效果。
  • 用户、密码和(服务器的)主机名都可以填写。
  • 这假设“制表符”是 CSV 输出的合适分隔符。
  • 注意引号的嵌套(必要时转义)。
  • 列出您想要的任何列/表达式
  • A
    WHERE
    (etc) 可以根据需要包括在内。
  • 不需要FTP。
  • 不需要 Python。
  • SHOW ...
    的行为非常像
    SELECT
    .
  • 在 *nix 上,“制表符”可以变成另一个分隔符。
  • 可以使用
    mysql
    选项跳过标题行。

示例(不显示 -u -p -h):

# mysql  -e "show variables like 'max%size'" | tr '\t' ','
Variable_name,Value
max_binlog_cache_size,18446744073709547520
max_binlog_size,104857600
max_binlog_stmt_cache_size,18446744073709547520
max_heap_table_size,16777216
max_join_size,18446744073709551615
max_relay_log_size,0

2
投票

我知道,问题基本上由 waynetech 的评论回答。但我很感兴趣,细节和含义并不总是很明显,所以这里是经过测试的、可复制粘贴的解决方案。

由于输出文件最终在数据库服务器上,解决方案涉及处理服务器上的临时目录并将文件传输到客户端。为了简单起见,我为此使用了 SSH 和 SFTP。这假设两台机器的 SSH 密钥已经预先交换。通过涉及 samba 共享或类似的东西,远程文件传输和处理可能会更容易。

@Nick ODell:请给这个解决方案一个机会,做一个基准测试!我很确定复制开销对于大量数据来说并不重要。

def read_sql_tmpfile(query, connection):
    df = None

    # Create unique temp directory on server side
    cmd = "mktemp -d"
    (out_mktemp, err) = subprocess.Popen(f'ssh {username}@{db_server} "{cmd}"', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
    if err or not out_mktemp:
        return

    # remove additional white spaces around the output
    tmp_dir = out_mktemp.decode().strip()

    # The following command should be made superfluous by tweaking the group memberships 
    # to grant `mysql` user full access to the directory created by the user which executes the `mktemp` command
    cmd = f"chmod 777 -R {tmp_dir}"
    res = os.system(f'ssh {username}@{db_server} "{cmd}"')
    if res:
        return

    try:
        remote_tmp_file = f'{tmp_dir}/sql_tmpfile'

        # remember: db-connection's user need `FILE` privilege
        # think about sql injection, pass MySql parameters in query and corresponding parameters list to this function if appropriate
        copy_sql = f"{query} INTO OUTFILE '{remote_tmp_file}'"

        cur = connection.cursor()
        cur.execute(copy_sql)

        local_tmp_file = os.path.basename(remote_tmp_file)
        cmd = f"sftp {username}@{db_server}:{remote_tmp_file} {local_tmp_file}"
        res = os.system(cmd)
        if not res and os.path.isfile(local_tmp_file):
            try:
                df = pandas.read_csv(local_tmp_file)
            finally:
                # cleanup local temp file
                os.remove(local_tmp_file)
    finally:
        # cleanup remote temp dir
        cmd = f"rm -R {tmp_dir}"
        os.system(f'ssh {username}@{db_server} "{cmd}"')

    return df

0
投票

如评论中所述,在this answer中,您正在寻找

SELECT ... INTO OUTFILE
.

这是一个小的(未经测试的)例子,基于你的问题:

def read_sql_tmpfile(query, connection):
    # Create tmp file name without creating the file
    tmp_dir = tempfile.mkdtemp()
    tmp_file_name = os.path.join(tmp_dir, next(tempfile._get_candidate_names()))
    
    # Copy data into temporary file
    copy_sql = "{query} INTO OUTFILE {outfile}".format(
           query=query, outfile=tmp_file_name 
    )
    cur = connection.cursor()
    cur.execute(copy_sql)
    
    # Read data from file
    df = pandas.read_csv(tmp_file_name)
    # Cleanup
    os.remove(tmp_file_name)
    return df

0
投票

为了找出这些答案中哪一个最快,我在一个合成数据集上对每个答案进行了基准测试。该数据集包含 100MB 的时间序列数据和 500MB 的文本数据。 (注意:这是使用 Pandas 测量的,与可以在 NumPy 中表示的数据相比,它会严重惩罚小对象。)

我对 5 种方法进行了基准测试:

  • naive
    read_sql()
    .
  • 的baseline
  • sftp:加载到 OUTFILE,然后调用 sftp 和 read_csv。
  • tofile:用-B调用mysql命令生成CSV文件,写入文件。
  • pipe:使用 -B 调用 mysql 命令以生成 CSV,并使用 read_csv 从该管道读取。还可以使用 fnctl() 来增加管道尺寸。
  • pipe_no_fcntl:与之前相同,但没有 fcntl。

时间

所有方法都试了七次,顺序随机。在下表中,分数越低越好。

时间序列基准:

方法 时间(秒) 标准误 (s)
管道 6.719870 0.064610
pipe_no_fcntl 7.243937 0.104802
归档 7.636196 0.125963
sftp 9.926580 0.171262
天真 11.125657 0.470146

文本基准:

方法 时间(秒) 标准误 (s)
管道 8.452694 0.217661
归档 9.502743 0.265003
pipe_no_fcntl 9.620349 0.420255
sftp 12.189046 0.294148
天真 13.769322 0.695961

制胜法宝

这是管道方法,最快。

import os
import pandas as pd
import subprocess
import tempfile
import time
import fcntl


db_server = '...'
F_SETPIPE_SZ = 1031


def read_sql_pipe(query, database):
    args = ['mysql', f'--login-path={db_server}', database, '-B', '-e', query]
    try:
        # Run mysql and capture output
        proc = subprocess.Popen(args, stdout=subprocess.PIPE)
    except FileNotFoundError:
        # MySQL is not installed. Raise a better error message.
        raise Exception("The mysql command is not installed. Use brew or apt to install it.") from None

    # Raise amount of CSV data buffered up to 1MB.
    # This is a Linux-only syscall.
    fcntl.fcntl(proc.stdout.fileno(), F_SETPIPE_SZ, 1 << 20)

    df = pd.read_csv(proc.stdout, delimiter='\t')

    retcode = proc.wait()
    if retcode != 0:
        raise subprocess.CalledProcessError(
            retcode, proc.args, output=proc.stdout, stderr=proc.stderr
        )

    return df

基本思想是使用 subprocess 模块调用 mysql,MySQL 的标准输出被馈送到管道。管道是一个类似文件的对象,可以直接传递给

pd.read_csv()
。 MySQL 进程在 Pandas 读取 CSV 的同时创建 CSV,因此这比在 Pandas 开始读取之前写入整个文件的方法更具优势。

关于 fcntl 的注意事项:fcntl 在这里很有用,因为默认情况下可以在管道中缓冲的数据量限制为 64kB。我发现将其提高到 1MB 会导致大约 10% 的加速。如果这不可用,将 CSV 写入文件的解决方案可能优于管道方法。

数据集

数据集是使用以下脚本生成的。

import pandas as pd
import numpy as np
from english_words import get_english_words_set
np.random.seed(42)

import util


def gen_benchmark_df(data_function, limit):
    i = 0
    df = data_function(i)
    i += 1
    while df.memory_usage(deep=True).sum() < limit:
        df = pd.concat([df, data_function(i)], ignore_index=True)
        i += 1
    # Trim excess rows
    row_count = len(df.index)
    data_size_bytes = df.memory_usage(deep=True).sum()
    row_count_needed = int(row_count * (limit / data_size_bytes))
    df = df.head(row_count_needed)
    return df


def gen_ts_chunk(i):
    rows = 100_000
    return pd.DataFrame({
        'run_id': np.random.randint(1, 1_000_000),
        'feature_id': np.random.randint(1, 1_000_000),
        'timestep': np.arange(0, rows),
        'val': np.cumsum(np.random.uniform(-1, 1, rows))
    })


def gen_text_chunk(i):
    rows = 10_000
    words = list(get_english_words_set(['web2'], lower=True))
    text_strings = np.apply_along_axis(lambda x: ' '.join(x), axis=1, arr=np.random.choice(words, size=(rows, 3)))
    return pd.DataFrame({
        'id': np.arange(i * rows, (i + 1) * rows),
        'data': text_strings
    })



dataset_size = 1e8


con = util.open_engine()
timeseries_df = gen_benchmark_df(gen_ts_chunk, dataset_size)
timeseries_df.to_sql('timeseries', con=con, if_exists='replace', index=False, chunksize=10_000)


dataset_size = 5e8

text_df = gen_benchmark_df(gen_text_chunk, dataset_size)
text_df.to_sql('text', con=con, if_exists='replace', index=False, chunksize=10_000)
© www.soinside.com 2019 - 2024. All rights reserved.