如何将Kerberos集成到FastApi中?

问题描述 投票:0回答:1

是否存在将 Kerberos 身份验证集成到 FastApi 的现有方法?

我可以在这个SO问题中看到创建自定义身份验证方法的详细信息。 FastApi 文档here描述了如何实现基本身份验证。我是否正确地认为我需要修改它以返回带有 Www-Authenticate: Negotiate for no token 并验证授权标头中提供的令牌的 401? Python 中是否有一个这样的示例,或者是否已经有一个现成的 Kerberos 库可与 FastApi 一起使用?

python fastapi kerberos
1个回答
3
投票

是的,通过 SPNEGO 的 Kerberos 是通过 WWW-Authenticate 中的

Negotiate
机制实现的。您可以使用 Python
spnego
gssapi
sspi
sspilib
模块来验证 Kerberos 令牌(第一个是跨平台的,另外两个分别适用于 Linux 和 Windows)。在大多数情况下,这相当简单:

  • 对于 python-pyspnego,创建一个
    spnego.server()
    来验证令牌。
  • 对于 python-gssapi,创建一个
    gssapi.SecurityContext(creds=..., usage="accept")
  • python-pykerberos 也可以工作,但使用起来不太愉快。

票证将根据系统默认密钥表(或 $KRB5_KTNAME)进行验证;

gssapi
模块允许通过代码指定自定义服务器凭据,否则您需要通过 os.environ 指定密钥表。

mech, _, in_token = req.headers["Authorization"].partition(b" ")
if mech == b"Negotiate":
    # contexts are not reusable -- each HTTP request is a brand new auth
    context = gssapi.SecurityContext(usage="accept")
    in_token = base64.b64decode(in_token)
    # step() throws exception on auth failure
    out_token = context.step(in_token)
    out_token = base64.b64encode(out_token)
    out_hdr = b"WWW-Authenticate: " + mech + b" " + out_token

我没有专门针对 FastApi 的示例,所以我假设它是一个通用的 ASGI 应用程序 - 这是我在各种基于 WSGI 的项目中使用的 WSGI 中间件的一个主要工作的 ASGI 端口,仅作为示例如何使用

gssapi.Credentials()
:

# (c) 2021-2023 Mantas Mikulėnas <[email protected]>
# Released under the MIT license
import asyncio
import base64
import gssapi

class GSSAPIWrapper():
    def __init__(self, app, *,
                       service_name=None,
                       keytab_path=None,
                       credentials=None):
        if service_name and credentials:
            raise ValueError("'service_name' and 'credentials' are mutually exclusive")
        if keytab_path and credentials:
            raise ValueError("'keytab_path' and 'credentials' are mutually exclusive")

        if not credentials:
            if not isinstance(service_name, gssapi.Name):
                service_name = gssapi.Name(service_name or "HTTP@",
                                           gssapi.NameType.hostbased_service)
            if keytab_path:
                credentials = gssapi.Credentials(name=service_name,
                                                 usage="accept",
                                                 store={"keytab": keytab_path})
            else:
                credentials = gssapi.Credentials(name=service_name,
                                                 usage="accept")

        self.app = app
        self.creds = credentials

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            return await self.app(scope, receive, send)

        in_hdr = dict(scope["headers"]).get(b"authorization", b"")
        if in_hdr.startswith(b"Negotiate "):
            ctx = gssapi.SecurityContext(creds=self.creds, usage="accept")
            in_token = base64.b64decode(in_hdr[10:])
            out_token = ctx.step(in_token)
            #out_token = await asyncio.to_thread(ctx.step, in_token)
            if ctx.complete:
                scope.setdefault("extensions", {})
                scope["extensions"]["gss.initiator"] = ctx.initiator_name
                scope["extensions"]["REMOTE_USER"] = str(ctx.initiator_name)
                async def custom_send(event):
                    if out_token and event["type"] == "http.response.start":
                        out_hdr = base64.b64encode(out_token)
                        event.setdefault("headers", [])
                        event["headers"].append((b"WWW-Authenticate",
                                                 b"Negotiate " + out_hdr))
                    await send(event)
                return await self.app(scope, receive, custom_send)

        # No input header, or the context didn't complete in 1 step
        await send({"type": "http.response.start",
                    "status": 401,
                    "headers": [(b"WWW-Authenticate", b"Negotiate")]})
        await send({"type": "http.response.body",
                    "body": b""})

在大多数情况下,您不需要灵活性,只需在环境中通过

KRB5_KTNAME
指定密钥表路径,因此您可能希望将其移植到
spnego
模块以获得额外的可移植性(因为它可以在使用使用密钥表的 SSPI 和 Linux):

# (c) 2021-2024 Mantas Mikulėnas <[email protected]>
# Released under the MIT license
import asyncio
import base64
import spnego

class GSSAPIWrapper():
    def __init__(self, app, *, hostname=None):
        self.app = app
        self.hostname = hostname

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            return await self.app(scope, receive, send)

        in_hdr = dict(scope["headers"]).get(b"authorization", b"")
        if in_hdr.startswith(b"Negotiate "):
            ctx = spnego.server(hostname=self.hostname,
                                service="HTTP",
                                protocol="negotiate")
            in_token = base64.b64decode(in_hdr[10:])
            out_token = ctx.step(in_token)
            #out_token = await asyncio.to_thread(ctx.step, in_token)
            if ctx.complete:
                scope.setdefault("extensions", {})
                scope["extensions"]["REMOTE_USER"] = str(ctx.client_principal)
                async def custom_send(event):
                    if out_token and event["type"] == "http.response.start":
                        out_hdr = base64.b64encode(out_token)
                        event.setdefault("headers", [])
                        event["headers"].append((b"WWW-Authenticate",
                                                 b"Negotiate " + out_hdr))
                    await send(event)
                return await self.app(scope, receive, custom_send)

        # No input header, or the context didn't complete in 1 step
        await send({"type": "http.response.start",
                    "status": 401,
                    "headers": [(b"WWW-Authenticate", b"Negotiate")]})
        await send({"type": "http.response.body",
                    "body": b""})
© www.soinside.com 2019 - 2024. All rights reserved.