Rate limiting in FastAPI

Question (votes: 0, answers: 5)

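How can I rate limit requests to an API endpoint in a FastAPI application? I need to limit API calls to 5 requests per second per user, and exceeding that limit should block that particular user for 60 seconds.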

In main.py:

def get_application() -> FastAPI:
     application = FastAPI(title=PROJECT_NAME, debug=DEBUG, version=VERSION)
     application.add_event_handler(
        "startup", create_start_app_handler(application))
     application.add_event_handler(
        "shutdown", create_stop_app_handler(application))
     return application
app = get_application()

In events.py:

def create_start_app_handler(app: FastAPI) -> Callable:  
    async def start_app() -> None:           

        redis = await aioredis.create_redis_pool("redis://localhost:8080")
        FastAPILimiter.init(redis)
    return start_app

In the endpoint:

@router.post('/user',
             tags=["user"],
             name="user:user", dependencies=[Depends(RateLimiter(times=5, seconds=60))])
***code****

I run the app from this file, test.py:

import uvicorn

from app.main import app

if __name__ == "__main__":
    uvicorn.run("test:app", host="0.0.0.0", port=8000, reload=True)

I made the edits shown above, but I get the following error.

File "****ite-packages\starlette\routing.py", line 526, in lifespan
    async for item in self.lifespan_context(app):
  File "****site-packages\starlette\routing.py", line 467, in default_lifespan
    await self.startup()
  File "****site-packages\starlette\routing.py", line 502, in startup
    await handler()
  File "****app\core\services\events.py", line 15, in start_app
    redis = await aioredis.create_redis_pool("redis://localhost:8080")
  File "****\site-packages\aioredis\commands\__init__.py", line 188, in create_redis_pool
    pool = await create_pool(address, db=db,
  File "****site-packages\aioredis\pool.py", line 58, in create_pool
    await pool._fill_free(override_min=False)
  File "C****\site-packages\aioredis\pool.py", line 383, in _fill_free
    conn = await self._create_new_connection(self._address)
  File "****site-packages\aioredis\connection.py", line 111, in create_connection
    reader, writer = await asyncio.wait_for(open_connection(
  File "****\asyncio\tasks.py", line 455, in wait_for
    return await fut
  File "****\site-packages\aioredis\stream.py", line 23, in open_connection
    transport, _ = await get_event_loop().create_connection(
  File "****\asyncio\base_events.py", line 1033, in create_connection
    raise OSError('Multiple exceptions: {}'.format(
OSError: Multiple exceptions: [Errno 10061] Connect call failed ('::1', 8080, 0, 0), [Errno 10061] Connect call failed ('127.0.0.1', 8080)
python fastapi rate-limiting
5 Answers

37 votes

The best option is to use a library, since FastAPI does not provide this functionality out of the box.

slowapi is great and easy to use.

You can use it like this:

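from fastapi import FastAPI, Request, Response
from fastapi.responses import PlainTextResponse
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

# Key each client by its remote IP address
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# The route decorator goes above the limit decorator,
# and the endpoint must accept the `request` argument.
@app.get("/home")
@limiter.limit("5/minute")
async def homepage(request: Request):
    return PlainTextResponse("test")

# This endpoint returns a dict rather than a Response object,
# so it also takes a `response` argument.
@app.get("/mars")
@limiter.limit("5/minute")
async def mars(request: Request, response: Response):
    return {"key": "value"}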
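The question asks for a limit per user, while `get_remote_address` keys on the client IP. Since `key_func` receives the request, a custom function can be passed instead. Below is a minimal sketch, assuming the user is identified by an `X-User-Id` header; that header name is hypothetical and not part of the original answer, so substitute whatever your authentication layer provides.

```python
from types import SimpleNamespace


def user_or_ip_key(request) -> str:
    """Return a rate-limit key: the user id if present, else the client IP.

    `X-User-Id` is a hypothetical header used only for illustration.
    """
    user_id = request.headers.get("X-User-Id")
    if user_id:
        return f"user:{user_id}"
    # request.client can be None in some setups, so guard for it.
    host = request.client.host if request.client else "unknown"
    return f"ip:{host}"


# Stand-in objects that mimic the attributes read from a Starlette Request.
with_user = SimpleNamespace(
    headers={"X-User-Id": "42"}, client=SimpleNamespace(host="10.0.0.1")
)
anonymous = SimpleNamespace(headers={}, client=SimpleNamespace(host="10.0.0.1"))

print(user_or_ip_key(with_user))  # user:42
print(user_or_ip_key(anonymous))  # ip:10.0.0.1
```

It would then be wired in as `Limiter(key_func=user_or_ip_key)`.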

11 votes

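FastAPI does not support this natively, but it is possible with a few libraries such as the ones below. They usually need some kind of backing store (redis, memcached, etc.), although slowapi has an in-memory fallback when no database is available.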

To use fastapi-limiter, as shown in its documentation:

Note: you need a running Redis instance for this to work.

import aioredis
import uvicorn
from fastapi import Depends, FastAPI

from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter

app = FastAPI()


@app.on_event("startup")
async def startup():
    redis = await aioredis.create_redis_pool("redis://localhost")
    FastAPILimiter.init(redis)


@app.get("/", dependencies=[Depends(RateLimiter(times=2, seconds=5))])
async def index():
    return {"msg": "Hello World"}


if __name__ == "__main__":
    uvicorn.run("main:app", debug=True, reload=True)
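The traceback in the question ends with Errno 10061 (connection refused) on port 8080, which suggests nothing was listening at that address; Redis listens on port 6379 by default. As a rough sketch, a check like the following could confirm that the port is reachable before starting the app. The helper name `port_is_open` is made up for this example.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and name resolution failures.
        return False


if __name__ == "__main__":
    # 6379 is the Redis default port; adjust if your server uses another one.
    print(port_is_open("localhost", 6379))
```

If this prints False, start Redis (or fix the host and port in the connection URL) before debugging the limiter itself.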

5 votes

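You can use https://github.com/abersheeran/asgi-ratelimit

Compared with https://pypi.org/project/fastapi-limiter/ and https://pypi.org/project/slowapi/, it fits your requirement better.

Here is an example: after exceeding the limit of five requests per second, the specific user is blocked for 60 seconds.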

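from ratelimit import RateLimitMiddleware, Rule
from ratelimit.backends.redis import RedisBackend

# AUTH_FUNCTION is a placeholder for your own function that identifies the user
app.add_middleware(
    RateLimitMiddleware,
    authenticate=AUTH_FUNCTION,
    backend=RedisBackend(),
    config={
        # 5 requests per second on paths starting with /user, then block for 60 seconds
        r"^/user": [Rule(second=5, block_time=60)],
    },
)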
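`AUTH_FUNCTION` is left undefined in the snippet above. As far as I understand the library's README, it is an async callable that takes the ASGI scope and returns a `(user_id, group)` tuple. The following is only a sketch under that assumption; it identifies users by client IP, and the function name is invented for this example.

```python
import asyncio
from typing import Tuple


async def auth_by_client_ip(scope: dict) -> Tuple[str, str]:
    """Identify the caller by IP and put everyone in the "default" group.

    Assumes the ASGI scope carries a `client` entry of the form (host, port).
    """
    client = scope.get("client")
    host = client[0] if client else "unknown"
    return host, "default"


# Quick check with a hand-built scope.
print(asyncio.run(auth_by_client_ip({"client": ("203.0.113.7", 51234)})))
```

For true per-user limits, the function would instead read the user id from the session or token found in the scope.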

1 vote

fastapi-limiter and slowapi are very nice packages for implementing rate limiting in FastAPI.

But it can also be done with walrus. A redis database must be running, though.

  1. Start redis.

  2. Create a Python file, code1228.py.

  3. Code:

from walrus import Database, RateLimitException
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uvicorn

db = Database()
rate = db.rate_limit('xxx', limit=5, per=60)  # allow at most 5 calls per 60 seconds

app = FastAPI()


@app.exception_handler(RateLimitException)
def parse_rate_limit_exception(request: Request, exc: RateLimitException):
    msg = {'success': False, 'msg': f'please have a tea for sleep, your ip is: {request.client.host}.'}
    return JSONResponse(status_code=429, content=msg)


@app.get('/')
def index():
    return {'success': True}


@app.get('/important_api')
@rate.rate_limited(lambda request: request.client.host)
def query_important_data(request: Request):
    data = 'important data'
    return {'success': True, 'data': data}


if __name__ == "__main__":
    uvicorn.run("code1228:app", debug=True, reload=True)

  4. Run this Python file.

  5. Test the link:

    http://127.0.0.1:8000/important_api


0 votes

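Rather than using an external package that needs a database to store data that only matters for a few seconds or minutes, I prefer this approach.

In the RateLimiter class, requests_limit is the total number of requests that can be made within time_window seconds.

Remember: I am using the client IP to track each client's requests, and the limit applies once requests_limit is exceeded within time_window seconds.

Note: it is a dependency, so it may not work with FastAPI middleware (I have not tested this).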

from fastapi import FastAPI, Request, HTTPException, Depends
import time

# Initialize FastAPI app
app = FastAPI()

# In-memory storage for request counters
request_counters = {}

# Custom RateLimiter class with dynamic rate limiting values
class RateLimiter:
    def __init__(self, requests_limit: int, time_window: int):
        self.requests_limit = requests_limit
        self.time_window = time_window

    async def __call__(self, request: Request):
        # Key on the host only; request.client also carries the port
        client_ip = request.client.host

        # Get the current timestamp
        current_time = int(time.time())

        # Check if client's request counter exists
        if client_ip not in request_counters:
            request_counters[client_ip] = {"timestamp": current_time, "count": 1}
        else:
            # Check if the time window has elapsed, reset the counter if needed
            if current_time - request_counters[client_ip]["timestamp"] > self.time_window:
                # Reset the counter and update the timestamp
                request_counters[client_ip]["timestamp"] = current_time
                request_counters[client_ip]["count"] = 1
            else:
                # Check if the client has exceeded the request limit
                if request_counters[client_ip]["count"] >= self.requests_limit:
                    raise HTTPException(status_code=429, detail="Too Many Requests")
                else:
                    request_counters[client_ip]["count"] += 1

        # Clean up expired client data (optional)
        for ip in list(request_counters.keys()):
            if current_time - request_counters[ip]["timestamp"] > self.time_window:
                request_counters.pop(ip)

        return True

# Include the custom RateLimiter dependency on the routes you want to rate limit
@app.get("/limited", dependencies=[Depends(RateLimiter(requests_limit=10, time_window=60))])
async def limited_endpoint():
    return {"message": "This endpoint has rate limiting (10 requests per 60 seconds)."}

@app.get("/unlimited")
async def unlimited_endpoint():
    return {"message": "This endpoint has no rate limiting."}

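We create a custom RateLimiter class whose __init__ method accepts the rate-limit values (requests_limit and time_window) as parameters. The class implements the __call__ method, which allows its instances to be used as callable dependencies.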
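The class above resets its counter once the window passes, but it does not implement the "block for 60 seconds" part of the question. One possible way to add that is sketched below. The class name, the `allow` method and the injectable `clock` are all assumptions made for this example, and like the code above it keeps state in process memory only.

```python
import time
from typing import Callable, Dict, List


class BlockingRateLimiter:
    """Allow `limit` requests per `window` seconds; block offenders for `block_time` seconds."""

    def __init__(self, limit: int, window: float, block_time: float,
                 clock: Callable[[], float] = time.monotonic):
        self.limit = limit
        self.window = window
        self.block_time = block_time
        self.clock = clock  # injectable so the logic can be tested without sleeping
        self._hits: Dict[str, List[float]] = {}
        self._blocked_until: Dict[str, float] = {}

    def allow(self, key: str) -> bool:
        now = self.clock()
        # Reject immediately while the key is still blocked.
        if now < self._blocked_until.get(key, 0.0):
            return False
        # Keep only the timestamps that fall inside the current window.
        hits = [t for t in self._hits.get(key, []) if now - t < self.window]
        if len(hits) >= self.limit:
            # Limit exceeded: start the block period and forget the old hits.
            self._blocked_until[key] = now + self.block_time
            self._hits.pop(key, None)
            return False
        hits.append(now)
        self._hits[key] = hits
        return True


# Usage with a fake clock: 5 requests per second, then a 60-second block.
fake_now = [0.0]
limiter = BlockingRateLimiter(limit=5, window=1.0, block_time=60.0,
                              clock=lambda: fake_now[0])

results = [limiter.allow("alice") for _ in range(6)]
print(results)  # [True, True, True, True, True, False]

fake_now[0] = 30.0
print(limiter.allow("alice"))  # False: still inside the 60-second block

fake_now[0] = 61.0
print(limiter.allow("alice"))  # True: the block has expired
```

Inside a dependency such as `__call__` above, one could raise `HTTPException(status_code=429)` whenever `allow(request.client.host)` returns False.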
