我们有Databricks功能
将请求导入为 req
def my_rest_api_call(my_list)
在内部使用
调用服务 URLresp = req.post(url = serviceUrl, data = webParams)
这会以 JSON 的形式返回responseData
我们使用以下方法调用 my_rest_api_call 近 50K 次:
使用ThreadPoolExecutor(并发)作为执行器:
**results = executor.map(my_rest_api_call, listChunks)**
这里我们设置的并发数大约为 10 有时这些 API 花费的时间太长,因此如果需要更多时间,我们需要停止它们。
实际执行 API 调用的代码,我们希望用这个异步包装器包围,以便在调用 my_rest_api_call 花费超过 X 时间(比方说 3 分钟)时优雅地停止并冒泡错误。
import asyncio
import requests as req
from concurrent.futures import ThreadPoolExecutor
async def my_rest_api_call_timeout(url):
loop = asyncio.get_event_loop()
future = loop.run_in_executor(None, req.get, url)
try:
response = await asyncio.wait_for(future, timeout=0.01) #give timeout 280 in your case
print(response.json() )
return response.json()
except asyncio.TimeoutError:
raise TimeoutError("API call timed out")
async def call_api_with_timeout(url):
try:
return await my_rest_api_call_timeout(url)
except TimeoutError as e:
print("API call timed out:", e)
async def main():
concurrency = 10
listChunks = ["https://postman-echo.com/get?test=123","https://postman-echo.com/get?test=123","https://postman-echo.com/get?test=123"]
with ThreadPoolExecutor(concurrency) as executor:
await asyncio.gather(*[call_api_with_timeout(url) for url in listChunks])
if __name__ == "__main__":
await main()
在这里我发出了 get 请求,您将其更改为 post 并将参数选项传递给您的函数,如下所示。
loop.run_in_executor(None, req.get, url, params,body)
对于给定的超时
0.01
我遇到超时异常。