我正在使用两个应该异步处理所有操作的库:一个用于从服务器接收流数据,另一个库 psycopg 用于将数据保存到 TimescaleDB 中。但是,由于某种原因, conn.commit() 阻塞了主事件循环,因此,接收数据和将数据保存到数据库之间的时间延迟随着每次迭代而增加。如何在不将数据库调用移至单独线程的情况下解决此问题?
async def exec_insert_trade(conn, trade, current_time):
trade_values = await generate_trade_values(trade)
columns = ', '.join(trade_values.keys())
placeholders = ', '.join(['%s'] * len(trade_values))
query = f"INSERT INTO trades ({columns}) VALUES ({placeholders})"
time_difference = current_time - trade.time # Calculate the difference
print(f"Time difference trade: {time_difference}")
await conn.execute(query, list(trade_values.values()))
await conn.commit()
async def main():
async with AsyncRetryingClient(TINKOFF_READONLY_TOKEN, settings=retry_settings) as client, \
AsyncConnectionPool(POSTGRES_DATABASE_URL, min_size=2) as pool:
async for marketdata in client.market_data_stream.market_data_stream(request_iterator()):
current_time = datetime.now(timezone.utc)
async with pool.connection() as conn:
if marketdata.trade is not None:
await exec_insert_trade(conn, marketdata.trade, current_time)
if __name__ == "__main__":
asyncio.run(main())
....
Time difference trade: 0:00:00.233646
Time difference trade: 0:00:00.952377
Time difference trade: 0:00:01.187182
Time difference trade: 0:00:01.042835
Time difference trade: 0:00:03.101548
Time difference trade: 0:00:06.067422
Time difference trade: 0:00:07.025047
...
您的代码正在使用协程,但它仅使用单个任务,即由
asyncio.run
创建的任务。因此,您的代码中没有任何内容实际上是异步的。它正在同步执行。
请参阅我的答案,了解有关理解纯协程和任务之间差异的更多详细信息。它的缺点是您需要任务来同时执行事物。
当你调用
await exec_insert_trade(conn, marketdata.trade, current_time)
时,它会阻塞当前任务直到完成。因此,除非 exec_insert_trade
中使用的任何底层库执行任务创建,否则它将是同步的。如果您不需要 exec_insert_trade
的任何返回值,将其作为任务启动。
解决这个问题的另一种方法是认识到这是一个经典的生产者-消费者架构。您有一段代码生成数据(接收流数据的部分),另一部分使用该数据(数据库编写器)。
asyncio
队列来执行此操作。
import asyncio
from datetime import datetime
async def write_to_database(queue: asyncio.Queue) -> None:
while True:
message = await queue.get()
await asyncio.sleep(1)
print(f"Wrote {message} to database")
async def receive_data_from_external_source(queue: asyncio.Queue) -> None:
while True:
now = datetime.now()
current_time = now.strftime("%H:%M:%S")
queue.put_nowait(current_time)
await asyncio.sleep(1)
async def main() -> None:
queue = asyncio.Queue()
await asyncio.gather(
write_to_database(queue),
receive_data_from_external_source(queue),
)
asyncio.run(main())
该程序将如下内容打印到控制台:
Wrote 11:14:39 to database
Wrote 11:14:40 to database
Wrote 11:14:41 to database
Wrote 11:14:42 to database
Wrote 11:14:43 to database
Wrote 11:14:44 to database
Wrote 11:14:45 to database
Wrote 11:14:46 to database
Wrote 11:14:47 to database
Wrote 11:14:48 to database
Wrote 11:14:49 to database
Wrote 11:14:50 to database
请注意,虽然这两个任务都需要一秒钟才能完成,但数据仍然以每秒一次的节奏写出。这是因为协程作为任务启动,在本例中使用
asyncio.gather
。
在你的真实程序中,你当然不会调用
asyncio.sleep
。您需要了解数据传入的速率以及写入数据的速率,但这适用于任何解决方案。