我对 SQL 世界有点陌生,但我正在学习一个名为 Optimizing pandas.read_sql for Postgres 的教程。问题是,我正在处理一个大数据集,类似于教程中的示例,我需要一种更快的方法来执行我的查询并将其转换为 DataFrame。在那里,他们使用这个功能:
def read_sql_tmpfile(query, db_engine):
with tempfile.TemporaryFile() as tmpfile:
copy_sql = "COPY ({query}) TO STDOUT WITH CSV {head}".format(
query=query, head="HEADER"
)
conn = db_engine.raw_connection()
cur = conn.cursor()
cur.copy_expert(copy_sql, tmpfile) # I want to replicate this
tmpfile.seek(0)
df = pandas.read_csv(tmpfile)
return df
我试着复制它,像这样:
def read_sql_tmpfile(query, connection):
with tempfile.TemporaryFile() as tmpfile:
copy_sql = "COPY ({query}) TO STDOUT WITH CSV {head}".format(
query=query, head="HEADER"
)
cur = connection.cursor()
cur.copy_expert(copy_sql, tmpfile)
tmpfile.seek(0)
df = pandas.read_csv(tmpfile)
return df
问题是,
cursor.copy_expert
来自 PostgreSQL 的 psycopg2
库,我找不到用 pymysql
做同样事情的方法。有什么办法吗?我应该怎么办?谢谢
假设尼克的问题是
如何从 MySQL 表在客户端创建 CSV 文件?
在命令行提示符下执行
mysql -u ... -p -h ... dbname -e '...' >localfile.csv
可执行语句是这样的
SELECT col1, col2, col3, col4
FROM mytable
备注:
cmd
; *nix:一些“终端”应用程序。dbname
具有“使用 dbname;”的效果。WHERE
(etc) 可以根据需要包括在内。SHOW ...
的行为非常像SELECT
.mysql
选项跳过标题行。示例(不显示 -u -p -h):
# mysql -e "show variables like 'max%size'" | tr '\t' ','
Variable_name,Value
max_binlog_cache_size,18446744073709547520
max_binlog_size,104857600
max_binlog_stmt_cache_size,18446744073709547520
max_heap_table_size,16777216
max_join_size,18446744073709551615
max_relay_log_size,0
我知道,问题基本上由 waynetech 的评论回答。但我很感兴趣,细节和含义并不总是很明显,所以这里是经过测试的、可复制粘贴的解决方案。
由于输出文件最终在数据库服务器上,解决方案涉及处理服务器上的临时目录并将文件传输到客户端。为了简单起见,我为此使用了 SSH 和 SFTP。这假设两台机器的 SSH 密钥已经预先交换。通过涉及 samba 共享或类似的东西,远程文件传输和处理可能会更容易。
@Nick ODell:请给这个解决方案一个机会,做一个基准测试!我很确定复制开销对于大量数据来说并不重要。
def read_sql_tmpfile(query, connection):
df = None
# Create unique temp directory on server side
cmd = "mktemp -d"
(out_mktemp, err) = subprocess.Popen(f'ssh {username}@{db_server} "{cmd}"', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
if err or not out_mktemp:
return
# remove additional white spaces around the output
tmp_dir = out_mktemp.decode().strip()
# The following command should be made superfluous by tweaking the group memberships
# to grant `mysql` user full access to the directory created by the user which executes the `mktemp` command
cmd = f"chmod 777 -R {tmp_dir}"
res = os.system(f'ssh {username}@{db_server} "{cmd}"')
if res:
return
try:
remote_tmp_file = f'{tmp_dir}/sql_tmpfile'
# remember: db-connection's user need `FILE` privilege
# think about sql injection, pass MySql parameters in query and corresponding parameters list to this function if appropriate
copy_sql = f"{query} INTO OUTFILE '{remote_tmp_file}'"
cur = connection.cursor()
cur.execute(copy_sql)
local_tmp_file = os.path.basename(remote_tmp_file)
cmd = f"sftp {username}@{db_server}:{remote_tmp_file} {local_tmp_file}"
res = os.system(cmd)
if not res and os.path.isfile(local_tmp_file):
try:
df = pandas.read_csv(local_tmp_file)
finally:
# cleanup local temp file
os.remove(local_tmp_file)
finally:
# cleanup remote temp dir
cmd = f"rm -R {tmp_dir}"
os.system(f'ssh {username}@{db_server} "{cmd}"')
return df
如评论中所述,在this answer中,您正在寻找
SELECT ... INTO OUTFILE
.
这是一个小的(未经测试的)例子,基于你的问题:
def read_sql_tmpfile(query, connection):
# Create tmp file name without creating the file
tmp_dir = tempfile.mkdtemp()
tmp_file_name = os.path.join(tmp_dir, next(tempfile._get_candidate_names()))
# Copy data into temporary file
copy_sql = "{query} INTO OUTFILE {outfile}".format(
query=query, outfile=tmp_file_name
)
cur = connection.cursor()
cur.execute(copy_sql)
# Read data from file
df = pandas.read_csv(tmp_file_name)
# Cleanup
os.remove(tmp_file_name)
return df
为了找出这些答案中哪一个最快,我在一个合成数据集上对每个答案进行了基准测试。该数据集包含 100MB 的时间序列数据和 500MB 的文本数据。 (注意:这是使用 Pandas 测量的,与可以在 NumPy 中表示的数据相比,它会严重惩罚小对象。)
我对 5 种方法进行了基准测试:
read_sql()
.所有方法都试了七次,顺序随机。在下表中,分数越低越好。
时间序列基准:
方法 | 时间(秒) | 标准误 (s) |
---|---|---|
管道 | 6.719870 | 0.064610 |
pipe_no_fcntl | 7.243937 | 0.104802 |
归档 | 7.636196 | 0.125963 |
sftp | 9.926580 | 0.171262 |
天真 | 11.125657 | 0.470146 |
文本基准:
方法 | 时间(秒) | 标准误 (s) |
---|---|---|
管道 | 8.452694 | 0.217661 |
归档 | 9.502743 | 0.265003 |
pipe_no_fcntl | 9.620349 | 0.420255 |
sftp | 12.189046 | 0.294148 |
天真 | 13.769322 | 0.695961 |
这是管道方法,最快。
import os
import pandas as pd
import subprocess
import tempfile
import time
import fcntl
db_server = '...'
F_SETPIPE_SZ = 1031
def read_sql_pipe(query, database):
args = ['mysql', f'--login-path={db_server}', database, '-B', '-e', query]
try:
# Run mysql and capture output
proc = subprocess.Popen(args, stdout=subprocess.PIPE)
except FileNotFoundError:
# MySQL is not installed. Raise a better error message.
raise Exception("The mysql command is not installed. Use brew or apt to install it.") from None
# Raise amount of CSV data buffered up to 1MB.
# This is a Linux-only syscall.
fcntl.fcntl(proc.stdout.fileno(), F_SETPIPE_SZ, 1 << 20)
df = pd.read_csv(proc.stdout, delimiter='\t')
retcode = proc.wait()
if retcode != 0:
raise subprocess.CalledProcessError(
retcode, proc.args, output=proc.stdout, stderr=proc.stderr
)
return df
基本思想是使用 subprocess 模块调用 mysql,MySQL 的标准输出被馈送到管道。管道是一个类似文件的对象,可以直接传递给
pd.read_csv()
。 MySQL 进程在 Pandas 读取 CSV 的同时创建 CSV,因此这比在 Pandas 开始读取之前写入整个文件的方法更具优势。
关于 fcntl 的注意事项:fcntl 在这里很有用,因为默认情况下可以在管道中缓冲的数据量限制为 64kB。我发现将其提高到 1MB 会导致大约 10% 的加速。如果这不可用,将 CSV 写入文件的解决方案可能优于管道方法。
数据集是使用以下脚本生成的。
import pandas as pd
import numpy as np
from english_words import get_english_words_set
np.random.seed(42)
import util
def gen_benchmark_df(data_function, limit):
i = 0
df = data_function(i)
i += 1
while df.memory_usage(deep=True).sum() < limit:
df = pd.concat([df, data_function(i)], ignore_index=True)
i += 1
# Trim excess rows
row_count = len(df.index)
data_size_bytes = df.memory_usage(deep=True).sum()
row_count_needed = int(row_count * (limit / data_size_bytes))
df = df.head(row_count_needed)
return df
def gen_ts_chunk(i):
rows = 100_000
return pd.DataFrame({
'run_id': np.random.randint(1, 1_000_000),
'feature_id': np.random.randint(1, 1_000_000),
'timestep': np.arange(0, rows),
'val': np.cumsum(np.random.uniform(-1, 1, rows))
})
def gen_text_chunk(i):
rows = 10_000
words = list(get_english_words_set(['web2'], lower=True))
text_strings = np.apply_along_axis(lambda x: ' '.join(x), axis=1, arr=np.random.choice(words, size=(rows, 3)))
return pd.DataFrame({
'id': np.arange(i * rows, (i + 1) * rows),
'data': text_strings
})
dataset_size = 1e8
con = util.open_engine()
timeseries_df = gen_benchmark_df(gen_ts_chunk, dataset_size)
timeseries_df.to_sql('timeseries', con=con, if_exists='replace', index=False, chunksize=10_000)
dataset_size = 5e8
text_df = gen_benchmark_df(gen_text_chunk, dataset_size)
text_df.to_sql('text', con=con, if_exists='replace', index=False, chunksize=10_000)