使用redis.asyncio订阅时无法调用回调

问题描述 投票:0回答:1

我的网站使用 Starlette 框架并使用 websockets。我有一个派生自

WebSocketEndpoint
的类,用于处理传入的 Websocket 连接。我有另一个类用于管理
Pub/Sub
上的
Redis

我试图让回调被调用,我在订阅时注册,但我无法实现相同的目标。在查看订阅方法的文档时,它说:

    Subscribe to channels. Channels supplied as keyword arguments expect
    a channel name as the key and a callable as the value. A channel's
    callable will be invoked automatically when a message is received on
    that channel rather than producing a message via ``listen()`` or
    ``get_message()``.

我在这里缺少什么?

WSEndpoint 类:

from starlette.endpoints import WebSocketEndpoint
from starlette.websockets import WebSocket
import json
from blueprints import redis_pubsub_manager

class WSEndpoint(WebSocketEndpoint):
    encoding = 'json'

    def __init__(self, scope, receive, send):
        super().__init__(scope, receive, send)

        self.global_chatroom_channel_name = "globalchat"
        self.connected_users = []
        self.redis_manager_sub = redis_pubsub_manager.RedisPubSubManager(room=self.global_chatroom_channel_name)
        self.redis_manager_pub = redis_pubsub_manager.RedisPubSubManager(room=self.global_chatroom_channel_name)

        self.global_subscription = None

        print("done")

    async def on_connect(self, websocket: WebSocket):
        await websocket.accept()
        self.connected_users.append(websocket)
        if self.global_subscription is None:
            await self.redis_manager_sub.connect()
            await self.redis_manager_pub.connect()
            self.global_subscription = await self.redis_manager_sub.subscribe(callback = self.publish_message_to_subscribers)

        print(f"Socket Connected: {websocket}")


    async def on_receive(self, websocket: WebSocket, data):
        await self.redis_manager_pub._publish(json.dumps(data))

    async def publish_message_to_subscribers(msg):
        print("inside publish_message_to_subscribers")
        print(msg)

RedisPubSubManager 类:

import asyncio
import redis.asyncio as aioredis
import json

class RedisPubSubManager:
    def __init__(self, host='localhost', port=6379, room="globalchat"):
        self.redis_host = host
        self.redis_port = port
        self.pubsub = None
        self.room = room

    async def _get_redis_connection(self) -> aioredis.Redis:
        return aioredis.Redis(host=self.redis_host,
                              port=self.redis_port,
                              auto_close_connection_pool=False)
    
    async def connect(self) -> None:
        self.redis_connection = await self._get_redis_connection()
        self.pubsub = self.redis_connection.pubsub()


    async def _publish(self, message) -> None:
        await self.redis_connection.publish(self.room, message)

    async def subscribe(self, callback) -> aioredis.Redis:
        await self.pubsub.subscribe(**{"globalchat": callback})

        return self.pubsub
redis python-asyncio
1个回答
0
投票

最近我也遇到了这个问题,在检查了很多资源后得出以下结论。

如果您查看 Redis 库代码的内部实现,特别是

.listen()
方法的实现(下面针对
aioredis==2.0.1
进行共享,我猜 Redis Python SDK 的实现将相同/相似,但您可以交叉引用一次),请注意它获取redis消息,然后进行一些处理,找到
handler
(它可以在调用
.subscribe
函数时传递,您已经正确完成)并使用消息调用处理程序或直接产生结果。

async def listen(self) -> AsyncIterator:
    """Listen for messages on channels this client has been subscribed to"""
    while self.subscribed:
        response = self.handle_message(await self.parse_response(block=True))
        if response is not None:
            yield response




def handle_message(self, response, ignore_subscribe_messages=False):
    """
    Parses a pub/sub message. If the channel or pattern was subscribed to
    with a message handler, the handler is invoked instead of a parsed
    message being returned.
    """
    message_type = str_if_bytes(response[0])
    if message_type == "pmessage":
        message = {
            "type": message_type,
            "pattern": response[1],
            "channel": response[2],
            "data": response[3],
        }
    elif message_type == "pong":
        message = {
            "type": message_type,
            "pattern": None,
            "channel": None,
            "data": response[1],
        }
    else:
        message = {
            "type": message_type,
            "pattern": None,
            "channel": response[1],
            "data": response[2],
        }

    # if this is an unsubscribe message, remove it from memory
    if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
        if message_type == "punsubscribe":
            pattern = response[1]
            if pattern in self.pending_unsubscribe_patterns:
                self.pending_unsubscribe_patterns.remove(pattern)
                self.patterns.pop(pattern, None)
        else:
            channel = response[1]
            if channel in self.pending_unsubscribe_channels:
                self.pending_unsubscribe_channels.remove(channel)
                self.channels.pop(channel, None)

    if message_type in self.PUBLISH_MESSAGE_TYPES:
        # if there's a message handler, invoke it
        if message_type == "pmessage":
            handler = self.patterns.get(message["pattern"], None)
        else:
            handler = self.channels.get(message["channel"], None)
        if handler:
            handler(message)
            return None
    elif message_type != "pong":
        # this is a subscribe/unsubscribe message. ignore if we don't
        # want them
        if ignore_subscribe_messages or self.ignore_subscribe_messages:
            return None

    return message

所以我担心您将不得不创建一个将调用

.listen
方法的协同例程,并且您可以从该协同例程或您的处理程序中处理 Redis 消息。

最终代码看起来像这样:

class WSEndpoint(WebSocketEndpoint):
    encoding = 'json'

    def __init__(self, scope, receive, send):
        asyncio.ensure_future(                        
            self.__pubsub_data_reader(
                self.redis_manager_sub
            )
        )


    async def __pubsub_data_reader(self, pubsub):
        """
        Reads messages received from Redis PubSub.

        Args:
        pubsub (aioredis.ChannelSubscribe): PubSub object for the subscribed channel.
        """
        print("started __pubsub_data_reader")
        try:
            async for message in pubsub.listen():
                pass
        except Exception as exc:
            print("Got error: ", exc)

看看这是否有帮助,如果没有,我可以与您分享完整的 Pub-Sub 代码,这可以帮助您进一步理解流程。

PS:在 Python 中,感觉不是真正的异步,因为最后有一个 while 循环不断检查某些内容(与 Node.JS 等对应部分相比)。如果有人更了解,很想了解他们的观点。另外,如果有人知道更好的解决方案,请告诉我们,因为上面的解决方案似乎有点hacky:)

© www.soinside.com 2019 - 2024. All rights reserved.