Python websockets keepalive ping 超时;没有收到关闭帧

问题描述 投票:0回答:2

我有 20-50 个用户,我想要了解他们是否已连接到互联网或互联网信号较弱的实时信息

我编写了一个Python脚本来检查连接并将信息发送到Django django-channels中的Web服务器 脚本从上午 9 点到下午 6 点在 Windows 调度程序中运行

脚本

    async def main():
        username = get_username()
        url = "{}{}/".format(LOG_SERVER, username)
        async with websockets.connect(url) as websocket:
            # send info to server
            while True:
                try:
                    loop = asyncio.get_event_loop()
                    data = await loop.run_in_executor(None, 
                                              lambda:get_data(username))
                    await websocket.send(json.dumps(data))
                    await asyncio.sleep(30)
                except websockets.ConnectionClosed as e:
                    print(f'Terminated', e)
                    continue
                except Exception as e:
                    logging.error(e)

if __name__ == "__main__":
    asyncio.run(main())

WebSockets 包:https://websockets.readthedocs.io/

每 30 秒发送一次信息 ping min、max、avg 并且保证客户端只要连接到服务器就已经连接了

Django 消费者

async def connect(self):
        try:
            self.username = self.scope['url_route']['kwargs']['username']
            await database_sync_to_async(self.update_user_incr)(self.username)
        except KeyError as e:
            pass
          ......
async def disconnect(self, close_code):
     
        try:
            if(self.username):
                await database_sync_to_async(self.update_user_decr)(self.username)
        except:
            pass
        .......

问题是 python 脚本偶尔会因消息而锁定

sent 1011 (unexpected error) keepalive ping timeout; no close frame received

no close frame received or sent

而且我无法自动回电

如何保持连接打开,或者如果连接关闭,它会在一小部分时间内重新打开,以便前端无法修改在线或离线指示器

python django websocket
2个回答
5
投票

我最终与这样的事情重新建立了联系

 async for websocket in websockets.connect(url):
        try:
            loop = asyncio.get_event_loop()
            data = await loop.run_in_executor(None, lambda: get_data(username))
            await websocket.send(json.dumps(data))
            await asyncio.sleep(30)
        except websockets.ConnectionClosed as e:
            print(f'Terminated', e)

但是我遇到了一个问题,那就是在每个连接上调整用户状态。这种情况下,你可以用这个blog和这个answer来减轻数据库的负载


0
投票

我专门使用了

websockets
库来连接到 Django Channels。无法让 websocket-client 工作。不管怎样,这个完整的解决方案似乎对我来说保持联系:

import threading
from threading import _start_new_thread, Lock
import time
import websockets
from var.hardcoded_consts import *
from core.global_var import load_globals
load_globals(__name__, 'var.hardcoded_consts')
import core.ws2dj_protocol as ws2dj
from core.exceptions import Ws2DjangoCommsCantConnect
import json
import asyncio
import websocket
import rel

class Ws2DjComms:   
    #_websocket = None
    _host = WS2DJ_HOST.value
    _port = WS2DJ_TO_DJANGO_PORT.value
    _queue = None
    _thread_id = None
    _running = False
    _queueLock = Lock()
    
    @staticmethod
    def server_url():
        return f'ws://{Ws2DjComms._host}:{Ws2DjComms._port}/ws/main2dj-comms/'    
             
    @staticmethod
    def report_status(msg, **kwargs):
        if not Ws2DjComms.ignore_status():                
            from core.status_reporter import StatusReporter as report
            report.status(msg, do_ws=False, **kwargs)
        
    @staticmethod
    def report_error(err, **kwargs):
        if not Ws2DjComms.ignore_status():                
            from core.status_reporter import StatusReporter as report
            report.error(err, do_ws=False, **kwargs)
                       
    @staticmethod
    def queue_protocol(protocol: ws2dj.Ws2DjProtocol):
        try:                
            Comms = Ws2DjComms
            Comms._queueLock.acquire()
    
            if Comms._queue is None:
                Comms._queue = []
                
            Comms._queue.append(protocol)
            
            Comms._queueLock.release()                        
                     
            if not Comms.is_running():
                Comms.start_running()

            Comms.report_status(f"Ws2DjProtocol: {protocol} has been queued for executing.")        
        except Exception as e:
            Comms.report_error(e)
               
    @staticmethod
    def is_running():
        return Ws2DjComms._running
            
    @staticmethod
    async def run_main_loop():
        Comms = Ws2DjComms        
        try:
            ws_url =  Ws2DjComms.server_url()
            
            async with websockets.connect(ws_url) as ws:
                Comms.report_status("Connected from Main-side in MainApp <-{WebSocket}-> DjangoSite.")
                 
                while Comms.is_running():
                    time.sleep(WS2DJ_LOOP_SLEEP_MS.value/1000.0)
                    
                    queue = tuple()
                    Comms._queueLock.acquire()
        
                    if Comms._queue:
                        queue = tuple(Comms._queue)
                        Comms._queue.clear()
                        
                    Comms._queueLock.release()
                    
                    for protocol in queue:
                        if not Comms.is_running():
                            break
                        await protocol(ws)
                        
                        response = await ws.recv()
                        print(f'response: {response}')                        
                    else:    
                        if len(queue):
                            Comms.report_status(f'Ws2DjComms main loop processed {len(queue)} interactions with Django-side.')
                            
                        continue
                    break                
                
        except Exception as e:
            Comms.report_error(e)
        
    @staticmethod
    def stop_running():
        Ws2DjComms._running = False
        
    @staticmethod
    def start_running():
        Ws2DjComms._thread_id = _start_new_thread(Ws2DjComms._start_running, args=())
        
    @staticmethod
    def _start_running():
        Ws2DjComms._running = True
        
        while Ws2DjComms.is_running():
            try:            
                asyncio.run(Ws2DjComms.run_main_loop())
            except Exception as e:
                Ws2DjComms.report_error(e, raise_=False)               

        
    @staticmethod
    def ignore_status():
        return not WS2DJ_SHOW_STATUS.value
        

它本质上是两个嵌套的 While 循环,每个循环检查我们是否正在运行。外面的那个会打电话给

asyncio.run(Ws2DjComms.run_main_loop()
,以防我得到旧的
no close frame received error
。我似乎运行稳定,如果/当它再次失败时,我会发回这里。

© www.soinside.com 2019 - 2024. All rights reserved.