Databricks:使用 ThreadPoolExecutor 时对函数的异步调用以及使用执行器的并发调用

问题描述 投票:0回答:1

我们有Databricks功能

将请求导入为 req

def my_rest_api_call(my_list)

在内部使用

调用服务 URL

resp = req.post(url = serviceUrl, data = webParams)

这会以 JSON 的形式返回responseData

我们使用以下方法调用 my_rest_api_call 近 50K 次:

使用ThreadPoolExecutor(并发)作为执行器:

  **results = executor.map(my_rest_api_call, listChunks)**

这里我们设置的并发数大约为 10 有时这些 API 花费的时间太长,因此如果需要更多时间,我们需要停止它们。

实际执行 API 调用的代码,我们希望用这个异步包装器包围,以便在调用 my_rest_api_call 花费超过 X 时间(比方说 3 分钟)时优雅地停止并冒泡错误。

python asynchronous databricks python-asyncio azure-databricks
1个回答
0
投票
您可以在您的场景中尝试此代码。

import asyncio import requests as req from concurrent.futures import ThreadPoolExecutor async def my_rest_api_call_timeout(url): loop = asyncio.get_event_loop() future = loop.run_in_executor(None, req.get, url) try: response = await asyncio.wait_for(future, timeout=0.01) #give timeout 280 in your case print(response.json() ) return response.json() except asyncio.TimeoutError: raise TimeoutError("API call timed out") async def call_api_with_timeout(url): try: return await my_rest_api_call_timeout(url) except TimeoutError as e: print("API call timed out:", e) async def main(): concurrency = 10 listChunks = ["https://postman-echo.com/get?test=123","https://postman-echo.com/get?test=123","https://postman-echo.com/get?test=123"] with ThreadPoolExecutor(concurrency) as executor: await asyncio.gather(*[call_api_with_timeout(url) for url in listChunks]) if __name__ == "__main__": await main()
在这里我发出了 get 请求,您将其更改为 post 并将参数选项传递给您的函数,如下所示。

loop.run_in_executor(None, req.get, url, params,body)


输出:

enter image description here

对于给定的超时

0.01

我遇到超时异常。

© www.soinside.com 2019 - 2024. All rights reserved.