我有 20-50 个用户,我想实时了解他们是否已连接到互联网,或者他们的网络信号是否较弱。
我编写了一个 Python 脚本来检查连接,并将信息发送到使用 Django(django-channels)的 Web 服务器。该脚本通过 Windows 任务计划程序在上午 9 点到下午 6 点之间运行。
async def main():
    """Send this user's connectivity data to the log server every 30 seconds.

    Opens one WebSocket connection to ``<LOG_SERVER><username>/`` and then
    loops forever: gather data with ``get_data`` in the default executor,
    send it as JSON, and sleep for 30 seconds. Errors are printed or logged
    and the loop carries on.
    """
    username = get_username()
    url = f"{LOG_SERVER}{username}/"
    event_loop = asyncio.get_event_loop()
    async with websockets.connect(url) as websocket:
        # NOTE(review): the connection is opened once, outside the loop.
        # After ConnectionClosed the loop keeps using the same closed
        # socket; nothing in this function reconnects.
        while True:
            try:
                # get_data is presumably blocking, hence the executor.
                payload = await event_loop.run_in_executor(
                    None, get_data, username)
                await websocket.send(json.dumps(payload))
                await asyncio.sleep(30)
            except websockets.ConnectionClosed as closed:
                print(f'Terminated', closed)
            except Exception as err:
                logging.error(err)


if __name__ == "__main__":
    asyncio.run(main())
WebSockets 包:https://websockets.readthedocs.io/
脚本每 30 秒发送一次信息(ping 的最小值、最大值和平均值);只要客户端与服务器之间的连接还在,就认为该客户端在线。
async def connect(self):
    """Handle a new WebSocket connection for the user named in the URL route.

    Reads ``username`` from the URL route kwargs in ``self.scope`` and calls
    ``update_user_incr`` for it through ``database_sync_to_async``. A missing
    key is logged and otherwise ignored; ``self.username`` is then ``None``.
    """
    import logging

    # Define the attribute up front so later code (e.g. disconnect) can test
    # it even when the lookup below fails.
    self.username = None
    try:
        self.username = self.scope['url_route']['kwargs']['username']
        await database_sync_to_async(self.update_user_incr)(self.username)
    except KeyError as e:
        # Still non-fatal, but no longer silent.
        logging.warning(
            "connect: missing key %s; user status was not updated", e)
......
async def disconnect(self, close_code):
    """Call ``update_user_decr`` for the connected user when the socket closes.

    Does nothing when no username was recorded for this connection. Failures
    of the update are logged, not raised.

    Args:
        close_code: Close code supplied by the caller; not used here.
    """
    import logging

    # The attribute may be missing if connect() failed before assigning it.
    username = getattr(self, 'username', None)
    if not username:
        return
    try:
        await database_sync_to_async(self.update_user_decr)(username)
    except Exception:
        # Best-effort cleanup. Unlike a bare ``except:``, this does not
        # swallow asyncio.CancelledError (a BaseException on Python 3.8+).
        logging.exception(
            "disconnect: failed to update status for user %r", username)
.......
问题是 Python 脚本偶尔会卡住,并输出如下消息:
sent 1011 (unexpected error) keepalive ping timeout; no close frame received
no close frame received or sent
而且我无法让它自动重新连接。
如何让连接保持打开?或者,在连接关闭之后,如何让它在极短的时间内重新打开,使前端的在线/离线指示器不会发生变化?
我最终用类似下面的代码实现了重新连接:
async for websocket in websockets.connect(url):
try:
loop = asyncio.get_event_loop()
data = await loop.run_in_executor(None, lambda: get_data(username))
await websocket.send(json.dumps(data))
await asyncio.sleep(30)
except websockets.ConnectionClosed as e:
print(f'Terminated', e)
但是我遇到了一个问题:每次(重新)连接时都要调整用户状态。这种情况下,可以参考这篇博客(blog)和这个回答(answer)来减轻数据库的负载。
我专门使用了
websockets
库来连接到 Django Channels;我没能让 websocket-client 正常工作。不管怎样,下面这个完整的解决方案似乎能让我的连接保持不断:
import threading
from threading import _start_new_thread, Lock
import time
import websockets
from var.hardcoded_consts import *
from core.global_var import load_globals
load_globals(__name__, 'var.hardcoded_consts')
import core.ws2dj_protocol as ws2dj
from core.exceptions import Ws2DjangoCommsCantConnect
import json
import asyncio
import websocket
import rel
class Ws2DjComms:
    """Background WebSocket link from the main app to the Django site.

    All state lives on the class, so it acts as a process-wide singleton.
    Callers hand protocol objects to :meth:`queue_protocol`; a worker thread
    running its own asyncio event loop sends them over one WebSocket
    connection and reconnects for as long as the running flag is set.
    """

    _host = WS2DJ_HOST.value
    _port = WS2DJ_TO_DJANGO_PORT.value
    _queue = None        # list of queued protocols, created on first use
    _thread_id = None    # identifier of the worker thread once started
    _running = False     # flag checked by both worker loops
    _queueLock = Lock()  # guards _queue and the start-up check on _running

    @staticmethod
    def server_url():
        """Return the WebSocket URL of the Django-side endpoint."""
        return f'ws://{Ws2DjComms._host}:{Ws2DjComms._port}/ws/main2dj-comms/'

    @staticmethod
    def report_status(msg, **kwargs):
        """Forward ``msg`` to ``StatusReporter.status`` unless status is ignored."""
        if not Ws2DjComms.ignore_status():
            # Imported at call time, as in the rest of this class.
            from core.status_reporter import StatusReporter as report
            report.status(msg, do_ws=False, **kwargs)

    @staticmethod
    def report_error(err, **kwargs):
        """Forward ``err`` to ``StatusReporter.error`` unless status is ignored."""
        if not Ws2DjComms.ignore_status():
            from core.status_reporter import StatusReporter as report
            report.error(err, do_ws=False, **kwargs)

    @staticmethod
    def queue_protocol(protocol: ws2dj.Ws2DjProtocol):
        """Queue ``protocol`` for sending and start the worker if needed.

        Any exception is passed to :meth:`report_error` and not raised.
        """
        Comms = Ws2DjComms
        try:
            # ``with`` releases the lock even if the append raises.
            with Comms._queueLock:
                if Comms._queue is None:
                    Comms._queue = []
                Comms._queue.append(protocol)
            if not Comms.is_running():
                Comms.start_running()
            Comms.report_status(f"Ws2DjProtocol: {protocol} has been queued for executing.")
        except Exception as e:
            Comms.report_error(e)

    @staticmethod
    def is_running():
        """Return True while the worker is supposed to keep running."""
        return Ws2DjComms._running

    @staticmethod
    async def run_main_loop():
        """Connect once and send queued protocols until stopped or failed.

        Each queued protocol is awaited with the open connection, then one
        response is received and printed. Any exception, including a dropped
        connection, is reported and ends the coroutine; the caller decides
        whether to reconnect.
        """
        Comms = Ws2DjComms
        try:
            async with websockets.connect(Comms.server_url()) as ws:
                Comms.report_status("Connected from Main-side in MainApp <-{WebSocket}-> DjangoSite.")
                while Comms.is_running():
                    # Must be asyncio.sleep: time.sleep would block the event
                    # loop, so the connection's keepalive pings could not be
                    # handled while waiting.
                    await asyncio.sleep(WS2DJ_LOOP_SLEEP_MS.value / 1000.0)
                    pending = ()
                    with Comms._queueLock:
                        if Comms._queue:
                            pending = tuple(Comms._queue)
                            Comms._queue.clear()
                    for protocol in pending:
                        if not Comms.is_running():
                            # Stopped mid-batch: leave both loops.
                            return
                        await protocol(ws)
                        response = await ws.recv()
                        print(f'response: {response}')
                    if pending:
                        Comms.report_status(f'Ws2DjComms main loop processed {len(pending)} interactions with Django-side.')
        except Exception as e:
            Comms.report_error(e)

    @staticmethod
    def stop_running():
        """Ask both worker loops to stop at their next check."""
        Ws2DjComms._running = False

    @staticmethod
    def start_running():
        """Start the worker thread; do nothing if it is already running."""
        Comms = Ws2DjComms
        # Set the flag here, under the lock, so that two callers arriving
        # together cannot both start a worker.
        with Comms._queueLock:
            if Comms._running:
                return
            Comms._running = True
        # Daemon thread: the worker never keeps the interpreter alive.
        worker = threading.Thread(
            target=Comms._start_running, name='Ws2DjComms', daemon=True)
        worker.start()
        Comms._thread_id = worker.ident

    @staticmethod
    def _start_running():
        """Worker body: rerun :meth:`run_main_loop` until stopped.

        The running flag is set by :meth:`start_running`, so a stop request
        issued before this thread gets going is honoured.
        """
        while Ws2DjComms.is_running():
            try:
                asyncio.run(Ws2DjComms.run_main_loop())
            except Exception as e:
                Ws2DjComms.report_error(e, raise_=False)
            if Ws2DjComms.is_running():
                # Pause before reconnecting so a server that is down is not
                # retried in a tight loop.
                time.sleep(WS2DJ_LOOP_SLEEP_MS.value / 1000.0)

    @staticmethod
    def ignore_status():
        """Return True when status reporting is switched off."""
        return not WS2DJ_SHOW_STATUS.value
它本质上是两个嵌套的 while 循环,每个循环都会检查我们是否仍在运行。外层循环会调用
asyncio.run(Ws2DjComms.run_main_loop())
,以防我再次遇到之前那个 no close frame received 错误
。目前看来运行稳定;如果它再次失败,我会回到这里更新。