`channel_layer.group_send`不会调用`AsyncWebsocketConsumer`中的方法

问题描述 投票:0回答:1

我使用 Django Channels 在 Django 应用程序中编写了一个 WebSocket 连接,并且正在本地环境中使用 Daphne 进行测试(生产环境中我将使用 `uvicorn`)。

下面这个函数会在 `UserNotification` 模型的 `save` 方法中被调用,通过用户的 WebSocket 连接发送包含 `title` 和 `message` 的通知。

from typing import Type

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.conf import settings


def send_user_notification(
    user_id: int | Type[int],
    title: str,
    message: str,
    message_type: str | None = None,
):
    """Broadcast a notification to the channel-layer group of one user.

    The group name is derived from ``user_id`` through the two templates in
    settings (``NOTIFICATION_WEBSOCKET_CHANNEL_NAME`` and then
    ``NOTIFICATION_WEBSOCKET_GROUP_NAME``), and a ``user_notify`` event is
    sent to that group.

    Args:
        user_id: Primary key of the user the notification is addressed to.
        title: Notification title placed in the event payload.
        message: Notification body placed in the event payload.
        message_type: Optional notification category. It is added to the
            payload under ``"message_type"`` only when it is not ``None``,
            so callers that omit it produce the same payload as before.
    """
    channel_layer = get_channel_layer()

    channel_name = settings.NOTIFICATION_WEBSOCKET_CHANNEL_NAME.format(user_id=user_id)
    group_name = settings.NOTIFICATION_WEBSOCKET_GROUP_NAME.format(channel_name=channel_name)

    payload = {"title": title, "message": message}
    if message_type is not None:
        payload["message_type"] = message_type

    # This function is synchronous (it is called from a model's ``save``),
    # while ``group_send`` is a coroutine function, hence the wrapper.
    async_to_sync(channel_layer.group_send)(
        group_name,
        {
            "type": "user_notify",
            "message": payload,
        },
    )


class UserNotification(models.Model):
    """Associates a user with a notification and pushes it when saved."""

    user = models.ForeignKey("users.User", on_delete=models.CASCADE)
    notification = models.ForeignKey(
        to="notification.Notification", on_delete=models.CASCADE
    )

    def save(self, **kwargs):
        """Push the related notification to the user, then persist the row.

        NOTE(review): the push happens before ``super().save()``; if saving
        fails, the user has already been notified.
        """
        related = self.notification
        send_user_notification(
            user_id=self.user_id,
            title=related.title,
            message=related.message,
            message_type=related.message_type,
        )

        super().save(**kwargs)

这是我的

AsyncWebsocketConsumer

import json
import logging

from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
from django.conf import settings


class UserNotificationConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer that forwards ``user_notify`` events to the client as JSON."""

    async def connect(self):
        """Join the per-user notification group and accept the connection."""
        if self.scope["user"].is_anonymous:
            # NOTE(review): there is no ``return`` after ``close()``, so the
            # statements below still run for anonymous users.
            await self.close()

        # NOTE(review): this assignment replaces the ``channel_name`` the
        # consumer already had with a settings-derived string. Per the report
        # accompanying this code, ``user_notify`` is never invoked while this
        # override is in place; the default value should be kept instead.
        self.channel_name = settings.NOTIFICATION_WEBSOCKET_CHANNEL_NAME.format(
            user_id=self.scope["user"].pk
        )
        # Group name is built from the channel name via the settings template.
        self.group_name = settings.NOTIFICATION_WEBSOCKET_GROUP_NAME.format(
            channel_name=self.channel_name
        )

        self.channel_layer = get_channel_layer()
        await self.channel_layer.group_add(self.group_name, self.channel_name)

        await self.accept()

    async def disconnect(self, close_code):
        """Remove this connection's channel from the notification group."""
        await self.channel_layer.group_discard(self.group_name, self.channel_name)

    async def user_notify(self, event):
        """Send the ``"message"`` entry of ``event`` to the client as JSON text."""
        print("Event: ", event)
        data = event["message"]
        await self.send(text_data=json.dumps(data))

这是

asgi.py
文件:

import os

from channels.routing import ProtocolTypeRouter, URLRouter
from django.conf import settings
from django.core.asgi import get_asgi_application
from django.urls import path

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "apply_link.settings")

# Initialise Django before importing project modules: the auth middleware
# imports the ``User`` model at module level, which needs configured settings
# and a populated app registry.
django_asgi_app = get_asgi_application()

from apps.notification.auth import JWTAuthMiddlewareStack  # noqa: E402
from apps.notification.consumers import UserNotificationConsumer  # noqa: E402

# Protocol dispatch: plain HTTP goes to Django, WebSocket connections go
# through the JWT middleware to the notification consumer.
application = ProtocolTypeRouter(
    {
        "http": django_asgi_app,
        "websocket": JWTAuthMiddlewareStack(URLRouter(
            [
                path(f"{settings.WEBSOCKET_PREFIX}/v1/notifications/", UserNotificationConsumer.as_asgi()),
            ]
        )),
    }
)

这是身份验证中间件:

from urllib.parse import parse_qs

from channels.middleware import BaseMiddleware
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from rest_framework_simplejwt.exceptions import InvalidToken, TokenError
from rest_framework_simplejwt.tokens import AccessToken

from apps.users.models import User


class WebSocketJWTAuthMiddleware(BaseMiddleware):
    """Populate ``scope["user"]`` from a JWT passed in the URL query string.

    The token is read from the query parameter named by
    ``settings.WEBSOCKET_AUTH_QUERY_PARAM``. Whenever no user can be resolved
    (parameter missing, token invalid, claim missing, user not found),
    ``scope["user"]`` is set to ``AnonymousUser()`` and the inner application
    still runs, so the consumer decides how to reject the connection.
    """

    async def __call__(self, scope, receive, send):
        """Resolve the user for this connection, then call the inner app."""
        scope["user"] = await self._resolve_user(scope)
        return await super().__call__(scope, receive, send)

    @staticmethod
    async def _resolve_user(scope):
        """Return the user identified by the token in ``scope``, else ``AnonymousUser()``."""
        query_string = (scope.get("query_string") or b"").decode("utf8")
        raw_tokens = parse_qs(query_string).get(settings.WEBSOCKET_AUTH_QUERY_PARAM)
        if not raw_tokens:
            # Parameter absent: previously this indexed ``None`` and raised.
            return AnonymousUser()

        try:
            # Constructing the token object validates it.
            access_token = AccessToken(raw_tokens[0])
        except (InvalidToken, TokenError):
            return AnonymousUser()

        try:
            return await User.objects.aget(id=access_token["user_id"])
        except (KeyError, User.DoesNotExist):
            return AnonymousUser()


def JWTAuthMiddlewareStack(inner):
    """Wrap ``inner`` in ``WebSocketJWTAuthMiddleware`` and return the wrapper."""
    wrapped = WebSocketJWTAuthMiddleware(inner)
    return wrapped

这是我的

settings.py

# Templates for the per-user notification names. The group name is built
# from an already formatted channel name.
NOTIFICATION_WEBSOCKET_CHANNEL_NAME = "user_{user_id}_notifications"
NOTIFICATION_WEBSOCKET_GROUP_NAME = "group_{channel_name}"

# Query-string parameter that carries the JWT on WebSocket connections.
WEBSOCKET_AUTH_QUERY_PARAM = "token"
# URL prefix for WebSocket routes; surrounding slashes and spaces are stripped.
WEBSOCKET_PREFIX = env.str("WEBSOCKET_PREFIX", "ws").strip("/ ")

# Redis connection parameters, read from the environment.
REDIS_USER = env.str("REDIS_USER", "default")
REDIS_PASSWORD = env.str("REDIS_PASSWORD")
REDIS_HOST = env.str("DJANGO_REDIS_HOST")
REDIS_PORT_NUMBER = env.str("REDIS_PORT_NUMBER", "6379")

# NOTE(review): user and password are interpolated verbatim; a value
# containing URL-reserved characters would presumably need percent-encoding
# first — confirm.
REDIS_CONNECTION_URI = (
    f"redis://{REDIS_USER}:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT_NUMBER}/0"
)

# Channel layer configuration: Redis backend using the URI built above.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [REDIS_CONNECTION_URI],
        },
    },
}
# Tested both memory and Redis as the backend
# CHANNEL_LAYERS = {
#     "default": {
#         "BACKEND": "channels.layers.InMemoryChannelLayer"
#     }
# }

这是我可以成功完成并且没有问题的事情:

  • 我可以在 `0.0.0.0:8000` 上执行 `runserver`,Daphne 会为该应用程序提供服务。
  • Redis 运行良好。我还尝试了使用内存通道层作为后端,它同样可以正常工作。
  • 我可以使用 Postman 连接到 WebSocket,并在查询参数中携带有效的 JWT(`scope["user"]` 是正确的用户)。

问题在于:当我保存 `UserNotification` 模型时,它会调用 `send_user_notification`,并且无论使用内存还是 Redis 通道层,`channel_layer.group_send` 都能成功调用。但是 `UserNotificationConsumer.user_notify` 方法不会被调用,也不会通过已打开且正常工作的 WebSocket 连接向用户发送任何数据。

django websocket django-channels
1个回答
0
投票

我找到了解决方案。

问题在于我在 Consumer 的 `connect` 方法中重写了 `self.channel_name`。我改为保留它的默认值,并在加入组时使用该默认值。

所以 Consumer 类的最终工作版本将是这样的:

import json

from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
from django.conf import settings


class UserNotificationConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer that forwards ``user_notify`` events to the client as JSON.

    ``self.channel_name`` is left at its default value and is only used to
    register this connection in the per-user notification group.
    """

    async def connect(self):
        """Reject anonymous users; otherwise join the user's group and accept."""
        if self.scope["user"].is_anonymous:
            await self.close()
            # Stop here so an anonymous connection never joins a group.
            return

        # Build the group name through both settings templates so it is the
        # same string the sending side produces for this user.
        user_channel = settings.NOTIFICATION_WEBSOCKET_CHANNEL_NAME.format(
            user_id=self.scope["user"].pk
        )
        self.group_name = settings.NOTIFICATION_WEBSOCKET_GROUP_NAME.format(
            channel_name=user_channel
        )

        self.channel_layer = get_channel_layer()
        await self.channel_layer.group_add(self.group_name, self.channel_name)

        await self.accept()

    async def disconnect(self, close_code):
        """Leave the notification group, if this connection ever joined one."""
        group_name = getattr(self, "group_name", None)
        if group_name is None:
            # Connection was rejected in ``connect`` before a group was set.
            return
        await self.channel_layer.group_discard(group_name, self.channel_name)

    async def user_notify(self, event):
        """Send the ``"message"`` entry of ``event`` to the client as JSON text."""
        data = event["message"]
        await self.send(text_data=json.dumps(data))
    
© www.soinside.com 2019 - 2024. All rights reserved.