Is it dangerous to call concurrent.futures.ThreadPoolExecutor inside a FastAPI endpoint?


I have the following test code:

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

I need to use the concurrent.futures.ThreadPoolExecutor part of this code inside a FastAPI endpoint.

My concern is the impact of the number of API calls and the threads involved: creating too many threads and the consequences that come with that, starving the host, crashing the application and/or the host.

Any thoughts or gotchas with this approach?
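
For context, a rough sketch of what I have in mind (the endpoint path and the trimmed URL list are just placeholders):

from fastapi import FastAPI
import concurrent.futures
import urllib.request

app = FastAPI()

URLS = ['http://www.foxnews.com/', 'http://www.cnn.com/', 'http://www.bbc.co.uk/']

def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

@app.get('/scrape')
def scrape():
    # Defined with plain `def`, so FastAPI runs it in its own external threadpool;
    # a new ThreadPoolExecutor is still created on every request, which is the
    # behaviour I am asking about.
    results = {}
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                results[url] = len(future.result())
            except Exception as exc:
                results[url] = str(exc)
    return results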

python python-3.x fastapi concurrent.futures asgi
1 Answer

1 vote

You should rather use the HTTPX library, which provides an async API. As described in this answer, you spawn a Client and reuse it every time you need it. To make asynchronous requests with HTTPX, you need an AsyncClient.

You can also control the connection pool size using the limits keyword argument on the Client, which takes an instance of httpx.Limits. For example:

limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
client = httpx.AsyncClient(limits=limits)

You can adjust the above to suit your needs. As per the documentation on pool limit configuration:

  • max_keepalive_connections, the number of allowable keep-alive connections, or None to always allow. (Defaults to 20)
  • max_connections, the maximum number of allowable connections, or None for no limits. (Defaults to 100)
  • keepalive_expiry, the time limit on idle keep-alive connections in seconds, or None for no limits. (Defaults to 5)

If you would like to adjust the timeout as well, you can use the timeout parameter to set a timeout on an individual request, or on a Client/AsyncClient instance, in which case the given timeout is used as the default for requests made with that client (see also the implementation of the Timeout class). You can specify the timeout behaviour in fine detail; for instance, setting the read timeout parameter specifies the maximum duration to wait for a chunk of data to be received (i.e., a chunk of the response body). If HTTPX is unable to receive data within this timeframe, a ReadTimeout exception is raised. If set to None instead of some positive numerical value, there will be no timeout on read. The default timeout for all operations is 5 seconds.
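
For instance, a brief sketch of both the client-level default and a per-request override (the values below are illustrative, not recommendations):

import asyncio
import httpx

async def timeout_demo():
    # Client-level default: 5s for connect/write/pool operations, 15s for reading the body
    timeout = httpx.Timeout(5.0, read=15.0)
    async with httpx.AsyncClient(timeout=timeout) as client:
        # Per-request override: this single request may take up to 30s
        r = await client.get('https://www.bbc.co.uk/', timeout=30.0)
        # Disabling the read timeout entirely would be: httpx.Timeout(5.0, read=None)
        return r.status_code

# asyncio.run(timeout_demo())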

Once you are done with it, you can explicitly close the AsyncClient using await client.aclose() (this could be done inside a shutdown event handler, for instance).
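
A minimal sketch of that pattern, using the (older) shutdown event handler rather than the lifespan approach used in the working example below:

import httpx
from fastapi import FastAPI

app = FastAPI()
client = httpx.AsyncClient()

@app.on_event('shutdown')  # deprecated in recent FastAPI versions in favour of lifespan
async def shutdown_event():
    # Explicitly release the client's connection pool on application shutdown
    await client.aclose()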

Since you need to request five different URLs when your API endpoint is called, you can run multiple asynchronous operations using the awaitable asyncio.gather(). It will execute the async operations and return a list of results in the same order the awaitables (tasks) were passed to that function.
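
As a minimal sketch outside of FastAPI, here with return_exceptions=True so that one failing URL does not discard the other results (mirroring the per-URL try/except in your ThreadPoolExecutor code):

import asyncio
import httpx

async def fetch_all(urls):
    async with httpx.AsyncClient() as client:
        # gather() preserves the order of the awaitables passed to it
        results = await asyncio.gather(*(client.get(url) for url in urls),
                                       return_exceptions=True)
    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            print(f'{url!r} generated an exception: {result}')
        else:
            print(f'{url!r} page is {len(result.content)} bytes')

# asyncio.run(fetch_all(['https://www.bbc.co.uk/', 'https://www.reuters.com/']))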

Working Example

from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
import httpx
import asyncio


URLS = ['https://www.foxnews.com/',
        'https://edition.cnn.com/',
        'https://www.nbcnews.com/',
        'https://www.bbc.co.uk/',
        'https://www.reuters.com/']
        


@asynccontextmanager
async def lifespan(app: FastAPI):
    # customise settings
    limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
    timeout = httpx.Timeout(5.0, read=15.0)  # 15s timeout on read. 5s timeout elsewhere.

    # Initialise the Client on startup and add it to the state
    async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
        yield {'client': client}
        # The Client closes on shutdown 


app = FastAPI(lifespan=lifespan)


async def send(url, client):
    return await client.get(url)


@app.get('/')
async def main(request: Request):
    client = request.state.client
    tasks = [send(url, client) for url in URLS]
    responses = await asyncio.gather(*tasks)
    return [r.text[:50] for r in responses]  # for demo purposes, only return the first 50 chars of each response

If you would like to avoid reading the entire response body into RAM, you could use streaming responses in httpx, together with FastAPI's StreamingResponse, as described in this answer and demonstrated below:

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from contextlib import asynccontextmanager
import httpx
import asyncio


URLS = ['https://www.foxnews.com/',
        'https://edition.cnn.com/',
        'https://www.nbcnews.com/',
        'https://www.bbc.co.uk/',
        'https://www.reuters.com/']
        


@asynccontextmanager
async def lifespan(app: FastAPI):
    # customise settings
    limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
    timeout = httpx.Timeout(5.0, read=15.0)  # 15s timeout on read. 5s timeout elsewhere.

    # Initialise the Client on startup and add it to the state
    async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
        yield {'client': client}
        # The Client closes on shutdown 


app = FastAPI(lifespan=lifespan)


async def send(url, client):
    req = client.build_request('GET', url)
    return await client.send(req, stream=True)


async def iter_content(responses):
    for r in responses:
        async for chunk in r.aiter_text():
            yield chunk[:50]  # for demo purposes, only return the first 50 chars of each response
            yield '\n\n'
            break
        await r.aclose()


@app.get('/')
async def main(request: Request):
    client = request.state.client
    tasks = [send(url, client) for url in URLS]
    responses = await asyncio.gather(*tasks)
    return StreamingResponse(iter_content(responses), media_type='text/event-stream')
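
If you prefer launching the server programmatically instead of via the uvicorn command line, appending something like the following to either example file should work (the host and port values here are illustrative):

import uvicorn

if __name__ == '__main__':
    # Serve the FastAPI app defined above
    uvicorn.run(app, host='127.0.0.1', port=8000)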