我们有以下逻辑来实现 WebSocket 通知命名空间。我删除了某些不重要的功能,例如 on_connect,因为该逻辑有效:
class ClientNotifications(Namespace):
def __init__(self, app, socketio, *args, **kwargs):
super().__init__(*args, **kwargs)
self.app = app
self.event_bus: EventBus = EventBus()
self.socketio: SocketIO = socketio
self.subscribed_channels = set()
logger.info(f"Initializing Client Notifications module...: {str(self)}")
# create a dictionary to store the user_id for each client
self.client_user_ids = {}
# event listeners
self.socketio.on_event('subscribe', self.on_subscribe)
self.socketio.on_event('unsubscribe', self.on_unsubscribe)
self.socketio.on_event('disconnect', self.on_disconnect)
def event_bus_callback(self, event_message, sid, socketio):
self.on_event(event_message, sid, socketio)
def on_subscribe(self, data):
"""
Subscribe to Redis events for the given project_id
"""
logger.info(f'on_subscribe...')
user_id = self.client_user_ids.get(request.sid)
project_id = data['project_id']
redis_channel = f'{project_id}'
if redis_channel in self.subscribed_channels:
return
logger.info(f'Subscribing to channel: {redis_channel}. Emitting from: {str(self)} with socketio {str(self.socketio)} to room: {user_id}')
# subscribe to eventbus notifications and join room
sid = request.sid
callback = functools.partial(self.event_bus_callback, sid=sid, socketio=self.socketio)
self.event_bus.subscribe(redis_channel, callback)
self.socketio.emit(
'subscribed',
{'message': f"You are subscribed to the following channel: {project_id}"},
namespace='/notifications',
room=user_id
)
logger.info(f"Emitting event 'subscribed' to room: {user_id}")
self.subscribed_channels.add(redis_channel)
return redis_channel
def on_event(self, event_message, sid, socketio):
"""
Trigger a real-time UI update when a message is saved or blueprints are completed
"""
with self.app.app_context():
logger.info(event_message)
data = event_message['data']
event = event_message['event']
project_id = data['project']
user_id = self.client_user_ids.get(sid)
logger.info(f"emitting {event} from {str(self)} with socketio {str(socketio)} to room {user_id}: {event}")
if event == 'message_saved':
message = data['content']
print(project_id + message + user_id)
emit(
'message_saved',
{'project': project_id, 'message': message},
namespace='/notifications',
to=sid
)
else:
logger.info(f"Unrecognized event in client notifications: {event}")
这是我们事件总线的以下逻辑:
import json
import threading
from typing import Callable
from Config.redis_store import get_redis_conn
class EventBus:
"""
This class is an implementation of an event bus using Redis.
The EventBus class provides a simple way to publish events to specific channels and subscribe to these channels
to receive events.
"""
def __init__(self):
self.redis_conn = get_redis_conn()
def publish(self, channel: str, event: str, data):
"""
Publishes an event to the specified channel.
"""
message = json.dumps({'event': event, 'data': data})
self.redis_conn.publish(channel, message)
def subscribe(self, channel: str, callback: Callable):
"""
Subscribes to a specific channel, and invokes the callback function whenever an event is received.
"""
pubsub = self.redis_conn.pubsub()
pubsub.subscribe(channel)
def listen_and_handle_messages():
for message in pubsub.listen():
if message['type'] == 'message':
data = json.loads(message['data'])
callback(data)
listener_thread = threading.Thread(target=listen_and_handle_messages)
listener_thread.daemon = True
listener_thread.start()
def unsubscribe(self, channel: str):
"""
Unsubscribes from the specified channel.
"""
self.redis_conn.pubsub().unsubscribe(channel)
这就是我们在 app.py 中初始化的方式
redis_conn = get_redis_conn()
socketio = get_socketio_conn()
socketio.init_app(app)
client_notifications = ClientNotifications(app, socketio, '/notifications')
socketio.on_namespace(client_notifications)
每当我们发布到事件总线时,都会成功调用 on_event 函数。然而,emit 函数实际上并没有发出任何东西并且默默地失败了。此外,当前端客户端向 on_subscribe 发送某些内容时,该逻辑完美运行,因为客户端还删除了从服务器发回的消息。关于如何解决这个问题的任何想法。谢谢!