我目前正在开发一个 FastAPI 项目,并面临实现自定义身份验证器的挑战。我的目标是根据请求正文中特定参数的值跳过身份验证,并在满足条件时返回硬编码的用户 ID。
下面是我的 main.py 文件的简化版本,可直接运行:
from fastapi import Depends, FastAPI, HTTPException, Request, status
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.security import OAuth2AuthorizationCodeBearer, SecurityScopes
from pydantic import BaseModel, parse_obj_as
from jose import jwt
import uvicorn
import json
# ASGI application object; the endpoint further down is registered on it.
app = FastAPI()
# FastAPI OAuth2 configuration
# NOTE(review): auto_error is not passed here, so the scheme keeps its default
# behaviour; presumably that means a request without an Authorization header
# is rejected by this dependency itself -- confirm against the FastAPI docs.
fastapi_oauth2 = OAuth2AuthorizationCodeBearer(
    authorizationUrl="your_authorization_url",  # placeholder value
    tokenUrl="your_token_url",  # placeholder value
)
# Mockup of auth0_jwks for the sake of example
auth0_jwks = "your_auth0_jwks"
# Mockup of SearchRequest for the sake of example
class SearchRequest(BaseModel):
    """JSON body accepted by the ``/process_request`` endpoint."""

    # Query string sent by the client.
    query: str
    # Tenant name; the auth dependency compares it with "special_tenant".
    tenant: str
# Mockup of parse_request for the sake of example
async def parse_request(api_request: Request) -> SearchRequest:
    """Parse the raw request body into a ``SearchRequest``.

    Used as a dependency by ``get_current_user_or_skip_auth`` so that the
    tenant can be inspected before the token is verified.

    Args:
        api_request: The incoming request whose body is read.

    Returns:
        The validated ``SearchRequest``.

    Raises:
        HTTPException: 422 if the body is not UTF-8 encoded JSON or does not
            match the ``SearchRequest`` schema.
    """
    raw_body = await api_request.body()
    try:
        payload = json.loads(raw_body.decode("utf-8"))
        return parse_obj_as(SearchRequest, payload)
    except ValueError as exc:
        # UnicodeDecodeError, json.JSONDecodeError and pydantic's
        # ValidationError all derive from ValueError. Left unhandled inside a
        # dependency they would surface as a 500 instead of a client error.
        raise HTTPException(
            status_code=422,
            detail="Invalid request body",
        ) from exc
def verify_token(token: str = Depends(fastapi_oauth2)) -> dict:
    """
    Validate an Auth0 access token and hand back its claims.

    The literal token "mock_token" skips decoding and yields a fixed claim
    set; any other value is decoded with ``jwt.decode`` using ``auth0_jwks``
    as the key.

    Returns:
        A dictionary of claims from the token.

    Raises:
        HTTPException: 401 when decoding the token fails for any reason.
    """
    if token != "mock_token":
        try:
            # Add your token verification logic here, using your custom settings and keys
            claims = jwt.decode(token, key=auth0_jwks, audience="your_audience")
        except Exception as err:
            raise HTTPException(
                status_code=401,
                detail="Could not validate credentials",
                headers={"WWW-Authenticate": "Bearer"},
            ) from err
        return claims
    # Mock shortcut kept for the sake of the example.
    return {"sub": "mock_user_id"}
# Function causing the issue
def get_current_user_or_skip_auth(
    api_request: SearchRequest = Depends(parse_request),
    token: str | None = Depends(fastapi_oauth2),
) -> str:
    """
    Resolve the user ID for a request, skipping auth for the special tenant.

    For tenant "special_tenant" a hard-coded "<tenant>_user" ID is returned
    without looking at the token. Otherwise the token is passed to
    ``verify_token`` and the "sub" claim is returned.

    NOTE(review): both parameters are dependencies, so ``fastapi_oauth2`` is
    evaluated before this body runs; presumably that is why a request without
    a token never reaches the tenant check -- confirm.

    Raises:
        HTTPException: 401 when ``verify_token`` yields no claims (or raises).
    """
    if api_request.tenant != "special_tenant":
        claims = verify_token(token)  # Pass the token to verify_token
        if not claims:
            raise HTTPException(
                status_code=401,
                detail="Could not validate credentials",
                headers={"WWW-Authenticate": "Bearer"},
            )
        return claims.get("sub")
    return f"{api_request.tenant}_user"
# Basic POST request endpoint
@app.post("/process_request")
async def process_request(
    request: SearchRequest,
    user_id: str = Depends(get_current_user_or_skip_auth),
):
    """
    Echo the resolved user ID together with the submitted request data.
    """
    response = {"user_id": user_id}
    response["request_data"] = request.dict()
    return response
# Run the FastAPI application
if __name__ == "__main__":
    # Only start the server when this file is executed as a script.
    uvicorn.run(
        app,
        host="127.0.0.1",
        port=8000,
    )
问题:
当我在不带令牌的情况下向租户“special_tenant”发出 POST 请求时,我希望端点跳过身份验证过程,并返回一个包含 user_id 为“special_tenant_user”以及请求数据的 JSON。但实际上,我收到的是“Not authenticated”(未通过身份验证)错误。
当我使用模拟令牌向租户“regular_tenant”发出 POST 请求时,我希望端点返回一个 JSON,其中 user_id 为“mock_user_id”以及请求数据。目前这正在按预期工作。
以下是该问题的示例:
uvicorn main:app --reload
curl -X POST "http://127.0.0.1:8000/process_request" -H "accept: application/json" -H "Content-Type: application/json" -d "{\"query\":\"test_query\",\"tenant\":\"special_tenant\"}"
curl -X POST "http://127.0.0.1:8000/process_request" -H "accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer mock_token" -d "{\"query\":\"test_query\",\"tenant\":\"regular_tenant\"}"
{"user_id":"mock_user_id","request_data":{"query":"test_query","tenant":"regular_tenant"}}%
问题:
根据请求正文中的特定参数跳过身份验证并返回硬编码的用户 ID,正确的做法是什么?在符合 OAuth 标准且保证安全的前提下,这能否做到?
如有任何帮助,我们将不胜感激。预先感谢!
问题已通过自定义 `AuthMiddleWare` 类解决。
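棘手的部分在于尝试各种配置,以绕开 FastAPI 通过 `Depends` 进行依赖注入的默认行为。`Depends` 确实让代码更简洁,但可定制性也较差。关键改动有两点:为 `OAuth2AuthorizationCodeBearer` 设置 `auto_error=False`(缺少 Authorization 请求头时依赖结果为 `None`,而不是直接报错),以及在依赖函数中通过 `Request` 直接读取请求正文中的 `tenant`。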
下面是完整的 `main.py`:
from fastapi import Depends, FastAPI, HTTPException, Request, status
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.security import OAuth2AuthorizationCodeBearer, SecurityScopes
from pydantic import BaseModel, parse_obj_as
from jose import jwt
import uvicorn
import json
from loguru import logger
# ASGI application object; the endpoint further down is registered on it.
app = FastAPI()
# FastAPI OAuth2 configuration
fastapi_oauth2 = OAuth2AuthorizationCodeBearer(
    authorizationUrl="your_authorization_url",  # placeholder value
    tokenUrl="your_token_url",  # placeholder value
    # When the HTTP Authorization header is not available, instead of
    # erroring out, the dependency result will be `None`.
    auto_error=False,
)
# Mockup of auth0_jwks for the sake of example
auth0_jwks = "your_auth0_jwks"
# Mockup of SearchRequest for the sake of example
class SearchRequest(BaseModel):
    """JSON body accepted by the ``/process_request`` endpoint."""

    # Query string sent by the client.
    query: str
    # Tenant name; the auth dependency compares it with "special_tenant".
    tenant: str
def verify_token(token: str | None = Depends(fastapi_oauth2)) -> dict:
    """
    Verify an Auth0 access token and return its claims.

    Args:
        token: Bearer token produced by ``fastapi_oauth2``. It may be ``None``
            because the scheme is configured with ``auto_error=False``.

    Returns:
        A dictionary of claims from the verified token. The literal token
        "mock_token" short-circuits to a fixed mock claim set.

    Raises:
        HTTPException: 401 if the token is missing or cannot be decoded.
    """
    credentials_error = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    if not token:
        # Reject a missing token explicitly instead of relying on the broad
        # handler below to catch whatever jwt.decode raises for None.
        raise credentials_error
    if token == "mock_token":
        return {"sub": "mock_user_id"}
    try:
        # Add your token verification logic here, using your custom settings and keys
        jwt_claims = jwt.decode(token, key=auth0_jwks, audience="your_audience")
    except Exception as exc:
        # Boundary handler: any decode failure is reported as 401, with the
        # original exception chained for debugging.
        raise credentials_error from exc
    return jwt_claims
class AuthMiddleWare:
    """
    Namespace for the authentication dependency used by the API.

    Despite the name, nothing here is installed as middleware in this file:
    ``get_current_user`` is wired into the endpoint through ``Depends``.
    """

    # Tenant whose requests are answered without token verification.
    SPECIAL_TENANT = "special_tenant"

    @staticmethod
    async def get_current_user(
        api_request: Request, token: str | None = Depends(fastapi_oauth2)
    ) -> str:
        """
        Get user ID('sub' JWT claim) unless there is a custom tenant, in which
        case we skip auth and return a custom user.

        Args:
            api_request: Incoming request; its JSON body is inspected for a
                top-level "tenant" key.
            token: Bearer token from ``fastapi_oauth2``; ``None`` when the
                request carries no Authorization header.

        Returns:
            "<tenant>_user" for the special tenant, otherwise the "sub" claim
            of the verified token.

        Raises:
            HTTPException: 401 if the token is rejected by ``verify_token``
                or the resulting claims carry no "sub".
        """
        # NOTE(review): the tenant is read from the client-controlled request
        # body, so any caller can select the unauthenticated branch below.
        try:
            body = await api_request.json()
        except ValueError:
            # Missing or malformed JSON must never be a reason to skip auth;
            # fall through to token verification.
            body = None
        # A JSON body that is not an object (e.g. a list) has no tenant.
        tenant_name = body.get("tenant") if isinstance(body, dict) else None
        if tenant_name == AuthMiddleWare.SPECIAL_TENANT:
            logger.info(f"returning {tenant_name}_user")
            return f"{tenant_name}_user"
        logger.info("tenant is generic, calling verify_token")
        jwt_claims = verify_token(token)
        # Claims without a subject cannot identify a user, so they are
        # treated the same as invalid credentials.
        user_id = jwt_claims.get("sub") if jwt_claims else None
        if user_id:
            return user_id
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
# Basic POST request endpoint
@app.post("/process_request")
async def process_request(
    request: SearchRequest,
    user_id: str = Depends(AuthMiddleWare.get_current_user),
):
    """
    Echo the resolved user ID together with the submitted request data.
    """
    response = {"user_id": user_id}
    response["request_data"] = request.dict()
    return response
# Run the FastAPI application
if __name__ == "__main__":
    # Only start the server when this file is executed as a script.
    uvicorn.run(
        app,
        host="127.0.0.1",
        port=8000,
    )