我使用 Django Channels 在 Django 应用中实现了一个 WebSocket 连接,目前在本地环境中使用 Daphne 进行测试(生产环境将使用 uvicorn)。
下面这个函数会在 UserNotification 模型的 save 方法中被调用,通过用户的 WebSocket 连接发送包含 title 和 message 的通知。
from typing import Type
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.conf import settings
def send_user_notification(
    user_id: int | Type[int], title: str, message: str
):
    """Broadcast a notification to the channel-layer group of one user.

    The group name is built from ``user_id`` by formatting the settings
    templates ``NOTIFICATION_WEBSOCKET_CHANNEL_NAME`` and then
    ``NOTIFICATION_WEBSOCKET_GROUP_NAME``. The event is sent with type
    ``"user_notify"`` and a ``"message"`` payload holding the title and
    the message text.

    Args:
        user_id: Primary key of the user to notify.
        title: Notification title.
        message: Notification body.
    """
    layer = get_channel_layer()
    per_user_name = settings.NOTIFICATION_WEBSOCKET_CHANNEL_NAME.format(
        user_id=user_id
    )
    target_group = settings.NOTIFICATION_WEBSOCKET_GROUP_NAME.format(
        channel_name=per_user_name
    )
    event = {
        "type": "user_notify",
        "message": {"title": title, "message": message},
    }
    # group_send is a coroutine function; wrap it so that synchronous
    # callers (such as a model's save()) can run it.
    blocking_group_send = async_to_sync(layer.group_send)
    blocking_group_send(target_group, event)
class UserNotification(models.Model):
    """Associates a notification with a user.

    Saving an instance also pushes the notification's title and message
    to the user through ``send_user_notification``.
    """

    user = models.ForeignKey("users.User", on_delete=models.CASCADE)
    notification = models.ForeignKey(
        to="notification.Notification", on_delete=models.CASCADE
    )

    def save(self, *args, **kwargs):
        """Persist the row, then send its notification to the user.

        Positional and keyword arguments are forwarded unchanged to
        ``Model.save``.
        """
        # Persist first: if the write fails, no notification is sent for a
        # row that does not exist.
        super().save(*args, **kwargs)
        # Pass only the keyword arguments send_user_notification declares
        # (user_id, title, message); any other keyword raises TypeError.
        send_user_notification(
            user_id=self.user_id,
            title=self.notification.title,
            message=self.notification.message,
        )
这是我的 AsyncWebsocketConsumer:
import json
import logging
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
from django.conf import settings
class UserNotificationConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer that forwards ``user_notify`` events to the client.

    NOTE(review): ``connect`` overwrites ``self.channel_name`` with a
    per-user string. The author of this write-up identifies that override
    as the reason ``user_notify`` is never invoked; this version is kept
    as the reproduction of that problem.
    """

    async def connect(self):
        """Join the per-user group and accept the connection."""
        user = self.scope["user"]
        if user.is_anonymous:
            # NOTE(review): there is no return here, so the setup below
            # still runs after the close.
            await self.close()
        name_template = settings.NOTIFICATION_WEBSOCKET_CHANNEL_NAME
        self.channel_name = name_template.format(user_id=user.pk)
        group_template = settings.NOTIFICATION_WEBSOCKET_GROUP_NAME
        self.group_name = group_template.format(
            channel_name=self.channel_name
        )
        self.channel_layer = get_channel_layer()
        layer = self.channel_layer
        await layer.group_add(self.group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        """Leave the per-user group."""
        layer = self.channel_layer
        await layer.group_discard(self.group_name, self.channel_name)

    async def user_notify(self, event):
        """Send the event's ``"message"`` payload to the client as JSON."""
        print("Event: ", event)
        payload = json.dumps(event["message"])
        await self.send(text_data=payload)
这是 asgi.py 文件:
import os

from channels.routing import ProtocolTypeRouter, URLRouter
from django.conf import settings
from django.core.asgi import get_asgi_application
from django.urls import path

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "apply_link.settings")

# Initialise Django before importing project modules: the consumer and the
# auth middleware import ORM models, which need the app registry to be ready.
django_asgi_app = get_asgi_application()

from apps.notification.auth import JWTAuthMiddlewareStack  # noqa: E402
from apps.notification.consumers import UserNotificationConsumer  # noqa: E402

# WebSocket routes; the prefix comes from settings.WEBSOCKET_PREFIX.
websocket_urlpatterns = [
    path(
        f"{settings.WEBSOCKET_PREFIX}/v1/notifications/",
        UserNotificationConsumer.as_asgi(),
    ),
]

# Top-level ASGI entry point: plain HTTP goes to Django, WebSocket traffic
# goes through the JWT middleware to the notification consumer.
application = ProtocolTypeRouter(
    {
        "http": django_asgi_app,
        "websocket": JWTAuthMiddlewareStack(URLRouter(websocket_urlpatterns)),
    }
)
这是身份验证中间件:
from urllib.parse import parse_qs
from channels.middleware import BaseMiddleware
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from rest_framework_simplejwt.exceptions import InvalidToken, TokenError
from rest_framework_simplejwt.tokens import AccessToken
from apps.users.models import User
class WebSocketJWTAuthMiddleware(BaseMiddleware):
    """Populate ``scope["user"]`` from a JWT passed in the query string.

    The token is read from the query parameter named by
    ``settings.WEBSOCKET_AUTH_QUERY_PARAM``. When the token is missing or
    invalid, or the user it names does not exist, ``scope["user"]`` is set
    to ``AnonymousUser`` and the connection is still handed to the inner
    application, which decides whether to reject it.
    """

    async def __call__(self, scope, receive, send):
        """Resolve the user, store it in a copy of the scope, and delegate."""
        # Work on a copy so the scope object owned by the caller is untouched.
        scope = dict(scope)
        scope["user"] = await self._resolve_user(scope)
        return await super().__call__(scope, receive, send)

    @staticmethod
    async def _resolve_user(scope):
        """Return the user identified by the query-string token, else anonymous."""
        raw_query = scope.get("query_string", b"")
        # Malformed bytes must not crash the handshake; they simply yield a
        # token that fails validation.
        query = parse_qs(raw_query.decode("utf8", errors="replace"))
        candidates = query.get(settings.WEBSOCKET_AUTH_QUERY_PARAM)
        if not candidates:
            return AnonymousUser()
        try:
            # Constructing AccessToken validates the token.
            access_token = AccessToken(candidates[0])
        except (InvalidToken, TokenError):
            return AnonymousUser()
        try:
            return await User.objects.aget(id=access_token["user_id"])
        except (User.DoesNotExist, KeyError):
            # Unknown user, or a token without a "user_id" claim.
            return AnonymousUser()
def JWTAuthMiddlewareStack(inner):
    """Wrap the ``inner`` ASGI application in ``WebSocketJWTAuthMiddleware``."""
    wrapped = WebSocketJWTAuthMiddleware(inner)
    return wrapped
这是我的 settings.py:
# Template formatted with ``user_id`` to build the per-user name.
NOTIFICATION_WEBSOCKET_CHANNEL_NAME = "user_{user_id}_notifications"
# Template formatted with ``channel_name`` (the per-user name above) to build
# the group name used with group_add / group_send.
NOTIFICATION_WEBSOCKET_GROUP_NAME = "group_{channel_name}"
# Name of the query-string parameter that carries the JWT.
WEBSOCKET_AUTH_QUERY_PARAM = "token"
# URL prefix for WebSocket routes; leading/trailing slashes and spaces are
# stripped from the configured value.
WEBSOCKET_PREFIX = env.str("WEBSOCKET_PREFIX", "ws").strip("/ ")
# Redis connection parameters read from the environment. REDIS_PASSWORD and
# DJANGO_REDIS_HOST are read without a default value.
REDIS_USER = env.str("REDIS_USER", "default")
REDIS_PASSWORD = env.str("REDIS_PASSWORD")
REDIS_HOST = env.str("DJANGO_REDIS_HOST")
REDIS_PORT_NUMBER = env.str("REDIS_PORT_NUMBER", "6379")
# NOTE(review): the credentials are interpolated verbatim; a password that
# contains URL-reserved characters may need percent-encoding — confirm.
REDIS_CONNECTION_URI = (
    f"redis://{REDIS_USER}:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT_NUMBER}/0"
)
# Channel layer backed by Redis at the URI built above.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [REDIS_CONNECTION_URI],
        },
    },
}
# Tested both memory and Redis as the backend
# CHANNEL_LAYERS = {
# "default": {
# "BACKEND": "channels.layers.InMemoryChannelLayer"
# }
# }
以下这些我都能顺利完成,没有任何问题:
执行 runserver 0.0.0.0:8000,Daphne 会为该应用提供服务。
用 Postman 连接 WebSocket,并在查询参数中带上有效的 JWT(scope["user"] 是正确的用户)。
问题在于:当我保存 UserNotification 模型时,它会调用 send_user_notification,并且无论使用内存通道层还是 Redis 通道层,channel_layer.group_send 都能成功调用。但是 UserNotificationConsumer.user_notify 方法不会被调用,也不会通过已打开且正常工作的 WebSocket 连接向用户发送任何数据。
我找到了解决方案。
问题在于我在 Consumer 的 connect 方法中重写了 self.channel_name。现在我保留它的默认值,并在加入组时直接使用这个默认值。所以 Consumer 类最终可用的版本如下:
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
from django.conf import settings
class UserNotificationConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer that forwards ``user_notify`` events to the client.

    The consumer keeps the ``channel_name`` it is given and only joins a
    per-user group, whose name is built from the same two settings
    templates that ``send_user_notification`` uses.
    """

    async def connect(self):
        """Reject anonymous users; otherwise join the user's group and accept."""
        user = self.scope["user"]
        if user.is_anonymous:
            await self.close()
            # Stop here so a rejected connection is neither added to a
            # group nor accepted.
            return
        # Build the group name exactly as the sender does: the per-user name
        # first, then the group template (which expects ``channel_name``).
        # The per-user name stays a local; self.channel_name is not touched.
        per_user_name = settings.NOTIFICATION_WEBSOCKET_CHANNEL_NAME.format(
            user_id=user.pk
        )
        self.group_name = settings.NOTIFICATION_WEBSOCKET_GROUP_NAME.format(
            channel_name=per_user_name
        )
        self.channel_layer = get_channel_layer()
        await self.channel_layer.group_add(self.group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        """Leave the user's group if this connection ever joined one."""
        group_name = getattr(self, "group_name", None)
        if group_name is None:
            # connect() rejected the connection before a group was joined.
            return
        await self.channel_layer.group_discard(group_name, self.channel_name)

    async def user_notify(self, event):
        """Send the event's ``"message"`` payload to the client as JSON."""
        data = event["message"]
        await self.send(text_data=json.dumps(data))