我正在尝试使用不同的表执行 3 个不同的 postgresql 查询。每个查询需要 2 秒执行。我想知道是否可以同时运行所有 3 个查询,这样我可以节省 4 秒。我尝试使用
pyscopg2
的异步功能,但它只返回上次查询的结果。谁能指出我做错了什么?
import select
import psycopg2
import psycopg2.extensions
def wait(conn):
while 1:
state = conn.poll()
if state == psycopg2.extensions.POLL_OK:
break
elif state == psycopg2.extensions.POLL_WRITE:
select.select([], [conn.fileno()], [])
elif state == psycopg2.extensions.POLL_READ:
select.select([conn.fileno()], [], [])
else:
raise psycopg2.OperationalError("poll() returned %s" % state)
aconn = psycopg2.connect(
dbname=pg_name,
user=pg_username,
host=pg_host,
password=pg_password,
async=1)
wait(aconn)
acurs = aconn.cursor()
acurs.execute(
"SELECT 1;"
"SELECT ST_Length(ST_GeomFromText"
"('LINESTRING(743238 2967416,743238 2967450)',4326));"
"SELECT 3;"
)
wait(acurs.connection)
result = acurs.fetchall()
print result
这只打印:“result”:[[3]]
根据《Psycopg》简介:
[Psycopg] 是 libpq(官方 PostgreSQL 客户端库)的包装器。
然后,查看
libpq
的 PQexec()
文档(用于将 SQL 查询发送到 PostgreSQL 数据库的函数),我们看到以下注释(强调我的):
在单个 PQexec 调用中发送的多个查询将在单个事务中处理,除非查询字符串中包含显式的 BEGIN/COMMIT 命令以将其划分为多个事务。 但请注意,返回的 PGresult 结构仅描述从字符串执行的最后一个命令的结果。
因此,不幸的是,
psycopg2
和libpq
根本不支持您想要做的事情。 (但这并不是说 PostgreSQL 的其他客户端接口不支持它,但这超出了这个问题的范围。)
因此,为了回答你的问题,你做错的地方是在一个
execute()
调用中执行多个 SQL 查询,然后尝试检索 all 的结果,而事实上这是不可能的。您需要显式执行每个查询并单独检索结果,或者尝试找到另一个支持一次返回多个结果集的 PostgreSQL API。
Python 数据库 API 2.0 规范确实允许库实现可选的
nextset()
方法,该方法将 cursor
移动到从执行的查询返回的下一个结果集,但此方法在 psycopg2
中未实现
(出于显而易见的原因),实际上,如果您尝试调用它,则会引发 NotSupportedError
异常(请参阅文档)。
看起来从 2.2 版本开始就支持了
def wait(conn):
while True:
state = conn.poll()
if state == psycopg2.extensions.POLL_OK:
break
elif state == psycopg2.extensions.POLL_WRITE:
select.select([], [conn.fileno()], [])
elif state == psycopg2.extensions.POLL_READ:
select.select([conn.fileno()], [], [])
else:
raise psycopg2.OperationalError("poll() returned %s" % state)
来源:https://www.psycopg.org/docs/advanced.html#asynchronous-support
aiopg
包和 Python 的内置 asyncio.gather
来同时执行查询。 aiopg 包在底层使用 psycopg2。
此示例运行三个查询,每个查询需要 2 秒,但总执行时间约为 2 秒,而不是 6 秒
import asyncio
import aiopg
dsn = f'dbname={PG_DATABASE} user={PG_USER} password={PG_PASSWORD} host={PG_HOST}'
async def query(query: str):
async with aiopg.connect(dsn) as con:
async with con.cursor() as cursor:
await cursor.execute(query)
result = []
async for row in cursor:
result.append(row)
return result
async def main():
result1, result2, result3 = await asyncio.gather(
query('SELECT pg_sleep(2)'),
query('SELECT pg_sleep(2)'),
query('SELECT pg_sleep(2)')
)
print(result1, result2, result3)
asyncio.run(main())