使用Redis python读取已经发送到redis的消息

问题描述 投票:0回答:2

我目前正在开发的一个系统正在redis中接收消息。我想编写一个 python 脚本,可以读取这些消息,修改它们并将它们返回到 redis 中的不同通道/键。

例如:

 redis-cli  -a 'redis' monitor | grep '"pushName"'

会回来:

1707963448.404618 [0 192.168.86.75:51520] "LPUSH" "pushName" "{\"uuid\":"be70\"...,{}}

我如何订阅以获取来自

pushName
的消息,因为当我尝试这样做时

mobile = r.pubsub()
mobile.subscribe('pushName')
for message in mobile.listen():
    print(message)

除了 :

之外什么都不显示
{'type': 'subscribe', 'pattern': None, 'channel': 'pushName', 'data': 1}

。我已经知道我的连接标准很好,因为当我这样做时我可以获得频道列表:

index_list = []
for key in r.scan_iter('*'):
    index_list.append(key)

但是当我这样做时,消息就会飞来飞去:

 redis-cli  -a 'redis' monitor | grep '"pushName"'
redis redis-cli redis-py
2个回答
1
投票

您似乎将调试输出与消息、Redis LIST 和 Redis Pub/Sub 混淆了。

如果您的其他(实际)代码正在执行 LPUSH 操作,那么它会将项目附加到 Redis LIST。

这些 LPUSH 操作将显示在您的

redis-cli monitor
命令中,因为这是一个调试工具,向您显示所有 Redis 内部操作。

当您订阅 Redis Pub/Sub 时,LPUSH 命令(在 Redis LIST 上运行)不会显示,因为这是 Redis 的一个完全独立的功能,只有当您订阅某个主题并且当有人在该主题上发布...但没有人进行任何发布时,他们只是对列表进行 LPUSH。


1
投票

也许你可以调整一些东西,让它们更像你期望的那样工作。如果您希望每次 LIST 上有 LPUSH 时都收到通知,您可以将 LPUSH 更改为执行 LPUSH 的 MULTI/EXEC 事务,然后发布一条消息以通知单个事务中的所有相关方。这将使您的发件人看起来像这样:

#!/usr/bin/env python3

from time import sleep
import redis
import numpy as np

# Redis connection
r = redis.Redis(host='192.168.0.10', port=6379, db=0)

# Pub/sub connection
p = r.pubsub()

# Empty list at start of each run
r.delete('myList')

# Push 10 items to Redis LIST, publishing that they are available
N = 10
for i in range(N):
    print(f'DEBUG: Publishing item {i} and notifying subscribers')
    # Start pipeline, i.e. MULTI/EXEC transaction
    pipe = r.pipeline()
    pipe.lpush('myList', i)
    pipe.publish('notifications',f'LPUSHed to myList')
    pipe.execute()
    sleep(1)

然后你的接收者可能看起来像这样,订阅通知流,并在收到通知时抓取列表:

#!/usr/bin/env python3

from time import time, sleep
import redis
import numpy as np

def msgHandler(message):
    """Called whenever a message is published"""
    print(message)
    # See what's in our LIST
    items = r.lrange('myList',0,100)
    print(items)
    # You could put the items into a regular Python queue here for main program to read

# Redis connection
r = redis.Redis(host='192.168.0.10', port=6379, db=0)

# Subscribe to notifications and register callback for when messages arrive
pubsub = r.pubsub(ignore_subscribe_messages=True)
pubsub.subscribe(**{'notifications': msgHandler})
# Start a background message receiver
# See https://redis.readthedocs.io/en/stable/advanced_features.html
thread = pubsub.run_in_thread(sleep_time=0.001)

# Do something else for 15 seconds
endTime = time() + 15
while time() < endTime:
    print('Waiting for data:')
    sleep(0.3)

# Stop the redis pub/sub listener
thread.stop()

当您运行接收器时,它看起来像这样:

Waiting for data:
Waiting for data:
{'type': 'message', 'pattern': None, 'channel': b'notifications', 'data': b'LPUSHed to myList'}
[b'0']
Waiting for data:
Waiting for data:
Waiting for data:
Waiting for data:
{'type': 'message', 'pattern': None, 'channel': b'notifications', 'data': b'LPUSHed to myList'}
[b'1', b'0']
Waiting for data:
Waiting for data:
Waiting for data:
{'type': 'message', 'pattern': None, 'channel': b'notifications', 'data': b'LPUSHed to myList'}
[b'2', b'1', b'0']
...
...
Waiting for data:
Waiting for data:
{'type': 'message', 'pattern': None, 'channel': b'notifications', 'data': b'LPUSHed to myList'}
[b'9', b'8', b'7', b'6', b'5', b'4', b'3', b'2', b'1', b'0']
Waiting for data:

请注意,实际上有 3 种方法可以等待您订阅的消息,此处对此进行了描述。基本上,你可以使用:

while True:
    message = p.get_message()
    if message:
        # do something with the message
    time.sleep(0.001)  # be nice to the system :)

或者您可以使用:

for message in p.listen():
    # do something with the message

或者你可以像我一样使用线程。


请注意,我使用管道(MULTI/EXEC)有两个原因:

  • 这是一个原子事务,所以我们知道该项目将在发布之前被推送到列表上
  • 开销/延迟更少,因为它是到服务器的单次往返,而不是每次操作一次往返

请注意,您可以将管道元素链接在一起,以便我的发送者代码中的 4 行管道变为:

r.pipeline().lpush('myList', i).publish('notifications',f'LPUSHed to myList').execute()
© www.soinside.com 2019 - 2024. All rights reserved.