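My website uses the Starlette framework with websockets. I have a class derived from WebSocketEndpoint that handles incoming WebSocket connections, and another class that manages Redis Pub/Sub.
I am trying to get the callback that I register when subscribing to be invoked, but I have not been able to. The documentation for the subscribe method says: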
Subscribe to channels. Channels supplied as keyword arguments expect a channel name as the key and a callable as the value. A channel's callable will be invoked automatically when a message is received on that channel rather than producing a message via ``listen()`` or ``get_message()``.
What am I missing here?
The WSEndpoint class:
    from starlette.endpoints import WebSocketEndpoint
    from starlette.websockets import WebSocket
    import json
    from blueprints import redis_pubsub_manager

    class WSEndpoint(WebSocketEndpoint):
        encoding = 'json'

        def __init__(self, scope, receive, send):
            super().__init__(scope, receive, send)
            self.global_chatroom_channel_name = "globalchat"
            self.connected_users = []
            self.redis_manager_sub = redis_pubsub_manager.RedisPubSubManager(room=self.global_chatroom_channel_name)
            self.redis_manager_pub = redis_pubsub_manager.RedisPubSubManager(room=self.global_chatroom_channel_name)
            self.global_subscription = None
            print("done")

        async def on_connect(self, websocket: WebSocket):
            await websocket.accept()
            self.connected_users.append(websocket)
            if self.global_subscription is None:
                await self.redis_manager_sub.connect()
                await self.redis_manager_pub.connect()
                self.global_subscription = await self.redis_manager_sub.subscribe(callback=self.publish_message_to_subscribers)
            print(f"Socket Connected: {websocket}")

        async def on_receive(self, websocket: WebSocket, data):
            await self.redis_manager_pub._publish(json.dumps(data))

        # `self` added: the method is registered as a bound method above
        async def publish_message_to_subscribers(self, msg):
            print("inside publish_message_to_subscribers")
            print(msg)
The RedisPubSubManager class:
    import asyncio
    import redis.asyncio as aioredis
    import json

    class RedisPubSubManager:
        def __init__(self, host='localhost', port=6379, room="globalchat"):
            self.redis_host = host
            self.redis_port = port
            self.pubsub = None
            self.room = room

        async def _get_redis_connection(self) -> aioredis.Redis:
            return aioredis.Redis(host=self.redis_host,
                                  port=self.redis_port,
                                  auto_close_connection_pool=False)

        async def connect(self) -> None:
            self.redis_connection = await self._get_redis_connection()
            self.pubsub = self.redis_connection.pubsub()

        async def _publish(self, message) -> None:
            await self.redis_connection.publish(self.room, message)

        async def subscribe(self, callback) -> aioredis.Redis:
            await self.pubsub.subscribe(**{"globalchat": callback})
            return self.pubsub
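I ran into this problem recently as well and, after going through a lot of resources, came to the following conclusion.
If you look at the internals of the Redis library, specifically the implementation of the .listen() method (shared below for aioredis==2.0.1; I would guess the Redis Python SDK implementation is the same or similar, but you can cross-check), note that it fetches the Redis message, does some processing, looks up the handler (which can be passed when calling .subscribe, as you have correctly done), and then either calls the handler with the message or yields the result directly.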
    async def listen(self) -> AsyncIterator:
        """Listen for messages on channels this client has been subscribed to"""
        while self.subscribed:
            response = self.handle_message(await self.parse_response(block=True))
            if response is not None:
                yield response

    def handle_message(self, response, ignore_subscribe_messages=False):
        """
        Parses a pub/sub message. If the channel or pattern was subscribed to
        with a message handler, the handler is invoked instead of a parsed
        message being returned.
        """
        message_type = str_if_bytes(response[0])
        if message_type == "pmessage":
            message = {
                "type": message_type,
                "pattern": response[1],
                "channel": response[2],
                "data": response[3],
            }
        elif message_type == "pong":
            message = {
                "type": message_type,
                "pattern": None,
                "channel": None,
                "data": response[1],
            }
        else:
            message = {
                "type": message_type,
                "pattern": None,
                "channel": response[1],
                "data": response[2],
            }

        # if this is an unsubscribe message, remove it from memory
        if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
            if message_type == "punsubscribe":
                pattern = response[1]
                if pattern in self.pending_unsubscribe_patterns:
                    self.pending_unsubscribe_patterns.remove(pattern)
                    self.patterns.pop(pattern, None)
            else:
                channel = response[1]
                if channel in self.pending_unsubscribe_channels:
                    self.pending_unsubscribe_channels.remove(channel)
                    self.channels.pop(channel, None)

        if message_type in self.PUBLISH_MESSAGE_TYPES:
            # if there's a message handler, invoke it
            if message_type == "pmessage":
                handler = self.patterns.get(message["pattern"], None)
            else:
                handler = self.channels.get(message["channel"], None)
            if handler:
                handler(message)
                return None
        elif message_type != "pong":
            # this is a subscribe/unsubscribe message. ignore if we don't
            # want them
            if ignore_subscribe_messages or self.ignore_subscribe_messages:
                return None

        return message
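To make the consequence of that code concrete, here is a minimal stand-in, using only the standard library, that mimics the dispatch logic above. `FakePubSub` and its `feed()` method are hypothetical names invented for this sketch and are not part of any Redis client; the only point is that a registered handler stays silent until something iterates `listen()`.

```python
import asyncio


class FakePubSub:
    """Hypothetical stand-in for a PubSub object; not a real Redis class."""

    def __init__(self):
        self.channels = {}                 # channel name -> handler
        self._incoming = asyncio.Queue()   # plays the role of the socket

    def subscribe(self, **handlers):
        self.channels.update(handlers)

    def feed(self, channel, data):
        """Simulate a message arriving from the server."""
        self._incoming.put_nowait(["message", channel, data])

    def handle_message(self, response):
        message = {"type": response[0], "channel": response[1], "data": response[2]}
        handler = self.channels.get(message["channel"])
        if handler:
            handler(message)   # the handler consumes the message
            return None
        return message         # no handler: hand it to the caller

    async def listen(self):
        while self.channels:
            response = self.handle_message(await self._incoming.get())
            if response is not None:
                yield response


async def demo():
    received = []
    pubsub = FakePubSub()
    pubsub.subscribe(globalchat=received.append)

    pubsub.feed("globalchat", "hello")
    await asyncio.sleep(0)        # give the event loop a chance to run
    before = list(received)       # nobody is reading yet

    async def reader():
        async for _ in pubsub.listen():
            pass

    task = asyncio.ensure_future(reader())
    await asyncio.sleep(0.01)     # let the reader drain the queue
    after = list(received)

    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return before, after


before, after = asyncio.run(demo())
print(before)
print(after)
```

Before the reader task starts, the handler has seen nothing even though a message is already waiting; once `listen()` is being iterated, the same handler receives it.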
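So I'm afraid you will have to create a coroutine that calls the .listen method, because the callback is never triggered on its own. You can then process Redis messages either in that coroutine or in your handler.
The final code looks something like this: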
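    import asyncio
    from starlette.endpoints import WebSocketEndpoint

    class WSEndpoint(WebSocketEndpoint):
        encoding = 'json'

        def __init__(self, scope, receive, send):
            super().__init__(scope, receive, send)
            # ... same attribute setup as in the question ...
            self.reader_task = None

        async def on_connect(self, websocket):
            # ... accept the socket, connect and subscribe as in the question ...
            # Start the reader only after subscribing: listen() loops only
            # while the PubSub object is subscribed to something.
            self.reader_task = asyncio.ensure_future(
                self.__pubsub_data_reader(self.global_subscription)
            )

        async def __pubsub_data_reader(self, pubsub):
            """
            Reads messages received from Redis PubSub.
            Args:
                pubsub: PubSub object for the subscribed channel.
            """
            print("started __pubsub_data_reader")
            try:
                # Messages for channels with a handler are dispatched inside
                # listen(); only messages without a handler arrive here.
                async for message in pubsub.listen():
                    pass
            except Exception as exc:
                print("Got error: ", exc)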
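One thing the snippet above leaves open is how the background reader gets stopped. Below is a minimal sketch of that lifecycle, assuming the task is created when a client connects and cancelled when it disconnects. `fake_listen`, `ReaderOwner`, `start` and `stop` are hypothetical names, and the async generator merely stands in for `pubsub.listen()`, so no Redis server is involved.

```python
import asyncio
import contextlib


async def fake_listen(queue):
    """Stand-in for pubsub.listen(): yields whatever lands in the queue."""
    while True:
        yield await queue.get()


class ReaderOwner:
    """Hypothetical owner of the background reader (e.g. the endpoint)."""

    def __init__(self, source):
        self.source = source
        self.seen = []
        self.task = None

    async def _reader(self):
        try:
            async for message in self.source:
                self.seen.append(message)
        except Exception as exc:
            # CancelledError derives from BaseException (Python 3.8+),
            # so cancellation is not swallowed by this handler.
            print("Got error:", exc)

    def start(self):
        # Keep a reference: the event loop only holds weak references to tasks.
        self.task = asyncio.ensure_future(self._reader())

    async def stop(self):
        if self.task is not None:
            self.task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self.task
            self.task = None


async def demo():
    queue = asyncio.Queue()
    owner = ReaderOwner(fake_listen(queue))
    owner.start()                 # roughly what on_connect would do
    queue.put_nowait("hello")
    await asyncio.sleep(0.01)     # let the reader pick the message up
    task = owner.task
    await owner.stop()            # roughly what on_disconnect would do
    return owner.seen, task.cancelled()


print(asyncio.run(demo()))
```

Storing the task on the object and cancelling it explicitly avoids leaving a reader running after the last client has gone; whether one reader per connection or one shared reader fits better depends on the application.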
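See if this helps; if not, I can share the complete Pub/Sub code, which may help you understand the flow further.
PS: In Python this does not feel truly asynchronous, because in the end there is a while loop constantly checking for something (compared with counterparts such as Node.js). If anyone knows more about this, I would love to hear their view. Also, if anyone knows a better solution, please let us know, since the one above seems a bit hacky :)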
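On the PS: as far as I can tell, a `while` loop whose body awaits is not a busy loop. Each `await` suspends the coroutine and hands control back to the event loop until data arrives, which is close in spirit to a callback waiting on the Node.js event loop. A small standard-library sketch (all names are illustrative) shows another task making progress while the listener loop is parked, and the loop body running only once per item:

```python
import asyncio


async def listener(queue, received):
    iterations = 0
    while True:
        item = await queue.get()   # suspended here until an item arrives
        iterations += 1
        if item is None:           # sentinel: stop listening
            return iterations
        received.append(item)


async def ticker(ticks):
    for _ in range(5):
        ticks.append(1)            # work done while the listener waits
        await asyncio.sleep(0.01)


async def demo():
    queue, received, ticks = asyncio.Queue(), [], []
    listen_task = asyncio.ensure_future(listener(queue, received))
    await ticker(ticks)            # runs while the listener is suspended
    queue.put_nowait("hello")
    queue.put_nowait(None)
    iterations = await listen_task
    return len(ticks), received, iterations


print(asyncio.run(demo()))
```

The ticker completes all five steps before anything is queued, and the listener's loop body executes twice in total (once for the message, once for the sentinel) rather than spinning.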