我有以下测试代码:
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor() as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
我需要在 FastAPI 端点中使用代码的
concurrent.futures.ThreadPoolExecutor
部分。
我担心的是 API 调用数量和线程包含的影响。担心创建太多线程及其相关后果、主机挨饿、应用程序和/或主机崩溃。
对这种方法有什么想法或陷阱吗?
HTTPX
库,它提供了 async
API。如 this answer 中所述,您生成一个 Client
并在每次需要时重复使用它。要使用 HTTPX
发出 异步请求,您需要一个 AsyncClient
。
您还可以使用
limits
上的 Client
关键字参数来控制连接池大小,该参数采用 httpx.Limits
的实例。例如:
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
client = httpx.AsyncClient(limits=limits)
您可以根据您的需要调整以上内容。根据池限制配置的文档:
,允许的保持活动连接数,或max_keepalive_connections
始终允许。 (默认20)None
,允许的最大连接数,或max_connections
表示无限制。 (默认100)None
,空闲保持活动连接的时间限制(以秒为单位),或keepalive_expiry
表示无限制。 (默认5)None
如果您还想调整超时,可以使用
timeout
参数为单个请求或 Client
/AsyncClient
实例设置超时,这会导致使用给定的超时作为此客户端发出的请求的默认值(另请参阅 Timeout
类的实现)。您可以详细指定超时行为;例如,设置 read
超时参数将指定等待接收数据块(即响应正文的块)的最大持续时间。如果 HTTPX
无法在此时间范围内接收数据,则会引发 ReadTimeout
异常。如果设置为 None
而不是某个正数值,则 timeout
上不会有 read
。所有操作的默认时间为 5 秒timeout
。
完成后,您可以使用
await client.aclose()
显式关闭 AsyncClient
(例如,这可以在 shutdown 事件处理程序中完成)。
要运行多个异步操作,因为您需要请求五个不同的 URL,因此在调用 API 端点时,您可以使用 awaitable
asyncio.gather()
。它将执行 async
操作并按照 awaitables (tasks
) 传递给该函数的顺序返回结果列表。
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
import httpx
import asyncio
URLS = ['https://www.foxnews.com/',
'https://edition.cnn.com/',
'https://www.nbcnews.com/',
'https://www.bbc.co.uk/',
'https://www.reuters.com/']
@asynccontextmanager
async def lifespan(app: FastAPI):
# customise settings
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
timeout = httpx.Timeout(5.0, read=15.0) # 15s timeout on read. 5s timeout elsewhere.
# Initialise the Client on startup and add it to the state
async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
yield {'client': client}
# The Client closes on shutdown
app = FastAPI(lifespan=lifespan)
async def send(url, client):
return await client.get(url)
@app.get('/')
async def main(request: Request):
client = request.state.client
tasks = [send(url, client) for url in URLS]
responses = await asyncio.gather(*tasks)
return [r.text[:50] for r in responses] # for demo purposes, only return the first 50 chars of each response
如果您想避免将整个响应正文读取到RAM中,您可以在httpx
中使用
流式响应,以及利用FastAPI的
StreamingResponse
,如这个答案中所述并在下面演示:
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from contextlib import asynccontextmanager
import httpx
import asyncio
URLS = ['https://www.foxnews.com/',
'https://edition.cnn.com/',
'https://www.nbcnews.com/',
'https://www.bbc.co.uk/',
'https://www.reuters.com/']
@asynccontextmanager
async def lifespan(app: FastAPI):
# customise settings
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
timeout = httpx.Timeout(5.0, read=15.0) # 15s timeout on read. 5s timeout elsewhere.
# Initialise the Client on startup and add it to the state
async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
yield {'client': client}
# The Client closes on shutdown
app = FastAPI(lifespan=lifespan)
async def send(url, client):
req = client.build_request('GET', url)
return await client.send(req, stream=True)
async def iter_content(responses):
for r in responses:
async for chunk in r.aiter_text():
yield chunk[:50] # for demo purposes, only return the first 50 chars of each response
yield '\n\n'
break
await r.aclose()
@app.get('/')
async def main(request: Request):
client = request.state.client
tasks = [send(url, client) for url in URLS]
responses = await asyncio.gather(*tasks)
return StreamingResponse(iter_content(responses), media_type='text/event-stream')