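How can I rate-limit API endpoint requests in a FastAPI application? I need to limit each user to 5 requests per second, and block that particular user for 60 seconds once the limit is exceeded.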
In main.py:
def get_application() -> FastAPI:
    application = FastAPI(title=PROJECT_NAME, debug=DEBUG, version=VERSION)
    application.add_event_handler(
        "startup", create_start_app_handler(application))
    application.add_event_handler(
        "shutdown", create_stop_app_handler(application))
    return application

app = get_application()
In events.py:
def create_start_app_handler(app: FastAPI) -> Callable:
    async def start_app() -> None:
        redis = await aioredis.create_redis_pool("redis://localhost:8080")
        FastAPILimiter.init(redis)
    return start_app
On the endpoint:
@router.post('/user',
             tags=["user"],
             name="user:user",
             dependencies=[Depends(RateLimiter(times=5, seconds=60))])
***code****
I run it from this file, test.py:
import uvicorn
from app.main import app

if __name__ == "__main__":
    uvicorn.run("test:app", host="0.0.0.0", port=8000, reload=True)
I made the edits above, but I get the following error:
File "****ite-packages\starlette\routing.py", line 526, in lifespan
async for item in self.lifespan_context(app):
File "****site-packages\starlette\routing.py", line 467, in default_lifespan
await self.startup()
File "****site-packages\starlette\routing.py", line 502, in startup
await handler()
File "****app\core\services\events.py", line 15, in start_app
redis = await aioredis.create_redis_pool("redis://localhost:8080")
File "****\site-packages\aioredis\commands\__init__.py", line 188, in create_redis_pool
pool = await create_pool(address, db=db,
File "****site-packages\aioredis\pool.py", line 58, in create_pool
await pool._fill_free(override_min=False)
File "C****\site-packages\aioredis\pool.py", line 383, in _fill_free
conn = await self._create_new_connection(self._address)
File "****site-packages\aioredis\connection.py", line 111, in create_connection
reader, writer = await asyncio.wait_for(open_connection(
File "****\asyncio\tasks.py", line 455, in wait_for
return await fut
File "****\site-packages\aioredis\stream.py", line 23, in open_connection
transport, _ = await get_event_loop().create_connection(
File "****\asyncio\base_events.py", line 1033, in create_connection
raise OSError('Multiple exceptions: {}'.format(
OSError: Multiple exceptions: [Errno 10061] Connect call failed ('::1', 8080, 0, 0), [Errno 10061] Connect call failed ('127.0.0.1', 8080)
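Your best option is to use a library, since FastAPI does not provide this functionality out of the box.
slowapi is great and easy to use.
You can use it like this: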
from fastapi import FastAPI, Request, Response
from fastapi.responses import PlainTextResponse
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

# Identify clients by their remote IP address
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# The route decorator goes above the limit decorator,
# and the endpoint must accept a `request` argument.
@app.get("/home")
@limiter.limit("5/minute")
async def homepage(request: Request):
    return PlainTextResponse("test")

@app.get("/mars")
@limiter.limit("5/minute")
async def mars(request: Request, response: Response):
    return {"key": "value"}
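The limit strings passed to limiter.limit use a count/period notation, so the question's target of five requests per second would be written "5/second". As a rough illustration of how such a string maps to numbers, here is a minimal sketch. parse_limit and PERIOD_SECONDS are hypothetical names made up for this example, not part of slowapi, and only the simple N/unit form is handled.

```python
from typing import Tuple

# Hypothetical lookup table for illustration; slowapi does its own parsing.
PERIOD_SECONDS = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}


def parse_limit(limit: str) -> Tuple[int, int]:
    """Split a simple 'N/unit' string into (max_requests, window_seconds)."""
    count, _, unit = limit.partition("/")
    unit = unit.strip().lower()
    if unit not in PERIOD_SECONDS:
        raise ValueError(f"unsupported period: {unit!r}")
    return int(count), PERIOD_SECONDS[unit]


if __name__ == "__main__":
    print(parse_limit("5/minute"))  # (5, 60)
    print(parse_limit("5/second"))  # (5, 1)
```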
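FastAPI does not support this natively, but it can be done with a few libraries such as the ones below. They usually need some kind of backing store (Redis, Memcached, etc.), although slowapi falls back to in-memory storage when no store is configured.
To use fastapi-limiter, as shown in its documentation:
Note: you need a running Redis instance for this to work.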
import aioredis
import uvicorn
from fastapi import Depends, FastAPI
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter

app = FastAPI()

@app.on_event("startup")
async def startup():
    redis = await aioredis.create_redis_pool("redis://localhost")
    FastAPILimiter.init(redis)

@app.get("/", dependencies=[Depends(RateLimiter(times=2, seconds=5))])
async def index():
    return {"msg": "Hello World"}

if __name__ == "__main__":
    uvicorn.run("main:app", debug=True, reload=True)
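Because fastapi-limiter cannot start without Redis, a "Connect call failed" error like the one in the question usually means nothing is listening at the address given. Note that the question connects to port 8080, whereas Redis listens on port 6379 by default. Here is a minimal sketch for checking whether a port accepts connections before starting the app. can_connect is a hypothetical helper written for this example, and it only tests TCP reachability, not that the listener is actually Redis.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections (Errno 10061 on Windows) and timeouts.
        return False


if __name__ == "__main__":
    # 6379 is the Redis default port; adjust if your server listens elsewhere.
    print("Redis reachable:", can_connect("localhost", 6379))
```

If this prints False, start Redis or correct the port in the connection URL first.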
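You can use https://github.com/abersheeran/asgi-ratelimit
Compared with https://pypi.org/project/fastapi-limiter/ and https://pypi.org/project/slowapi/, it fits your needs better.
Here is an example: after a user exceeds the limit of five requests per second, that user is blocked for 60 seconds.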
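from ratelimit import RateLimitMiddleware, Rule
from ratelimit.backends.redis import RedisBackend

# AUTH_FUNCTION is your own function that identifies the user for each request
app.add_middleware(
    RateLimitMiddleware,
    authenticate=AUTH_FUNCTION,
    backend=RedisBackend(),
    config={
        # 5 requests per second on paths starting with /user; block for 60 s when exceeded
        r"^/user": [Rule(second=5, block_time=60)],
    },
)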
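To make the "five per second, then blocked for 60 seconds" behaviour concrete, here is a minimal in-memory sketch of that rule. It is not how asgi-ratelimit is implemented (that library keeps its state in Redis). BlockingLimiter and its parameters are hypothetical names for this example, and the state lives in a single process only.

```python
import time
from typing import Callable, Dict, List


class BlockingLimiter:
    """Allow `limit` hits per `window` seconds; block offenders for `block_time`."""

    def __init__(self, limit: int = 5, window: float = 1.0,
                 block_time: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.limit = limit
        self.window = window
        self.block_time = block_time
        self.clock = clock  # injectable so the logic can be tested without sleeping
        self._hits: Dict[str, List[float]] = {}     # user -> recent hit times
        self._blocked_until: Dict[str, float] = {}  # user -> time the block ends

    def allow(self, user: str) -> bool:
        now = self.clock()
        if now < self._blocked_until.get(user, float("-inf")):
            return False  # still serving a block
        # Keep only the hits that fall inside the current window.
        hits = [t for t in self._hits.get(user, []) if now - t < self.window]
        if len(hits) >= self.limit:
            # Limit exceeded: start the block and forget the old hits.
            self._blocked_until[user] = now + self.block_time
            self._hits[user] = []
            return False
        hits.append(now)
        self._hits[user] = hits
        return True


if __name__ == "__main__":
    limiter = BlockingLimiter()
    print([limiter.allow("alice") for _ in range(6)])
```

The sixth call inside the same second is rejected and starts the 60-second block, while other users are unaffected.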
fastapi-limiter and slowapi are very nice packages for implementing rate limiting in FastAPI.
But it can also be done with walrus, which requires a running Redis database.
Start Redis.
Python code: write a Python file named code1228.py.
Code:
from walrus import Database, RateLimitException
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uvicorn

db = Database()
rate = db.rate_limit('xxx', limit=5, per=60)  # at most 5 calls per 60 seconds

app = FastAPI()

@app.exception_handler(RateLimitException)
def parse_rate_limit_exception(request: Request, exc: RateLimitException):
    msg = {'success': False, 'msg': f'please have a tea for sleep, your ip is: {request.client.host}.'}
    return JSONResponse(status_code=429, content=msg)

@app.get('/')
def index():
    return {'success': True}

@app.get('/important_api')
@rate.rate_limited(lambda request: request.client.host)  # key the limit on the client IP
def query_important_data(request: Request):
    data = 'important data'
    return {'success': True, 'data': data}

if __name__ == "__main__":
    uvicorn.run("code1228:app", debug=True, reload=True)
Run this Python file.
Test the link:
http://127.0.0.1:8000/important_api
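Rather than using an external package that relies on a database to store data needed for only a few seconds or minutes, I prefer this approach.
In the RateLimiter class, requests_limit is the total number of requests that can be made within time_window seconds.
Remember: I use the client IP to track request behavior, and apply the limit once requests_limit is exceeded within time_window seconds.
Note: it is a dependency, so it may not work with FastAPI middleware (I have not tested that).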
from fastapi import FastAPI, Request, HTTPException, Depends
import time

# Initialize FastAPI app
app = FastAPI()

# In-memory storage for request counters
request_counters = {}

# Custom RateLimiter class with dynamic rate limiting values
class RateLimiter:
    def __init__(self, requests_limit: int, time_window: int):
        self.requests_limit = requests_limit
        self.time_window = time_window

    async def __call__(self, request: Request):
        # Key on the IP only; request.client also carries the port, which varies per connection
        client_ip = request.client.host
        # Get the current timestamp
        current_time = int(time.time())
        # Check if client's request counter exists
        if client_ip not in request_counters:
            request_counters[client_ip] = {"timestamp": current_time, "count": 1}
        else:
            # Check if the time window has elapsed, reset the counter if needed
            if current_time - request_counters[client_ip]["timestamp"] > self.time_window:
                # Reset the counter and update the timestamp
                request_counters[client_ip]["timestamp"] = current_time
                request_counters[client_ip]["count"] = 1
            else:
                # Check if the client has exceeded the request limit
                if request_counters[client_ip]["count"] >= self.requests_limit:
                    raise HTTPException(status_code=429, detail="Too Many Requests")
                else:
                    request_counters[client_ip]["count"] += 1
        # Clean up expired client data (optional)
        for ip in list(request_counters.keys()):
            if current_time - request_counters[ip]["timestamp"] > self.time_window:
                request_counters.pop(ip)
        return True

# Include the custom RateLimiter dependency on the routes you want to rate limit
@app.get("/limited", dependencies=[Depends(RateLimiter(requests_limit=10, time_window=60))])
async def limited_endpoint():
    return {"message": "This endpoint has rate limiting (10 requests per 60 seconds)."}

@app.get("/unlimited")
async def unlimited_endpoint():
    return {"message": "This endpoint has no rate limiting."}
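We create a custom RateLimiter class whose __init__ method accepts the rate limit values (requests_limit and time_window) as parameters. The class implements the __call__ method, which lets an instance of the class be used as a callable dependency.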
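The callable-instance pattern can be seen on its own, without FastAPI, in the following minimal sketch. WindowCounter and its clock parameter are hypothetical names for this example. Unlike the code above, which stores counts in one module-level dict, this variant keeps its counters on the instance, so two limiters with different settings do not share state. It returns a bool instead of raising HTTPException.

```python
import time


class WindowCounter:
    """Fixed-window counter whose instances can be called like functions."""

    def __init__(self, requests_limit: int, time_window: int, clock=time.time):
        self.requests_limit = requests_limit
        self.time_window = time_window
        self.clock = clock  # injectable so tests need not sleep
        self.counters = {}  # key -> {"timestamp": window start, "count": hits}

    def __call__(self, key: str) -> bool:
        """Return True if this call is within the limit for `key`."""
        now = self.clock()
        entry = self.counters.get(key)
        # First request from this key, or its window has elapsed: start a new window.
        if entry is None or now - entry["timestamp"] > self.time_window:
            self.counters[key] = {"timestamp": now, "count": 1}
            return True
        if entry["count"] >= self.requests_limit:
            return False
        entry["count"] += 1
        return True


if __name__ == "__main__":
    strict = WindowCounter(requests_limit=2, time_window=60)
    print([strict("1.2.3.4") for _ in range(3)])  # [True, True, False]
```

Calling the instance runs __call__, which is the same mechanism FastAPI relies on when an instance is passed to Depends.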