使用 Flask 请求上下文进行用户管理

问题描述 投票:0回答:2

我目前正在开发一个项目,我正在使用 Flask、Flask-RESTful 构建 API,然后使用 Auth0 进行用户身份验证。目前,我正在努力将 Auth0 子 id 从示例装饰器传递到 Flask-RESTful 方法。这是下面的代码。

Auth0 文档提供的 Auth 装饰器:

def get_token_auth_header():
    """Obtains the access token from the Authorization Header"""
    auth = request.headers.get("Authorization", None)
    if not auth:
        raise AuthError(
            {
                "code": "authorization_header_missing",
                "description": "Authorization header is expected",
            },
            401,
        )

    parts = auth.split()

    if parts[0].lower() != "bearer":
        raise AuthError(
            {
                "code": "invalid_header",
                "description": "Authorization header must start with" " Bearer",
            },
            401,
        )
    elif len(parts) == 1:
        raise AuthError(
            {"code": "invalid_header", "description": "Token not found"}, 401
        )
    elif len(parts) > 2:
        raise AuthError(
            {
                "code": "invalid_header",
                "description": "Authorization header must be" " Bearer token",
            },
            401,
        )

    token = parts[1]
    return token

def requires_auth(f):
    """Determines if the access token is valid"""

    @wraps(f)
    def decorated(*args, **kwargs):
        token = get_token_auth_header()
        jsonurl = urlopen("https://" + AUTH0_DOMAIN + "/.well-known/jwks.json")
        jwks = json.loads(jsonurl.read())
        try:
            unverified_header = jwt.get_unverified_header(token)
        except jwt.JWTError:
            raise AuthError(
                {
                    "code": "invalid_header",
                    "description": "Invalid header. "
                    "Use an RS256 signed JWT Access Token",
                },
                401,
            )
        if unverified_header["alg"] == "HS256":
            raise AuthError(
                {
                    "code": "invalid_header",
                    "description": "Invalid header. "
                    "Use an RS256 signed JWT Access Token",
                },
                401,
            )
        rsa_key = {}
        for key in jwks["keys"]:
            if key["kid"] == unverified_header["kid"]:
                rsa_key = {
                    "kty": key["kty"],
                    "kid": key["kid"],
                    "use": key["use"],
                    "n": key["n"],
                    "e": key["e"],
                }
        if rsa_key:
            try:
                payload = jwt.decode(
                    token,
                    rsa_key,
                    algorithms=ALGORITHMS,
                    audience=API_IDENTIFIER,
                    issuer="https://" + AUTH0_DOMAIN + "/",
                )
            except jwt.ExpiredSignatureError:
                raise AuthError(
                    {"code": "token_expired", "description": "token is expired"}, 401
                )
            except jwt.JWTClaimsError as jce:
                raise AuthError(
                    {
                        "code": "invalid_claims",
                        "description": "incorrect claims,"
                        " please check the audience and issuer",
                    },
                    401,
                )
            except Exception:
                raise AuthError(
                    {
                        "code": "invalid_header",
                        "description": "Unable to parse authentication" " token.",
                    },
                    401,
                )

            _request_ctx_stack.top.current_user = payload
            print(payload)
            return f(*args, **kwargs)
        raise AuthError(
            {"code": "invalid_header", "description": "Unable to find appropriate key"},
            401,
        )

    return decorated

使用示例:

class HealthCheck(Resource):

    @requires_auth
    @cross_origin(headers=["Content-Type", "Authorization", "Access-Control-Allow-Origin", "http://localhost:3000"])
    def post(self):
        print(f'Req: {_request_ctx_stack.top.current_user}')
        return jsonify(message=_request_ctx_stack.top.current_user)

具体代码:

_request_ctx_stack.top.current_user = payload

目前,requires_auth 包装器将 Auth0 响应负载存储在 _request_ctx_stack.top.current_user 中。是否有处理此问题的最佳实践?我的资源中的上述实现是获取有效负载的最佳方法吗?有些东西告诉我有更好的方法。我尝试将其放入 kwargs 中以在标头中访问,但 Flask-restful 不喜欢这样。我也玩过flask.g,但这似乎同样不安全,因为它是应用程序上下文。谢谢!

python flask auth0 flask-restful
2个回答
1
投票

前导下划线看起来确实很奇怪,但这似乎是处理这个问题的正确方法。如果您想隐藏一点复杂性,您可以使用 LocalProxy ,如下所示:

auth.py

from flask import _request_ctx_stack
from werkzeug.local import LocalProxy

current_user = LocalProxy(lambda: _request_ctx_stack.top.current_user)

# ...
def requires_auth(f):
# ...
                _request_ctx_stack.top.current_user = payload

routes.py

from .auth import current_user

class HealthCheck(Resource):

    @requires_auth
    @cross_origin(headers=["Content-Type", "Authorization", "Access-Control-Allow-Origin", "http://localhost:3000"])
    def post(self):
        print(f'Req: {current_user}')
        return jsonify(message=current_user)

0
投票

还有另一种方法可以做到这一点,我过去使用

flask-httpauth
来管理身份验证,您可以使用基本或基于令牌的身份验证。

使用这种方法,您可以使用集成的装饰器并直接获得

current_user

    @token_auth.login_required
    def post(self, organization_id: int):
        """Create an item."""
        usr = token_auth.current_user()
        ...
© www.soinside.com 2019 - 2024. All rights reserved.