Python - 如何 - Big Query 异步任务

问题描述 投票:0回答:5

这可能是一个虚拟问题,但我似乎无法异步运行 python google-clood-bigquery。

我的目标是同时运行多个查询,并等待所有查询在

asyncio.wait()
查询收集器中完成。我正在使用
asyncio.create_tast()
启动查询。 问题是每个查询都要等待前一个查询完成才能开始。

这是我的查询功能(很简单):

async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:
  job = self.api.query(query, **kwargs)
  return job.result()

既然我不能等待

job.result()
我应该等待其他的事情吗?

python async-await google-bigquery
5个回答
24
投票

如果您在

coroutine
内部工作,并且希望在不阻塞
event_loop
的情况下运行不同的查询,那么您可以使用
run_in_executor
函数,它基本上在后台线程中运行查询而不阻塞循环。 这是如何使用它的一个很好的例子。

确保这正是您所需要的;在 Python API 中创建的用于运行查询的作业已经是异步的,它们仅在您调用

job.result()
时才会阻塞。这意味着除非您位于协程内部,否则不需要使用
asyncio

这是一个在作业完成后立即检索结果的快速示例:

from concurrent.futures import ThreadPoolExecutor, as_completed
import google.cloud.bigquery as bq


client = bq.Client.from_service_account_json('path/to/key.json')
query1 = 'SELECT 1'
query2 = 'SELECT 2'

threads = []
results = []

executor = ThreadPoolExecutor(5)

for job in [client.query(query1), client.query(query2)]:
    threads.append(executor.submit(job.result))

# Here you can run any code you like. The interpreter is free

for future in as_completed(threads):
    results.append(list(future.result()))

results
将是:

[[Row((2,), {'f0_': 0})], [Row((1,), {'f0_': 0})]]

6
投票

只是为了分享不同的解决方案:

import numpy as np
from time import sleep


query1 = """
SELECT
  language.name,
  average(language.bytes)
FROM `bigquery-public-data.github_repos.languages` 
, UNNEST(language) AS language
GROUP BY language.name"""
query2 = 'SELECT 2'


def dummy_callback(future):
    global jobs_done
    jobs_done[future.job_id] = True


jobs = [bq.query(query1), bq.query(query2)]
jobs_done = {job.job_id: False for job in jobs}
[job.add_done_callback(dummy_callback) for job in jobs]

# blocking loop to wait for jobs to finish
while not (np.all(list(jobs_done.values()))):
    print('waiting for jobs to finish ... sleeping for 1s')
    sleep(1)

print('all jobs done, do your stuff')

而不是使用

as_completed
我更喜欢使用 bigquery 作业本身的内置异步功能。这也使我可以将数据管道分解为单独的云函数,而不必在整个管道的持续时间内保持主
ThreadPoolExecutor
的活动。顺便说一句,这就是我研究这个问题的原因:我的管道比 Cloud Functions 的最大超时 9 分钟(甚至 Cloud Run 的 15 分钟)还要长。

缺点是我需要跟踪各个函数中的所有

job_id
,但是通过指定输入和输出来配置管道时相对容易解决,以便它们形成有向非循环图。


2
投票

我使用@dkapitan的答案来提供异步包装器:

    async def async_bigquery(client, query):
        done = False
        def callback(future):
            nonlocal done
            done = True
        job = client.query(query)
        job.add_done_callback(callback)
        while not done:
            await asyncio.sleep(.1)
        return job

0
投票

事实上,由于

asyncio.create_task()
函数,我找到了一种非常轻松地将查询包装在异步调用中的方法。 我只需要将
job.result()
包装在协程中即可;这是实现。它现在确实异步运行。

class BQApi(object):                                                                                                 
    def __init__(self):                                                                                              
        self.api = bigquery.Client.from_service_account_json(BQ_CONFIG["credentials"])                               

    async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:                                       
        job = self.api.query(query, **kwargs)                                                                        
        task = asyncio.create_task(self.coroutine_job(job))                                                          
        return await task                                                                                            

    @staticmethod                                                                                                    
    async def coroutine_job(job):                                                                                    
        return job.result()                                                                                          

0
投票

正如

Willian Fuks
所说,你必须在单独的线程中运行它。 FastAPI 创始人提供了一个库来简化这个过程: 异步器

使用示例:

result = await asyncify(future.result)()

它使用 AnyIO 线程池,因此默认情况下,其中最多有 40 个线程。

© www.soinside.com 2019 - 2024. All rights reserved.