在通过 Django Channels 连接到 Websockets 的同时持续使用来自 Redis Streams 的消息的有效方法

问题描述 投票:0回答:0

我编写了一个 Django 应用程序来接收长时间运行任务的更新。我将 Redis 用作网络套接字的通道层,并使用 Redis Streams 来存储当前正在运行的任务的更新。我从 WebSocket Consumer 的 connect 方法生成了一个线程来运行一个无限循环,以不断检查来自 Redis Consumer 的更新。对于我想要实现的目标,这似乎是一个骇人听闻的解决方案,即保持与 websocket 的连接并同时从 Redis Streams 接收更新。

这是我的 Websocket Consumer 代码。如您所见,我正在运行一个无限循环,以通过 Redis Consumer 检查流中的任何新消息。

import json
from threading import Thread

from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
from django.utils import timezone
from redis import ConnectionPool, Redis
from redis_streams.consumer import Consumer


class ProgressStateConsumer(WebsocketConsumer):
    def __init__(self):
        self.task_id = self.task_group_name = None
        connection_pool = ConnectionPool(
            host="localhost",
            port=6379,
            db=0,
            decode_responses=True,
        )
        self.redis_connection = Redis(connection_pool=connection_pool)
        self.messages = []
        super().__init__()

    def connect(self):
        task_id = self.scope["url_route"]["kwargs"]["task_id"]
        self.task_id = task_id
        self.stream = f"task:{self.task_id}"
        self.task_group_name = (
            f"task-{self.task_id}-{round(timezone.now().timestamp())}"
        )
        async_to_sync(self.channel_layer.group_add)(
            self.task_group_name, self.channel_name
        )
        thread = Thread(target=self.send_status)
        thread.start()
        self.accept()

    def disconnect(self, close_code):
        async_to_sync(self.channel_layer.group_discard)(
            self.task_group_name, self.channel_name
        )

    def receive(self, text_data):
        pass

    def send_status(self):
        consumer = Consumer(
            redis_conn=self.redis_connection,
            stream=self.stream,
            consumer_group=self.task_group_name,
            batch_size=100,
            max_wait_time_ms=500,
        )
        while True:
            items = consumer.get_items()
            for message in items:
                self.messages.append(message.content["message"])
                consumer.remove_item_from_stream(item_id=message.msgid)
            if len(items):
                async_to_sync(self.channel_layer.group_send)(
                    self.task_group_name,
                    {
                        "type": "task_message",
                        "message": self.messages,
                    },
                )

    def task_message(self, event):
        self.send(text_data=json.dumps({"message": event["message"]}))

这似乎解决了我的问题,但我真的需要知道是否有更好的解决方案来实现这一点,以及如果从消费者的连接方法中的线程运行无限循环是我唯一能做到的方法?

django websocket redis django-channels redis-streams
© www.soinside.com 2019 - 2024. All rights reserved.