Django 4.1.4
左塞尔 2.1.0
频道 4.0.0
我已遵循记录的建议,创建自定义中间件以在使用通道时对用户进行身份验证,并且我成功获取了用户 并检查用户是否经过身份验证,尽管我在连接到 websocket 时在查询字符串中发送用户 ID 来执行此操作。用户不会自动在 websocket 范围内可用。
我不确定是否存在任何潜在的安全风险,因为文档提到他们的建议不安全,我确实检查了 user.is_authenticated。所以我相信我已经确保了它。
我确实相信使用 djoser 创建的令牌会更好,尽管我不确定如何使用 websocket 请求发送标头,除非我在查询字符串中包含令牌而不是用户的 ID。
我很想知道最佳实践是什么。
我通过前端的查询字符串将用户 ID 传递到 websocket,如下所示:
websocket.value = new WebSocket(`ws://127.0.0.1:8000/ws/marketwatch/? ${authStore.userId}`)
中间件.py
from channels.db import database_sync_to_async
from django.contrib.auth.models import AnonymousUser
from django.contrib.auth import get_user_model
from django.core.exceptions import ObjectDoesNotExist
@database_sync_to_async
def get_user(user_id):
User = get_user_model()
try:
user = User.objects.get(id=user_id)
except ObjectDoesNotExist:
return AnonymousUser()
else:
if user.is_authenticated:
return user
else:
return AnonymousUser()
class QueryAuthMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
scope['user'] = await get_user(int(scope["query_string"].decode()))
return await self.app(scope, receive, send)
消费者.py
import os
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
from channels.security.websocket import AllowedHostsOriginValidator
from api.middleware import QueryAuthMiddleware
from .routing import ws_urlpatterns
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'api.settings')
application = ProtocolTypeRouter({
'http':get_asgi_application(),
'websocket': AllowedHostsOriginValidator(
QueryAuthMiddleware(
URLRouter(ws_urlpatterns)
)
)
})
在进行了一些广泛的研究之后,我决定不通过查询字符串传递 id 或令牌,因为由于这些数据存储在服务器日志中,这会带来风险。
IMO 风险最小的最佳选择是在建立连接后将令牌作为消息传递到 websocket,然后验证令牌;如果无效则关闭 websocket。
这意味着不需要之前实现的中间件。在这个特定的项目中,不会从客户端收到其他消息,因此我不需要对收到的消息的密钥进行任何检查。对于聊天应用程序和其他将从客户端接收更多消息的应用程序,可以更改此设置。
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
import json
from rest_framework.authtoken.models import Token
class MarketWatchConsumer(AsyncWebsocketConsumer):
@database_sync_to_async
def verify_token(self, token_dict):
try:
token = Token.objects.get(key=token_dict['token'])
except Token.DoesNotExist:
return False
else:
if token.user.is_active:
return True
else:
return False
async def connect(self):
await self.channel_layer.group_add('group', self.channel_name)
await self.accept()
async def receive(self, text_data=None, bytes_data=None):
valid_token = await self.verify_token(json.loads(text_data))
if not valid_token:
await self.close()
async def disconnect(self, code):
await self.channel_layer.group_discard('group', self.channel_name)
所以你的代码在这里应该更好一点
@database_sync_to_async
def verify_token(self, token_dict):
try:
token = Token.objects.get(key=token_dict['token'])
except Token.DoesNotExist:
return False
else:
if token.user.is_active:
return True
else:
return False
==>
@database_sync_to_async
def verify_token(self, token_dict):
try:
token = Token.objects.get(key=token_dict['token'])
except Token.DoesNotExist:
return False
else:
return token.user.is_active