Socket.io 在被 Redis 事件总线触发时无法从回调函数发出消息

问题描述 投票:0回答:0

我们有以下逻辑来实现 WebSocket 通知命名空间。我删除了某些不重要的功能,例如 on_connect,因为该逻辑有效:

class ClientNotifications(Namespace):
    def __init__(self, app, socketio, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.app = app 
        self.event_bus: EventBus = EventBus()
        self.socketio: SocketIO = socketio
        self.subscribed_channels = set()

        logger.info(f"Initializing Client Notifications module...: {str(self)}")

        # create a dictionary to store the user_id for each client
        self.client_user_ids = {}

        # event listeners
        self.socketio.on_event('subscribe', self.on_subscribe)
        self.socketio.on_event('unsubscribe', self.on_unsubscribe)
        self.socketio.on_event('disconnect', self.on_disconnect)

    def event_bus_callback(self, event_message, sid, socketio):
        self.on_event(event_message, sid, socketio)


    def on_subscribe(self, data):
        """
        Subscribe to Redis events for the given project_id
        """
        logger.info(f'on_subscribe...')
        user_id = self.client_user_ids.get(request.sid)
        project_id = data['project_id']
        redis_channel = f'{project_id}'
        if redis_channel in self.subscribed_channels:
            return

        logger.info(f'Subscribing to channel: {redis_channel}. Emitting from: {str(self)} with socketio {str(self.socketio)} to room: {user_id}')

        # subscribe to eventbus notifications and join room
        
        sid = request.sid
        callback = functools.partial(self.event_bus_callback, sid=sid, socketio=self.socketio)
        self.event_bus.subscribe(redis_channel, callback)

        self.socketio.emit(
            'subscribed',
            {'message': f"You are subscribed to the following channel: {project_id}"},
            namespace='/notifications',
            room=user_id
        )

        logger.info(f"Emitting event 'subscribed' to room: {user_id}")

        self.subscribed_channels.add(redis_channel)

        return redis_channel

    
    
    def on_event(self, event_message, sid, socketio):
        """
        Trigger a real-time UI update when a message is saved or blueprints are completed
        """
        with self.app.app_context():
            logger.info(event_message)
            data = event_message['data']
            event = event_message['event']
            project_id = data['project']
            user_id = self.client_user_ids.get(sid)

            logger.info(f"emitting {event} from {str(self)} with socketio {str(socketio)} to room {user_id}: {event}")

            
            if event == 'message_saved':
                message = data['content']

                print(project_id + message + user_id)

                emit(
                    'message_saved',
                    {'project': project_id, 'message': message},
                    namespace='/notifications',
                    to=sid
                    
                )

            
            else:
                logger.info(f"Unrecognized event in client notifications: {event}")

这是我们事件总线的以下逻辑:

import json
import threading
from typing import Callable

from Config.redis_store import get_redis_conn


class EventBus:
    """
    This class is an implementation of an event bus using Redis.
    The EventBus class provides a simple way to publish events to specific channels and subscribe to these channels
    to receive events.
    """
    def __init__(self):
        self.redis_conn = get_redis_conn()

    def publish(self, channel: str, event: str, data):
        """
        Publishes an event to the specified channel.
        """
        message = json.dumps({'event': event, 'data': data})
        self.redis_conn.publish(channel, message)

    def subscribe(self, channel: str, callback: Callable):
        """
        Subscribes to a specific channel, and invokes the callback function whenever an event is received.
        """
        pubsub = self.redis_conn.pubsub()
        pubsub.subscribe(channel)

        def listen_and_handle_messages():
            for message in pubsub.listen():
                if message['type'] == 'message':
                    data = json.loads(message['data'])
                    callback(data)

        listener_thread = threading.Thread(target=listen_and_handle_messages)
        listener_thread.daemon = True
        listener_thread.start()

    def unsubscribe(self, channel: str):
        """
        Unsubscribes from the specified channel.
        """
        self.redis_conn.pubsub().unsubscribe(channel)

这就是我们在 app.py 中初始化的方式

redis_conn = get_redis_conn()
socketio = get_socketio_conn()
socketio.init_app(app)
client_notifications = ClientNotifications(app, socketio, '/notifications')
socketio.on_namespace(client_notifications)

每当我们发布到事件总线时,都会成功调用 on_event 函数。然而,emit 函数实际上并没有发出任何东西并且默默地失败了。此外,当前端客户端向 on_subscribe 发送某些内容时,该逻辑完美运行,因为客户端还删除了从服务器发回的消息。关于如何解决这个问题的任何想法。谢谢!

flask websocket redis flask-socketio
© www.soinside.com 2019 - 2024. All rights reserved.