现在,我有一个位于服务器上的API,该服务器还发出访问令牌。 该API是从Django Rest Framework创建的,并受到django-oauth-toolkit中的OAuth2TokenAuthentication的保护,虽然可以正常工作,但是对本地存储的令牌进行了身份验证。
class OAuth2Authentication(BaseAuthentication):
"""
OAuth 2 authentication backend using `django-oauth-toolkit`
"""
www_authenticate_realm = 'api'
def authenticate(self, request):
"""
Returns two-tuple of (user, token) if authentication succeeds, or None otherwise.
"""
oauthlib_core = get_oauthlib_core()
valid, r = oauthlib_core.verify_request(request, scopes=[])
if valid:
return r.user, r.access_token
else:
return None
def authenticate_header(self, request):
"""
Bearer is the only finalized type currently
"""
return 'Bearer realm="{}"'.format(self.www_authenticate_realm)
我想将服务器分为两台服务器,即身份验证服务器和资源服务器,以便承载API的服务不需要具有令牌存储机制。 从上面的代码中可以看到,r.access_token是AccessToken的模型实例。
我不确定更改API身份验证以远程检查AS服务器的最佳方法是什么(也许已经有一个书面软件包?)
我对令牌验证,如在互联网上搜索这一个 ,而它提供了一些想法,但似乎并没有对我的问题不够具体。
我设法解决了一个概念证明,就是在Authentication Server中创建一个API端点来验证令牌,并创建一个新的身份验证类,它所做的是将令牌验证转发给Authentication Server来验证令牌,响应身份验证服务器的令牌同时包含令牌和用户信息,因此资源服务器有机会在其本地数据库中创建用户以创建用户会话。
class TokenIntrospectSerializer(ModelSerializer):
user = UserSerializer()
class Meta:
model = AccessToken
class IntrospectView(APIView):
"""
An API view that introspect a given token
"""
serializer_class = TokenIntrospectSerializer
authentication_classes = []
permission_classes = []
def get(self, request, *args, **kwargs):
oauthlib_core = get_oauthlib_core()
valid, r = oauthlib_core.verify_request(request, scopes=[])
if not valid:
raise APIException('Invalid token')
return Response(TokenIntrospectSerializer(r.access_token).data)
RS侧的新身份验证类。
class OAuth2RSAuthentication(BaseAuthentication):
"""
Forwards token to Authentication Server to validate the token
"""
class TokenObject(object):
"""
Inner class code borrowed from AccessToken model
provides necessary methods for Permission class to use
"""
scope = ""
token = ""
def __init__(self, token, scope):
self.token = token
self.scope = scope
def is_valid(self, scopes=None):
return self.allow_scopes(scopes)
def allow_scopes(self, scopes):
if not scopes:
return True
provided_scopes = set(self.scope.split())
resource_scopes = set(scopes)
return resource_scopes.issubset(provided_scopes)
def authenticate(self, request):
# TODO Caching
oauthlib_core = get_oauthlib_core()
_, r = oauthlib_core.verify_request(request, scopes=[])
result = requests.get("{}/api/token".format(settings.GOCONNECT_BASE_URL), params={'access_token': r.access_token}).json()
if 'user' in result:
user_data = result['user']
user, created = User.objects.update_or_create(defaults=user_data, uuid=user_data['uuid'])
if user:
return user, self.TokenObject(result['token'], result['scope'])
return None