Skipping authentication in FastAPI based on a POST request parameter


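I am working on a FastAPI project and am having trouble implementing a custom authenticator. My goal is to skip authentication based on the value of a specific parameter in the request body, and to return a hard-coded user ID when that condition is met.

Here is a simplified version of my main.py file, which runs as is: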

from fastapi import Depends, FastAPI, HTTPException, Request, status
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.security import OAuth2AuthorizationCodeBearer, SecurityScopes
from pydantic import BaseModel, parse_obj_as
from jose import jwt
import uvicorn
import json

app = FastAPI()

# FastAPI OAuth2 configuration
fastapi_oauth2 = OAuth2AuthorizationCodeBearer(
    authorizationUrl="your_authorization_url",
    tokenUrl="your_token_url",
)

# Mockup of auth0_jwks for the sake of example
auth0_jwks = "your_auth0_jwks"

# Mockup of SearchRequest and verify_token for the sake of example
class SearchRequest(BaseModel):
    query: str
    tenant: str

# Mockup of parse_request for the sake of example
async def parse_request(api_request: Request) -> SearchRequest:
    body = await api_request.body()
    body = json.loads(body.decode("utf-8"))
    return parse_obj_as(SearchRequest, body)

def verify_token(token: str = Depends(fastapi_oauth2)) -> dict:
    """
    Verifies Auth0 access token and attached permissions(scopes).
    Returns a dictionary of claims from the verified token.
    """
    if token == "mock_token":
        return {"sub": "mock_user_id"}

    try:
        # Add your token verification logic here, using your custom settings and keys
        jwt_claims = jwt.decode(token, key=auth0_jwks, audience="your_audience")
    except Exception as exc:
        raise HTTPException(
            status_code=401,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

    return jwt_claims

# Function causing the issue
def get_current_user_or_skip_auth(
    api_request: SearchRequest = Depends(parse_request),
    token: str | None = Depends(fastapi_oauth2),
) -> str:
    if api_request.tenant == "special_tenant":
        return f"{api_request.tenant}_user"

    jwt_claims = verify_token(token)  # Pass the token to verify_token
    if jwt_claims:
        return jwt_claims.get("sub")

    raise HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

# Basic POST request endpoint
@app.post("/process_request")
async def process_request(
    request: SearchRequest, user_id: str = Depends(get_current_user_or_skip_auth)
):
    """
    Process the incoming request.
    """
    return {"user_id": user_id, "request_data": request.dict()}

# Run the FastAPI application
if __name__ == "__main__":

    uvicorn.run(app, host="127.0.0.1", port=8000)

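Problem:

When I send a POST request for the tenant "special_tenant" without a token, I expect the endpoint to skip authentication and return JSON containing the user_id "special_tenant_user" along with the request data. Instead, I currently get a "Not authenticated" error.

When I send a POST request for the tenant "regular_tenant" with the mock token, I expect the endpoint to return JSON containing the user_id "mock_user_id" along with the request data. This currently works as expected.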

Steps to reproduce the problem:

  1. Start the application with
    uvicorn main:app --reload
  2. Open another terminal and run the following curl command to send a POST request for the tenant "special_tenant" without a token:
curl -X POST "http://127.0.0.1:8000/process_request" -H  "accept: application/json" -H  "Content-Type: application/json" -d "{\"query\":\"test_query\",\"tenant\":\"special_tenant\"}"
  3. The server returns a "Not authenticated" error.
  4. I expected it to return JSON containing the user_id "special_tenant_user" along with the request data.
  5. A POST request for a regular tenant works, because it is authenticated:
curl -X POST "http://127.0.0.1:8000/process_request" -H  "accept: application/json" -H  "Content-Type: application/json" -H "Authorization: Bearer mock_token" -d "{\"query\":\"test_query\",\"tenant\":\"regular_tenant\"}"
  6. It returns the expected result:
{"user_id":"mock_user_id","request_data":{"query":"test_query","tenant":"regular_tenant"}}
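For anyone who prefers to reproduce the two requests from Python rather than curl, here is a minimal sketch that uses only the standard library. The helper names build_request and send are hypothetical and not part of the original code; the URL, headers, and payload are copied from the curl commands above.

```python
import json
import urllib.error
import urllib.request
from typing import Optional

# Same URL as in the curl commands above.
BASE_URL = "http://127.0.0.1:8000/process_request"


def build_request(tenant: str, token: Optional[str] = None) -> urllib.request.Request:
    """Build the POST request; the Authorization header is added only when a token is given."""
    payload = json.dumps({"query": "test_query", "tenant": tenant}).encode("utf-8")
    headers = {"accept": "application/json", "Content-Type": "application/json"}
    if token is not None:
        headers["Authorization"] = f"Bearer {token}"
    return urllib.request.Request(BASE_URL, data=payload, headers=headers, method="POST")


def send(req: urllib.request.Request):
    """Send the request and return (status_code, decoded_json_body)."""
    try:
        with urllib.request.urlopen(req) as resp:
            return resp.status, json.loads(resp.read())
    except urllib.error.HTTPError as err:
        # 4xx/5xx responses arrive as exceptions; the body is still readable.
        return err.code, json.loads(err.read())


# With the server running, the two cases would be exercised like this:
#   send(build_request("special_tenant"))                # no token
#   send(build_request("regular_tenant", "mock_token"))  # with the mock token
```

Building the request separately from sending it makes it easy to confirm that the first case really carries no Authorization header.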

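Question:

What is the correct way to skip authentication and return a hard-coded user ID based on a specific parameter in the request body? Can this be done while remaining safe with respect to the OAuth standard?

Any help would be appreciated. Thanks in advance!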

python oauth-2.0 fastapi auth0
1 Answer

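The problem was solved with a custom AuthMiddleWare class. The tricky part was trying different configurations to work around the way FastAPI performs dependency injection with Depends. Depends does make the code simpler, but it is also less customizable.

The complete main.py is below: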

from fastapi import Depends, FastAPI, HTTPException, Request, status
from fastapi.security import OAuth2AuthorizationCodeBearer
from pydantic import BaseModel
from jose import jwt
import uvicorn
from loguru import logger

app = FastAPI()

# FastAPI OAuth2 configuration
fastapi_oauth2 = OAuth2AuthorizationCodeBearer(
    authorizationUrl="your_authorization_url",
    tokenUrl="your_token_url",
    auto_error=False, # when HTTP Authorization header is not available, instead of erroring out, the dependency result will be `None`.
)

# Mockup of auth0_jwks for the sake of example
auth0_jwks = "your_auth0_jwks"

# Mockup of SearchRequest and verify_token for the sake of example
class SearchRequest(BaseModel):
    query: str
    tenant: str

def verify_token(token: str = Depends(fastapi_oauth2)) -> dict:
    """
    Verifies Auth0 access token and attached permissions(scopes).
    Returns a dictionary of claims from the verified token.
    """
    if token == "mock_token":
        return {"sub": "mock_user_id"}

    try:
        # Add your token verification logic here, using your custom settings and keys
        jwt_claims = jwt.decode(token, key=auth0_jwks, audience="your_audience")
    except Exception as exc:
        raise HTTPException(
            status_code=401,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

    return jwt_claims

class AuthMiddleWare:
    """
    Middleware class for handling authentication in the API.
    """

    @staticmethod
    async def get_current_user(api_request: Request, token: str = Depends(fastapi_oauth2)) -> str:
        """
        Get user ID('sub' JWT claim) unless there is a custom tenant, in which
        case we skip auth and return a custom user.
        """
        body = await api_request.json()

        tenant_name = body.get("tenant", None)
        if tenant_name == "special_tenant":
            logger.info(f"returning {tenant_name}_user")
            return f"{tenant_name}_user"
        
        logger.info("tenant is generic, calling verify_token")
        jwt_claims = verify_token(token)
        if jwt_claims:
            return jwt_claims.get("sub")

        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

# Basic POST request endpoint
@app.post("/process_request")
async def process_request(
    request: SearchRequest, user_id: str = Depends(AuthMiddleWare.get_current_user)
):
    """
    Process the incoming request.
    """
    return {"user_id": user_id, "request_data": request.dict()}

# Run the FastAPI application
if __name__ == "__main__":

    uvicorn.run(app, host="127.0.0.1", port=8000)
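
The control flow of the code above can be illustrated without the framework. The sketch below is a simplified stand-in, not FastAPI's implementation. It assumes that with auto_error=False a missing or non-Bearer Authorization header yields None instead of an immediate 401, and that the tenant check runs before any token verification. The names extract_bearer_token, Unauthorized, and SKIP_AUTH_TENANTS are hypothetical, and verify_token is reduced to the mock-token branch.

```python
from typing import Optional

# Tenants that bypass token verification (value taken from the question).
SKIP_AUTH_TENANTS = {"special_tenant"}


class Unauthorized(Exception):
    """Hypothetical stand-in for HTTPException(status_code=401)."""


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from a 'Bearer <token>' header value, or None.

    Returning None instead of raising mirrors the auto_error=False setting.
    """
    if not authorization:
        return None
    scheme, _, param = authorization.partition(" ")
    if scheme.lower() != "bearer" or not param:
        return None
    return param


def verify_token(token: Optional[str]) -> dict:
    """Reduced verify_token: only the mock token is accepted in this sketch."""
    if token == "mock_token":
        return {"sub": "mock_user_id"}
    raise Unauthorized("Could not validate credentials")


def get_current_user(body: dict, authorization: Optional[str]) -> str:
    """Check the tenant first; verify the token only for regular tenants."""
    tenant = body.get("tenant")
    if tenant in SKIP_AUTH_TENANTS:
        return f"{tenant}_user"
    claims = verify_token(extract_bearer_token(authorization))
    return claims["sub"]
```

Because the tenant check runs first, a request without a token succeeds for "special_tenant", while any other tenant still needs a valid token. The tenant value comes from the request body, which the client controls, so any caller that sends "special_tenant" skips verification in this design.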
