使用 Django REST 框架登录和身份验证

问题描述 投票:0回答:1

我需要对 Django REST 框架有更多经验的人提供帮助,告诉我我创建的登录和身份验证系统是否遵循正确的框架标准

我怀疑是否应该在我的代码的某些部分使用Django的login()函数,在客户端视图中我想避免对服务器的任何直接访问,所以我在微服务视图,在我的“UserLoginAPI”微服务视图中,我应该使用登录(请求,用户)函数来登录用户吗?我在一些讨论中看到我应该用 update_last_login(None, user) 替换 login() 这就是我所做的,但我不确定它是否正确

我还想知道我的身份验证中间件是否正确,既可以忽略公共视图,也可以检查令牌并更新会话中的用户

我没有很多经验,如果您能告诉我我的代码是否遵循正确的标准,或者是否有一些我可以改进的地方,我可以避免的某个问题,我将不胜感激。

我的登录流程是:

  1. HTML表单通过POST请求,将数据“用户名”和“密码”发送到“登录”客户端视图
  2. “Login”客户端视图接收此数据并将其发送到“UserLoginAPI”微服务 API
  3. “UserLoginAPI”微服务视图接收此数据并将其发送到“UserLoginSerializer”序列化器进行验证
  4. “UserLoginSerializer”序列化器执行必要的验证,例如:检查用户是否被阻止,使用 Django 的authenticate() 函数进行身份验证,检查用户是否不是“None”,如果用户处于活动状态,则使用 oauth2 创建令牌。 0 该用户并将经过身份验证的用户和令牌返回到“UserLoginAPI”视图
  5. “UserLoginAPI”视图接收用户和令牌,更新用户上次登录并将令牌发送到“Login”客户端视图
  6. “登录”客户端视图接收令牌并将令牌保存在会话中并将用户重定向到主页

我的注销流程是:

  1. HTML 表单通过 POST 请求,将数据“用户名”发送到“注销”客户端视图
  2. “Logout”客户端视图接收此数据并将其发送到“UserLogoutAPI”微服务 API
  3. “UserLogoutAPI”微服务视图接收此数据并将其发送到“UserLogoutSerializer”序列化器进行验证
  4. “UserLogoutSerializer”验证用户,获取活动令牌并使该令牌过期,并将用户返回到“UserLogoutAPI”视图
  5. “UserLogoutAPI”视图接收用户并使用 django 的 logout() 函数注销,并向 django 轴发送信号,以便它知道用户已注销
  6. “注销”客户端视图将用户重定向到登录页面

在我的中间件中,我检查视图是否是公共的,如果是公共的,则中间件不会执行,因为它们是用户不需要登录即可访问的视图,如果是非公共的视图执行中间件并检查会话中是否存在令牌,它从数据库中获取与会话令牌相对应的令牌,如果不存在,则返回错误。如果令牌存在,它会检查它是否已过期,如果已过期,则返回错误,如果不存在,则将请求的用户更新为与令牌关联的用户。

“登录”和“注销”是我的客户端视图,它们从 HTML 表单接收 POST 请求并将此数据发送到微服务 API

def login(request):

    if request.method == 'POST':
        username = request.POST.get('username')
        password = request.POST.get('password')

        api_url = f'{BASE_URL}/UserLoginAPI/'  
        data = {
            'username': username,
            'password': password,
        }

        try:

            response = api_request.make_post_request(api_url, data=data)

            if response['status'] == status.HTTP_200_OK:
                access_token = response.get('access_token')
                request.session['access_token'] = access_token
                return redirect('home')
            elif response['status'] in [status.HTTP_400_BAD_REQUEST, status.HTTP_500_INTERNAL_SERVER_ERROR]:
                messages.error(request, response['message'], extra_tags='error')   

        except requests.exceptions.RequestException:
            messages.error(request, 'Erro ao fazer login', extra_tags='error')

    return render(request, r'login\login.html')

def logout(request):

    if request.method == 'POST':
        username = request.user

        api_url = f'{BASE_URL}/UserLogoutAPI/'  
        data = {
            'username': username,
        }

        try:

            response = api_request.make_post_request(api_url, data=data)

            if response['status'] == status.HTTP_200_OK:
                return redirect('login')
            elif response['status'] in [status.HTTP_400_BAD_REQUEST, status.HTTP_500_INTERNAL_SERVER_ERROR]:
                messages.error(request, response['message'], extra_tags='error')

        except requests.exceptions.RequestException:
            messages.error(request, 'Erro ao deslogar', extra_tags='error')

“UserLoginAPI”和“UserLogoutAPI”是我在身份验证微服务中的视图

@authentication_classes([OAuth2Authentication])
@permission_classes([TokenHasReadWriteScope])
class UserLoginAPI(APIView):
    def post(self, request):
        
        serializer = UserLoginSerializer(data=request.data, context={'request': request})

        try:

            serializer.is_valid(raise_exception=True)
            user = serializer.validated_data['user']
            access_token = serializer.validated_data['access_token']

            update_last_login(None, user)
            return Response({'success': 'Login bem sucedido', 'access_token': access_token}, status=status.HTTP_200_OK)

        except ValidationError as e:
            error_message = str(e.detail['non_field_errors'][0])
            log_error(request.path, error_message, None)
            return Response({'error': error_message}, status=status.HTTP_400_BAD_REQUEST)
        except Exception as e:
            log_error(request.path, str(e), None)
            return Response({'error': 'Erro ao fazer login'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
        
@authentication_classes([OAuth2Authentication])
@permission_classes([TokenHasReadWriteScope])     
class UserLogoutAPI(APIView):
    def post(self, request):

        serializer = UserLogoutSerializer(data=request.data)

        try:

            serializer.is_valid(raise_exception=True)
            user = serializer.validated_data['user']

            logout(request)
            signals.user_logged_out.send(sender=User, request=request, user=user)
            return Response({'success': 'Usuário deslogado com sucesso'}, status=status.HTTP_200_OK)
        
        except ValidationError as e:
            error_message = str(e.detail['non_field_errors'][0])
            log_error(request.path, error_message, user)
            return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)
        except Exception as e:
            log_error(request.path, str(e), user)
            return Response({'error': 'Erro ao deslogar'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

“UserLoginSerializer”和“UserLogoutSerializer”são os meus 序列化器

class UserLoginSerializer(serializers.Serializer):
    username = serializers.CharField()
    password = serializers.CharField()

    def validate(self, data):
        username = data.get('username')
        password = data.get('password')
        request = self.context.get('request')
      
        if AxesProxyHandler().is_locked(request):
            raise ValidationError('O usuário está bloqueado por múltiplas tentativas falhas de login, aguarde 30 minutos antes de tentar entrar novamente')

        user = authenticate(request=request, username=username, password=password)

        if user is None or not user.is_active:
            raise ValidationError('Usuário ou senha inválidos')

        existing_token = AccessToken.objects.filter(
                user=user,
                expires__gt=timezone.now()
        ).first()

        if existing_token:
            access_token = existing_token
        else:
            access_token = AccessToken.objects.create(
                user=user,
                token=generate_token(),
                application=Application.objects.get(name='application_login'),
                expires=timezone.now() + timedelta(minutes=60),
            )
        
        return {'user': user, 'access_token': access_token.token}  

class UserLogoutSerializer(serializers.Serializer):
    username = serializers.CharField()

    def validate(self, data):
        username = data.get('username')

        user = User.objects.get(username=username)

        if not user or not user.is_active:
            raise ValidationError('Usuário inválido')
        
        existing_token = AccessToken.objects.filter(
                user = user,
                expires__gt = timezone.now()
            ).first()

        existing_token.expires = timezone.now()  
        existing_token.save()
        
        return {'user': user}

“TokenAuthenticationMiddleware”是我的身份验证中间件

class TokenAuthenticationMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response
        self.public_views_paths = set(settings.PUBLIC_VIEWS)

    def __call__(self, request):
        request_path = request.path

        if request_path not in self.public_views_paths:
            session_token = request.session.get('access_token')

            if session_token:
                try:
                    access_token = AccessToken.objects.get(token=session_token)
        
                    if access_token.expires < timezone.now():
                        request.user = AnonymousUser()
                        messages.error(request, "Sua sessão expirou, faça o login novamente")
                        return redirect('login')
                    else:
                        request.user = access_token.user
                        log_path(request_path, request.user)
                except AccessToken.DoesNotExist:
                    request.user = AnonymousUser()
                    messages.error(request, "Acesso negado, faça login para continuar")
                    return redirect('login')
  
        response = self.get_response(request)
        return response

带有公共网址的我的settings.py

PUBLIC_VIEWS = ['/', '/esqueci-a-senha/', '/redefinir-senha/']

帮助审查我的代码并评估其是否符合正确的开发标准

python django django-rest-framework
1个回答
0
投票

你可以试试这个,

尝试

view.py

from rest_framework.authtoken.models import Token

    class LoginAPIView(APIView):
        serializer_class = LoginSerializer
        authentication_classes = [TokenAuthentication]
    
        def post(self, request):
            serializer = LoginSerializer(data = request.data)
            if serializer.is_valid(raise_exception=True):
                user = authenticate(username=serializer.data['username'], password=serializer.data['password'])
                if user:
                    token, created = Token.objects.get_or_create(user=user)
                    return Response({'token': [token.key], "Sucsses":"Login SucssesFully"}, status=status.HTTP_201_CREATED )
                return Response({'Massage': 'Invalid Username and Password'}, status=401)

尝试

serializers.py

class LoginSerializer(serializers.ModelSerializer):
    class Meta:
        model = CustomUser
        fields = ("username", "password")

尝试

urls.py

from rest_framework.authtoken.views import obtain_auth_token

urlpatterns = [
    path('auth/', obtain_auth_token, name='auth'),
]

尝试

settings.py

REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework.authentication.TokenAuthentication',
    ],
}

使用这个方法你的工作变得轻松

© www.soinside.com 2019 - 2024. All rights reserved.