我目前正在开发的一个系统正在redis中接收消息。我想编写一个 python 脚本,可以读取这些消息,修改它们并将它们返回到 redis 中的不同通道/键。
例如:
redis-cli -a 'redis' monitor | grep '"pushName"'
会回来:
1707963448.404618 [0 192.168.86.75:51520] "LPUSH" "pushName" "{\"uuid\":"be70\"...,{}}
我如何订阅以获取来自
pushName
的消息,因为当我尝试这样做时
mobile = r.pubsub()
mobile.subscribe('pushName')
for message in mobile.listen():
print(message)
除了 :
之外什么都不显示{'type': 'subscribe', 'pattern': None, 'channel': 'pushName', 'data': 1}
。我已经知道我的连接标准很好,因为当我这样做时我可以获得频道列表:
index_list = []
for key in r.scan_iter('*'):
index_list.append(key)
但是当我这样做时,消息就会飞来飞去:
redis-cli -a 'redis' monitor | grep '"pushName"'
您似乎将调试输出与消息、Redis LIST 和 Redis Pub/Sub 混淆了。
如果您的其他(实际)代码正在执行 LPUSH 操作,那么它会将项目附加到 Redis LIST。
这些 LPUSH 操作将显示在您的
redis-cli monitor
命令中,因为这是一个调试工具,向您显示所有 Redis 内部操作。
当您订阅 Redis Pub/Sub 时,LPUSH 命令(在 Redis LIST 上运行)不会显示,因为这是 Redis 的一个完全独立的功能,只有当您订阅某个主题并且当有人在该主题上发布...但没有人进行任何发布时,他们只是对列表进行 LPUSH。
也许你可以调整一些东西,让它们更像你期望的那样工作。如果您希望每次 LIST 上有 LPUSH 时都收到通知,您可以将 LPUSH 更改为执行 LPUSH 的 MULTI/EXEC 事务,然后发布一条消息以通知单个事务中的所有相关方。这将使您的发件人看起来像这样:
#!/usr/bin/env python3
from time import sleep
import redis
import numpy as np
# Redis connection
r = redis.Redis(host='192.168.0.10', port=6379, db=0)
# Pub/sub connection
p = r.pubsub()
# Empty list at start of each run
r.delete('myList')
# Push 10 items to Redis LIST, publishing that they are available
N = 10
for i in range(N):
print(f'DEBUG: Publishing item {i} and notifying subscribers')
# Start pipeline, i.e. MULTI/EXEC transaction
pipe = r.pipeline()
pipe.lpush('myList', i)
pipe.publish('notifications',f'LPUSHed to myList')
pipe.execute()
sleep(1)
然后你的接收者可能看起来像这样,订阅通知流,并在收到通知时抓取列表:
#!/usr/bin/env python3
from time import time, sleep
import redis
import numpy as np
def msgHandler(message):
"""Called whenever a message is published"""
print(message)
# See what's in our LIST
items = r.lrange('myList',0,100)
print(items)
# You could put the items into a regular Python queue here for main program to read
# Redis connection
r = redis.Redis(host='192.168.0.10', port=6379, db=0)
# Subscribe to notifications and register callback for when messages arrive
pubsub = r.pubsub(ignore_subscribe_messages=True)
pubsub.subscribe(**{'notifications': msgHandler})
# Start a background message receiver
# See https://redis.readthedocs.io/en/stable/advanced_features.html
thread = pubsub.run_in_thread(sleep_time=0.001)
# Do something else for 15 seconds
endTime = time() + 15
while time() < endTime:
print('Waiting for data:')
sleep(0.3)
# Stop the redis pub/sub listener
thread.stop()
当您运行接收器时,它看起来像这样:
Waiting for data:
Waiting for data:
{'type': 'message', 'pattern': None, 'channel': b'notifications', 'data': b'LPUSHed to myList'}
[b'0']
Waiting for data:
Waiting for data:
Waiting for data:
Waiting for data:
{'type': 'message', 'pattern': None, 'channel': b'notifications', 'data': b'LPUSHed to myList'}
[b'1', b'0']
Waiting for data:
Waiting for data:
Waiting for data:
{'type': 'message', 'pattern': None, 'channel': b'notifications', 'data': b'LPUSHed to myList'}
[b'2', b'1', b'0']
...
...
Waiting for data:
Waiting for data:
{'type': 'message', 'pattern': None, 'channel': b'notifications', 'data': b'LPUSHed to myList'}
[b'9', b'8', b'7', b'6', b'5', b'4', b'3', b'2', b'1', b'0']
Waiting for data:
请注意,实际上有 3 种方法可以等待您订阅的消息,此处对此进行了描述。基本上,你可以使用:
while True:
message = p.get_message()
if message:
# do something with the message
time.sleep(0.001) # be nice to the system :)
或者您可以使用:
for message in p.listen():
# do something with the message
或者你可以像我一样使用线程。
请注意,我使用管道(MULTI/EXEC)有两个原因:
请注意,您可以将管道元素链接在一起,以便我的发送者代码中的 4 行管道变为:
r.pipeline().lpush('myList', i).publish('notifications',f'LPUSHed to myList').execute()