这可能是一个虚拟问题,但我似乎无法异步运行 python google-clood-bigquery。
我的目标是同时运行多个查询,并等待所有查询在
asyncio.wait()
查询收集器中完成。我正在使用 asyncio.create_tast()
启动查询。
问题是每个查询都要等待前一个查询完成才能开始。
这是我的查询功能(很简单):
async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:
job = self.api.query(query, **kwargs)
return job.result()
既然我不能等待
job.result()
我应该等待其他的事情吗?
如果您在
coroutine
内部工作,并且希望在不阻塞 event_loop
的情况下运行不同的查询,那么您可以使用 run_in_executor
函数,它基本上在后台线程中运行查询而不阻塞循环。 这是如何使用它的一个很好的例子。
确保这正是您所需要的;在 Python API 中创建的用于运行查询的作业已经是异步的,它们仅在您调用
job.result()
时才会阻塞。这意味着除非您位于协程内部,否则不需要使用 asyncio
。
这是一个在作业完成后立即检索结果的快速示例:
from concurrent.futures import ThreadPoolExecutor, as_completed
import google.cloud.bigquery as bq
client = bq.Client.from_service_account_json('path/to/key.json')
query1 = 'SELECT 1'
query2 = 'SELECT 2'
threads = []
results = []
executor = ThreadPoolExecutor(5)
for job in [client.query(query1), client.query(query2)]:
threads.append(executor.submit(job.result))
# Here you can run any code you like. The interpreter is free
for future in as_completed(threads):
results.append(list(future.result()))
results
将是:
[[Row((2,), {'f0_': 0})], [Row((1,), {'f0_': 0})]]
只是为了分享不同的解决方案:
import numpy as np
from time import sleep
query1 = """
SELECT
language.name,
average(language.bytes)
FROM `bigquery-public-data.github_repos.languages`
, UNNEST(language) AS language
GROUP BY language.name"""
query2 = 'SELECT 2'
def dummy_callback(future):
global jobs_done
jobs_done[future.job_id] = True
jobs = [bq.query(query1), bq.query(query2)]
jobs_done = {job.job_id: False for job in jobs}
[job.add_done_callback(dummy_callback) for job in jobs]
# blocking loop to wait for jobs to finish
while not (np.all(list(jobs_done.values()))):
print('waiting for jobs to finish ... sleeping for 1s')
sleep(1)
print('all jobs done, do your stuff')
而不是使用
as_completed
我更喜欢使用 bigquery 作业本身的内置异步功能。这也使我可以将数据管道分解为单独的云函数,而不必在整个管道的持续时间内保持主ThreadPoolExecutor
的活动。顺便说一句,这就是我研究这个问题的原因:我的管道比 Cloud Functions 的最大超时 9 分钟(甚至 Cloud Run 的 15 分钟)还要长。
缺点是我需要跟踪各个函数中的所有
job_id
,但是通过指定输入和输出来配置管道时相对容易解决,以便它们形成有向非循环图。
我使用@dkapitan的答案来提供异步包装器:
async def async_bigquery(client, query):
done = False
def callback(future):
nonlocal done
done = True
job = client.query(query)
job.add_done_callback(callback)
while not done:
await asyncio.sleep(.1)
return job
事实上,由于
asyncio.create_task()
函数,我找到了一种非常轻松地将查询包装在异步调用中的方法。
我只需要将 job.result()
包装在协程中即可;这是实现。它现在确实异步运行。
class BQApi(object):
def __init__(self):
self.api = bigquery.Client.from_service_account_json(BQ_CONFIG["credentials"])
async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:
job = self.api.query(query, **kwargs)
task = asyncio.create_task(self.coroutine_job(job))
return await task
@staticmethod
async def coroutine_job(job):
return job.result()
正如
Willian Fuks
所说,你必须在单独的线程中运行它。 FastAPI 创始人提供了一个库来简化这个过程:
异步器
使用示例:
result = await asyncify(future.result)()
它使用 AnyIO 线程池,因此默认情况下,其中最多有 40 个线程。