我需要对 Django REST 框架有更多经验的人提供帮助,告诉我我创建的登录和身份验证系统是否遵循正确的框架标准
我不确定是否应该在代码的某些部分使用 Django 的 login() 函数。我想避免在客户端视图中对服务器的任何直接访问,所以把这部分逻辑放在了微服务视图中。在我的“UserLoginAPI”微服务视图中,我应该使用 login(request, user) 函数来登录用户吗?我在一些讨论中看到应该用 update_last_login(None, user) 替换 login(),我就是这样做的,但我不确定这是否正确。
我还想知道我的身份验证中间件是否正确,既可以忽略公共视图,也可以检查令牌并更新会话中的用户
我没有很多经验,如果您能告诉我我的代码是否遵循正确的标准,或者是否有一些我可以改进的地方,我可以避免的某个问题,我将不胜感激。
我的登录流程是:
我的注销流程是:
在我的中间件中,我先检查视图是否是公共的。如果是公共视图,中间件不做任何检查,因为这些视图无需登录即可访问。如果是非公共视图,中间件会检查会话中是否存在令牌,并从数据库中获取与会话令牌相对应的令牌;如果数据库中不存在该令牌,则返回错误。如果令牌存在,则检查它是否已过期:如果已过期,则返回错误;如果未过期,则将请求的用户更新为与令牌关联的用户。
“登录”和“注销”是我的客户端视图,它们从 HTML 表单接收 POST 请求并将此数据发送到微服务 API
def login(request):
    """Render the login form and, on POST, authenticate via the auth microservice.

    On POST the submitted ``username`` and ``password`` are forwarded to the
    ``UserLoginAPI`` endpoint. When the service answers with HTTP 200 the
    returned access token is stored in the session under ``'access_token'``
    and the user is redirected to ``'home'``. For any other outcome an error
    message is queued and the login page is rendered again.
    """
    if request.method == 'POST':
        api_url = f'{BASE_URL}/UserLoginAPI/'
        data = {
            'username': request.POST.get('username'),
            'password': request.POST.get('password'),
        }
        try:
            response = api_request.make_post_request(api_url, data=data)
        except requests.exceptions.RequestException:
            messages.error(request, 'Erro ao fazer login', extra_tags='error')
        else:
            if response['status'] == status.HTTP_200_OK:
                request.session['access_token'] = response.get('access_token')
                return redirect('home')
            # Any non-200 status (not only 400/500) must give the user
            # feedback; fall back to a generic text when the service sent
            # no message of its own.
            messages.error(
                request,
                response.get('message') or 'Erro ao fazer login',
                extra_tags='error',
            )
    # Template names always use forward slashes, regardless of the OS.
    return render(request, 'login/login.html')
def logout(request):
    """Log the current user out through the auth microservice.

    On POST the current user's username is sent to the ``UserLogoutAPI``
    endpoint. When the service answers with HTTP 200 the access token is
    removed from the session and the user is redirected to ``'login'``.
    On any failure an error message is queued. Every path that does not
    end in a successful logout redirects to ``'home'`` so the view always
    returns a response.
    """
    if request.method == 'POST':
        api_url = f'{BASE_URL}/UserLogoutAPI/'
        data = {
            # Send the username string, not the user object itself.
            'username': request.user.get_username(),
        }
        try:
            response = api_request.make_post_request(api_url, data=data)
        except requests.exceptions.RequestException:
            messages.error(request, 'Erro ao deslogar', extra_tags='error')
        else:
            if response['status'] == status.HTTP_200_OK:
                # Drop the token so the session no longer carries a
                # credential the service has just been asked to invalidate.
                request.session.pop('access_token', None)
                return redirect('login')
            messages.error(
                request,
                response.get('message') or 'Erro ao deslogar',
                extra_tags='error',
            )
    # A view must never return None: send non-POST requests and failed
    # logouts back to the home page, where queued messages can be shown.
    return redirect('home')
“UserLoginAPI”和“UserLogoutAPI”是我在身份验证微服务中的视图
class UserLoginAPI(APIView):
    """Microservice endpoint that validates credentials and returns an access token.

    POST body: ``username`` and ``password``.
    Responses:
        200 -- ``{'success': ..., 'access_token': ...}``
        400 -- ``{'error': <first validation message>}``
        500 -- ``{'error': 'Erro ao fazer login'}`` for any unexpected failure
    """

    # Class-based views declare these as attributes; the decorator forms are
    # meant for function-based ``@api_view`` views.
    authentication_classes = [OAuth2Authentication]
    permission_classes = [TokenHasReadWriteScope]

    @staticmethod
    def _first_error(detail):
        """Return the first human-readable message from a validation ``detail``.

        Prefers ``non_field_errors`` but falls back to the first field error,
        so field-level failures (e.g. a missing username) are reported too
        instead of raising ``KeyError``.
        """
        if isinstance(detail, dict):
            detail = detail.get('non_field_errors') or next(iter(detail.values()), '')
        if isinstance(detail, (list, tuple)):
            detail = detail[0] if detail else ''
        return str(detail)

    def post(self, request):
        """Validate the credentials and answer with the user's access token."""
        serializer = UserLoginSerializer(data=request.data, context={'request': request})
        try:
            serializer.is_valid(raise_exception=True)
            user = serializer.validated_data['user']
            access_token = serializer.validated_data['access_token']
            # Record the login time without creating a server-side session.
            update_last_login(None, user)
            return Response(
                {'success': 'Login bem sucedido', 'access_token': access_token},
                status=status.HTTP_200_OK,
            )
        except ValidationError as e:
            error_message = self._first_error(e.detail)
            log_error(request.path, error_message, None)
            return Response({'error': error_message}, status=status.HTTP_400_BAD_REQUEST)
        except Exception as e:
            # Top-level boundary: log the cause, hide the details from the client.
            log_error(request.path, str(e), None)
            return Response(
                {'error': 'Erro ao fazer login'},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
class UserLogoutAPI(APIView):
    """Microservice endpoint that logs a user out.

    POST body: ``username``.
    Responses:
        200 -- ``{'success': 'Usuário deslogado com sucesso'}``
        400 -- ``{'error': <first validation message>}``
        500 -- ``{'error': 'Erro ao deslogar'}`` for any unexpected failure
    """

    # Class-based views declare these as attributes; the decorator forms are
    # meant for function-based ``@api_view`` views.
    authentication_classes = [OAuth2Authentication]
    permission_classes = [TokenHasReadWriteScope]

    @staticmethod
    def _first_error(detail):
        """Return the first human-readable message from a validation ``detail``.

        Prefers ``non_field_errors`` but falls back to the first field error,
        so field-level failures are reported instead of raising ``KeyError``.
        """
        if isinstance(detail, dict):
            detail = detail.get('non_field_errors') or next(iter(detail.values()), '')
        if isinstance(detail, (list, tuple)):
            detail = detail[0] if detail else ''
        return str(detail)

    def post(self, request):
        """Validate the payload, log the user out and emit ``user_logged_out``."""
        # Bound up front so the error handlers can log it even when
        # validation fails before a user has been resolved.
        user = None
        serializer = UserLogoutSerializer(data=request.data)
        try:
            serializer.is_valid(raise_exception=True)
            user = serializer.validated_data['user']
            # NOTE(review): if this is django.contrib.auth.logout it may emit
            # user_logged_out itself (for request.user); confirm the explicit
            # send below does not double-fire receivers.
            logout(request)
            signals.user_logged_out.send(sender=User, request=request, user=user)
            return Response(
                {'success': 'Usuário deslogado com sucesso'},
                status=status.HTTP_200_OK,
            )
        except ValidationError as e:
            error_message = self._first_error(e.detail)
            log_error(request.path, error_message, user)
            return Response({'error': error_message}, status=status.HTTP_400_BAD_REQUEST)
        except Exception as e:
            # Top-level boundary: log the cause, hide the details from the client.
            log_error(request.path, str(e), user)
            return Response(
                {'error': 'Erro ao deslogar'},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
“UserLoginSerializer”和“UserLogoutSerializer”是我的序列化器
class UserLoginSerializer(serializers.Serializer):
    """Validate login credentials and resolve an access token for the user.

    ``validate`` returns ``{'user': <user>, 'access_token': <token string>}``.
    """

    username = serializers.CharField()
    password = serializers.CharField()

    def validate(self, data):
        """Check lockout and credentials, then reuse or create an access token.

        Raises:
            ValidationError: when the request is locked out, or when the
                credentials do not match an active user.
        """
        request = self.context.get('request')

        # Refuse locked-out requests before the credentials are checked.
        if AxesProxyHandler().is_locked(request):
            raise ValidationError('O usuário está bloqueado por múltiplas tentativas falhas de login, aguarde 30 minutos antes de tentar entrar novamente')

        user = authenticate(
            request=request,
            username=data.get('username'),
            password=data.get('password'),
        )
        if user is None or not user.is_active:
            raise ValidationError('Usuário ou senha inválidos')

        # Reuse an unexpired token when one exists; otherwise issue a new
        # one that is valid for 60 minutes.
        token = AccessToken.objects.filter(
            user=user,
            expires__gt=timezone.now(),
        ).first()
        if not token:
            token = AccessToken.objects.create(
                user=user,
                token=generate_token(),
                application=Application.objects.get(name='application_login'),
                expires=timezone.now() + timedelta(minutes=60),
            )

        return {'user': user, 'access_token': token.token}
class UserLogoutSerializer(serializers.Serializer):
    """Validate a logout request and expire the user's active access token.

    ``validate`` returns ``{'user': <user>}``.
    """

    username = serializers.CharField()

    def validate(self, data):
        """Resolve the user by username and expire their current token.

        Raises:
            ValidationError: when no active user has the given username.
        """
        username = data.get('username')
        # ``filter().first()`` yields None for an unknown username, so the
        # check below reports it as a validation error rather than letting
        # ``User.DoesNotExist`` escape.
        user = User.objects.filter(username=username).first()
        if user is None or not user.is_active:
            raise ValidationError('Usuário inválido')

        existing_token = AccessToken.objects.filter(
            user=user,
            expires__gt=timezone.now(),
        ).first()
        # The user may have no unexpired token left; nothing to expire then.
        if existing_token is not None:
            existing_token.expires = timezone.now()
            existing_token.save()

        return {'user': user}
“TokenAuthenticationMiddleware”是我的身份验证中间件
class TokenAuthenticationMiddleware:
    """Attach the token's user to requests for non-public paths.

    Paths listed in ``settings.PUBLIC_VIEWS`` are passed through untouched.
    For other paths, a session ``'access_token'`` is looked up in the
    database: an unknown or expired token redirects to ``'login'`` with an
    error message; a valid one sets ``request.user`` to the token's user.
    """

    def __init__(self, get_response):
        self.get_response = get_response
        # A set gives constant-time membership tests per request.
        self.public_views_paths = set(settings.PUBLIC_VIEWS)

    def __call__(self, request):
        path = request.path

        if path in self.public_views_paths:
            return self.get_response(request)

        token_value = request.session.get('access_token')
        if not token_value:
            # NOTE(review): a non-public path with no token in the session
            # is passed through unchanged -- confirm the views enforce
            # authentication themselves.
            return self.get_response(request)

        try:
            stored_token = AccessToken.objects.get(token=token_value)
            if stored_token.expires >= timezone.now():
                request.user = stored_token.user
                log_path(path, request.user)
            else:
                request.user = AnonymousUser()
                messages.error(request, "Sua sessão expirou, faça o login novamente")
                return redirect('login')
        except AccessToken.DoesNotExist:
            request.user = AnonymousUser()
            messages.error(request, "Acesso negado, faça login para continuar")
            return redirect('login')

        return self.get_response(request)
带有公共网址的我的settings.py
# Request paths that can be reached without a session access token.
PUBLIC_VIEWS = [
    '/',
    '/esqueci-a-senha/',
    '/redefinir-senha/',
]
帮助审查我的代码并评估其是否符合正确的开发标准
你可以试试这个,
尝试
view.py
from rest_framework.authtoken.models import Token
class LoginAPIView(APIView):
    """Exchange a username and password for a DRF auth token.

    Responds with 201 and the token key (wrapped in a list) on success,
    or with 401 when the credentials do not authenticate.
    """

    serializer_class = LoginSerializer
    authentication_classes = [TokenAuthentication]

    def post(self, request):
        """Validate the payload, authenticate, and return the user's token."""
        serializer = LoginSerializer(data=request.data)
        # Raises on invalid input, so execution only continues when valid.
        serializer.is_valid(raise_exception=True)

        credentials = serializer.data
        user = authenticate(
            username=credentials['username'],
            password=credentials['password'],
        )
        if not user:
            return Response({'Massage': 'Invalid Username and Password'}, status=401)

        # Reuse the user's token when one already exists.
        token, _created = Token.objects.get_or_create(user=user)
        payload = {'token': [token.key], "Sucsses": "Login SucssesFully"}
        return Response(payload, status=status.HTTP_201_CREATED)
尝试
serializers.py
class LoginSerializer(serializers.ModelSerializer):
    """Carry the ``username`` and ``password`` fields of a login request."""

    # Declared explicitly so the serializer does not auto-generate the model
    # field's validators. Assuming CustomUser.username is unique (TODO
    # confirm), the generated uniqueness check would reject every existing
    # user's login payload as "already exists".
    username = serializers.CharField()

    class Meta:
        model = CustomUser
        fields = ("username", "password")
尝试
urls.py
from rest_framework.authtoken.views import obtain_auth_token
# Route auth/ to the imported obtain_auth_token view.
urlpatterns = [path('auth/', obtain_auth_token, name='auth')]
尝试
settings.py
# Use token authentication as the default for all DRF views.
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': ['rest_framework.authentication.TokenAuthentication'],
}
使用这种方法,你的工作会变得更轻松