如何使用pyscopg2的异步功能?

问题描述 投票:0回答:3

我正在尝试使用不同的表执行 3 个不同的 postgresql 查询。每个查询需要 2 秒执行。我想知道是否可以同时运行所有 3 个查询,这样我可以节省 4 秒。我尝试使用

pyscopg2
的异步功能,但它只返回上次查询的结果。谁能指出我做错了什么?

import select
import psycopg2
import psycopg2.extensions

def wait(conn):
    while 1:
        state = conn.poll()
        if state == psycopg2.extensions.POLL_OK:
            break
        elif state == psycopg2.extensions.POLL_WRITE:
            select.select([], [conn.fileno()], [])
        elif state == psycopg2.extensions.POLL_READ:
            select.select([conn.fileno()], [], [])
        else:
            raise psycopg2.OperationalError("poll() returned %s" % state)


aconn = psycopg2.connect(
  dbname=pg_name,
  user=pg_username,
  host=pg_host,
  password=pg_password,
  async=1)

wait(aconn)
acurs = aconn.cursor()

acurs.execute(
              "SELECT 1;"
              "SELECT ST_Length(ST_GeomFromText"
              "('LINESTRING(743238 2967416,743238 2967450)',4326));"
              "SELECT 3;"
             )
wait(acurs.connection)
result = acurs.fetchall()
print result

这只打印:“result”:[[3]]

postgresql asynchronous postgis psycopg2
3个回答
8
投票

根据《Psycopg》简介

[Psycopg] 是 libpq(官方 PostgreSQL 客户端库)的包装器。

然后,查看

libpq
PQexec()
文档
(用于将 SQL 查询发送到 PostgreSQL 数据库的函数),我们看到以下注释(强调我的):

在单个 PQexec 调用中发送的多个查询将在单个事务中处理,除非查询字符串中包含显式的 BEGIN/COMMIT 命令以将其划分为多个事务。 但请注意,返回的 PGresult 结构仅描述从字符串执行的最后一个命令的结果。

因此,不幸的是,

psycopg2
libpq
根本不支持您想要做的事情。 (但这并不是说 PostgreSQL 的其他客户端接口不支持它,但这超出了这个问题的范围。)

因此,为了回答你的问题,你做错的地方是在一个

execute()
调用中执行多个 SQL 查询,然后尝试检索 all 的结果,而事实上这是不可能的。您需要显式执行每个查询并单独检索结果,或者尝试找到另一个支持一次返回多个结果集的 PostgreSQL API。


Python 数据库 API 2.0 规范确实允许库实现可选的

nextset()
方法,该方法将
cursor
移动到从执行的查询返回的下一个结果集,但此方法在
psycopg2 中未实现
(出于显而易见的原因),实际上,如果您尝试调用它,则会引发
NotSupportedError
异常(请参阅文档)。


0
投票

看起来从 2.2 版本开始就支持了

def wait(conn):
    while True:
        state = conn.poll()
        if state == psycopg2.extensions.POLL_OK:
            break
        elif state == psycopg2.extensions.POLL_WRITE:
            select.select([], [conn.fileno()], [])
        elif state == psycopg2.extensions.POLL_READ:
            select.select([conn.fileno()], [], [])
        else:
            raise psycopg2.OperationalError("poll() returned %s" % state)

来源:https://www.psycopg.org/docs/advanced.html#asynchronous-support


0
投票

您可以使用

aiopg
包和 Python 的内置
asyncio.gather
来同时执行查询。 aiopg 包在底层使用 psycopg2。

此示例运行三个查询,每个查询需要 2 秒,但总执行时间约为 2 秒,而不是 6 秒

import asyncio
import aiopg

dsn = f'dbname={PG_DATABASE} user={PG_USER} password={PG_PASSWORD} host={PG_HOST}'

async def query(query: str):
    async with aiopg.connect(dsn) as con:
        async with con.cursor() as cursor:
            await cursor.execute(query)
            result = []
            async for row in cursor:
                result.append(row)
            return result


async def main():
    result1, result2, result3 = await asyncio.gather(
        query('SELECT pg_sleep(2)'),
        query('SELECT pg_sleep(2)'),
        query('SELECT pg_sleep(2)')
    )
    print(result1, result2, result3)

asyncio.run(main())
© www.soinside.com 2019 - 2024. All rights reserved.