是否存在将 Kerberos 身份验证集成到 FastApi 的现有方法?
我可以在这个SO问题中看到创建自定义身份验证方法的详细信息。 FastApi 文档here描述了如何实现基本身份验证。我是否正确地认为我需要修改它以返回带有 Www-Authenticate: Negotiate for no token 并验证授权标头中提供的令牌的 401? Python 中是否有一个这样的示例,或者是否已经有一个现成的 Kerberos 库可与 FastApi 一起使用?
是的,通过 SPNEGO 的 Kerberos 是通过 WWW-Authenticate 中的
Negotiate
机制实现的。您可以使用 Python spnego
或 gssapi
或 sspi
或 sspilib
模块来验证 Kerberos 令牌(第一个是跨平台的,另外两个分别适用于 Linux 和 Windows)。在大多数情况下,这相当简单:
spnego.server()
来验证令牌。gssapi.SecurityContext(creds=..., usage="accept")
。票证将根据系统默认密钥表(或 $KRB5_KTNAME)进行验证;
gssapi
模块允许通过代码指定自定义服务器凭据,否则您需要通过 os.environ 指定密钥表。
mech, _, in_token = req.headers["Authorization"].partition(b" ")
if mech == b"Negotiate":
# contexts are not reusable -- each HTTP request is a brand new auth
context = gssapi.SecurityContext(usage="accept")
in_token = base64.b64decode(in_token)
# step() throws exception on auth failure
out_token = context.step(in_token)
out_token = base64.b64encode(out_token)
out_hdr = b"WWW-Authenticate: " + mech + b" " + out_token
我没有专门针对 FastApi 的示例,所以我假设它是一个通用的 ASGI 应用程序 - 这是我在各种基于 WSGI 的项目中使用的 WSGI 中间件的一个主要工作的 ASGI 端口,仅作为示例如何使用
gssapi.Credentials()
:
# (c) 2021-2023 Mantas Mikulėnas <[email protected]>
# Released under the MIT license
import asyncio
import base64
import gssapi
class GSSAPIWrapper():
def __init__(self, app, *,
service_name=None,
keytab_path=None,
credentials=None):
if service_name and credentials:
raise ValueError("'service_name' and 'credentials' are mutually exclusive")
if keytab_path and credentials:
raise ValueError("'keytab_path' and 'credentials' are mutually exclusive")
if not credentials:
if not isinstance(service_name, gssapi.Name):
service_name = gssapi.Name(service_name or "HTTP@",
gssapi.NameType.hostbased_service)
if keytab_path:
credentials = gssapi.Credentials(name=service_name,
usage="accept",
store={"keytab": keytab_path})
else:
credentials = gssapi.Credentials(name=service_name,
usage="accept")
self.app = app
self.creds = credentials
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
return await self.app(scope, receive, send)
in_hdr = dict(scope["headers"]).get(b"authorization", b"")
if in_hdr.startswith(b"Negotiate "):
ctx = gssapi.SecurityContext(creds=self.creds, usage="accept")
in_token = base64.b64decode(in_hdr[10:])
out_token = ctx.step(in_token)
#out_token = await asyncio.to_thread(ctx.step, in_token)
if ctx.complete:
scope.setdefault("extensions", {})
scope["extensions"]["gss.initiator"] = ctx.initiator_name
scope["extensions"]["REMOTE_USER"] = str(ctx.initiator_name)
async def custom_send(event):
if out_token and event["type"] == "http.response.start":
out_hdr = base64.b64encode(out_token)
event.setdefault("headers", [])
event["headers"].append((b"WWW-Authenticate",
b"Negotiate " + out_hdr))
await send(event)
return await self.app(scope, receive, custom_send)
# No input header, or the context didn't complete in 1 step
await send({"type": "http.response.start",
"status": 401,
"headers": [(b"WWW-Authenticate", b"Negotiate")]})
await send({"type": "http.response.body",
"body": b""})
在大多数情况下,您不需要灵活性,只需在环境中通过
KRB5_KTNAME
指定密钥表路径,因此您可能希望将其移植到 spnego
模块以获得额外的可移植性(因为它可以在使用使用密钥表的 SSPI 和 Linux):
# (c) 2021-2024 Mantas Mikulėnas <[email protected]>
# Released under the MIT license
import asyncio
import base64
import spnego
class GSSAPIWrapper():
def __init__(self, app, *, hostname=None):
self.app = app
self.hostname = hostname
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
return await self.app(scope, receive, send)
in_hdr = dict(scope["headers"]).get(b"authorization", b"")
if in_hdr.startswith(b"Negotiate "):
ctx = spnego.server(hostname=self.hostname,
service="HTTP",
protocol="negotiate")
in_token = base64.b64decode(in_hdr[10:])
out_token = ctx.step(in_token)
#out_token = await asyncio.to_thread(ctx.step, in_token)
if ctx.complete:
scope.setdefault("extensions", {})
scope["extensions"]["REMOTE_USER"] = str(ctx.client_principal)
async def custom_send(event):
if out_token and event["type"] == "http.response.start":
out_hdr = base64.b64encode(out_token)
event.setdefault("headers", [])
event["headers"].append((b"WWW-Authenticate",
b"Negotiate " + out_hdr))
await send(event)
return await self.app(scope, receive, custom_send)
# No input header, or the context didn't complete in 1 step
await send({"type": "http.response.start",
"status": 401,
"headers": [(b"WWW-Authenticate", b"Negotiate")]})
await send({"type": "http.response.body",
"body": b""})