试图用Django通道来围绕消息流包头

问题描述 投票:1回答:1

我正试图绕过Django频道。我是异步编程的新手,我试图理解为什么我的代码会像这样。

我目前正在使用Django通道构建应用,目前正在settings.py中使用内存通道层:

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels.layers.InMemoryChannelLayer"
    }
}

我正在尝试通过网络套接字启动长期运行的任务,并希望使用者将定期更新发送给客户端。

示例代码:

import time
from asgiref.sync import async_to_sync
from channels.generic.websocket import JsonWebsocketConsumer

class Consumer(JsonWebsocketConsumer):

    def connect(self):
        print("connected to consumer")
        async_to_sync(self.channel_layer.group_add)(
            f'consumer_group',
            self.channel_name
        )
        self.accept()

    def disconnect(self, close_code):
        async_to_sync(self.channel_layer.group_discard)(
            'consumer_group',
            self.channel_name
        )
        self.close()

    def long_running_thing(self, event):

        for i in range(5):
            time.sleep(0.2)
            async_to_sync(self.channel_layer.group_send)(
                'consumer_group',
                {
                    "type": "log.progress",
                    "data": i
                }
            )
            print("long_running_thing", i)

    def log_progress(self, event):
        print("log_progress", event['data'])

    def receive_json(self, content, **kwargs):
        print(f"Received event: {content}")
        if content['action'] == "start_long_running_thing":
            async_to_sync(self.channel_layer.group_send)(
                'consumer_group',
                {
                    "type": "long.running.thing",
                    "data": content['data']
                }
            )

消费者一旦收到正确的操作,便会启动long_running_thing。但是,对log_progress的调用发生在之后 long_running_thing完成。

输出:

Received event: {'action': 'start_long_running_thing', 'data': {}}
long_running_thing 0
long_running_thing 1
long_running_thing 2
long_running_thing 3
long_running_thing 4
log_progress 0
log_progress 1
log_progress 2
log_progress 3
log_progress 4

有人可以向我解释为什么会这样以及如何记录进度的正确性吗?

编辑:添加了routing.py和JavaScript部分。

from django.urls import re_path

from sockets import consumers

websocket_urlpatterns = [
    re_path(r'$', consumers.Consumer),
]

我目前正在将vue.js与vue-native-websocket一起使用,这是前端上的相关部分。

const actions = {
  startLongRunningThing(context){
    const message = {
      action: "start_long_running_thing",
      data: {}
    }
    Vue.prototype.$socket.send(JSON.stringify(message))
}
django python-asyncio django-channels
1个回答
0
投票

我也从异步编程开始,但是我建议您改用AsyncJsonWebsocketConsumer,并在通过channel_layer发送事件之后,使用send_json函数:

import asyncio
import json
from channels.generic.websocket import AsyncJsonWebsocketConsumer


class Consumer(AsyncJsonWebsocketConsumer):

    async def connect(self):
        print("connected to consumer")
        await self.channel_layer.group_add(
            f'consumer_group',
            self.channel_name
        )
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(
            'consumer_group',
            self.channel_name
        )
        await self.close()

    async def task(self, i):
        await asyncio.sleep(i)
        await self.send_json({
            'log_progress': i
        })

    async def long_running_thing(self, event):
        for i in range(5):
            print("long_running_thing: ", i)
            loop = asyncio.get_event_loop()
            task = loop.create_task(self.task(i))
            await task
            print("log_progress: ", i)

    async def receive_json(self, content, **kwargs):
        print(f"Received event: {content}")
        if content['action'] == "start_long_running_thing":
            await self.channel_layer.group_send(
                'consumer_group',
                {
                    "type": "long.running.thing",
                    "data": content['data']
                }
            )
© www.soinside.com 2019 - 2024. All rights reserved.