我编写了一个 Django 应用程序来接收长时间运行任务的更新。我将 Redis 用作网络套接字的通道层,并使用 Redis Streams 来存储当前正在运行的任务的更新。我从 WebSocket Consumer 的 connect 方法生成了一个线程来运行一个无限循环,以不断检查来自 Redis Consumer 的更新。对于我想要实现的目标,这似乎是一个骇人听闻的解决方案,即保持与 websocket 的连接并同时从 Redis Streams 接收更新。
这是我的 Websocket Consumer 代码。如您所见,我正在运行一个无限循环,以通过 Redis Consumer 检查流中的任何新消息。
import json
from threading import Thread
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
from django.utils import timezone
from redis import ConnectionPool, Redis
from redis_streams.consumer import Consumer
class ProgressStateConsumer(WebsocketConsumer):
def __init__(self):
self.task_id = self.task_group_name = None
connection_pool = ConnectionPool(
host="localhost",
port=6379,
db=0,
decode_responses=True,
)
self.redis_connection = Redis(connection_pool=connection_pool)
self.messages = []
super().__init__()
def connect(self):
task_id = self.scope["url_route"]["kwargs"]["task_id"]
self.task_id = task_id
self.stream = f"task:{self.task_id}"
self.task_group_name = (
f"task-{self.task_id}-{round(timezone.now().timestamp())}"
)
async_to_sync(self.channel_layer.group_add)(
self.task_group_name, self.channel_name
)
thread = Thread(target=self.send_status)
thread.start()
self.accept()
def disconnect(self, close_code):
async_to_sync(self.channel_layer.group_discard)(
self.task_group_name, self.channel_name
)
def receive(self, text_data):
pass
def send_status(self):
consumer = Consumer(
redis_conn=self.redis_connection,
stream=self.stream,
consumer_group=self.task_group_name,
batch_size=100,
max_wait_time_ms=500,
)
while True:
items = consumer.get_items()
for message in items:
self.messages.append(message.content["message"])
consumer.remove_item_from_stream(item_id=message.msgid)
if len(items):
async_to_sync(self.channel_layer.group_send)(
self.task_group_name,
{
"type": "task_message",
"message": self.messages,
},
)
def task_message(self, event):
self.send(text_data=json.dumps({"message": event["message"]}))
这似乎解决了我的问题,但我真的需要知道是否有更好的解决方案来实现这一点,以及如果从消费者的连接方法中的线程运行无限循环是我唯一能做到的方法?