如何在本地机器上的不同版本的 Python 脚本之间发送单向消息?

问题描述 投票:0回答:3

我在 Python 3.10 中有一个第一个发送者脚本需要发送一些数据

def post_updates(*args):
   sender.send_message("optional_key", args)

然后是 Python 3.7 中的第二个接收脚本,它需要接收此数据

while True:
   args = receiver.get_message("optional_key", blocking=True)
   print("args received:", args)

约束条件:

  • 每个脚本不应依赖于另一个脚本的存在来运行。
  • 无论接收方是否正在运行,发送方都应尝试发送。
  • 无论发送方是否正在运行,接收方都应尝试接收。
  • 消息可以由基本的 python 对象(字典、列表)组成,并且应该自动序列化。
  • 我需要每秒发送超过 100 条消息(尽可能减少延迟)。
  • 仅限本地 PC (Windows),无需安全保护。

这个简单的问题是否有 1-liner 解决方案?我查找的所有内容似乎都过于复杂或需要事先启动 TCP 服务器。我不介意安装流行的模块。

编辑:

我预计会有一些模块可以方便地交换基本的 Python 对象,而无需实现代码和处理怪癖。我还没有找到,所以我在下面发布了一个可行的解决方案,灵感来自建议。

python python-3.x ipc
3个回答
0
投票

UDP 和 JSON 看起来非常适合您的要求,只要

  • 你不需要有一个以上的接收器
  • 你不需要非常大的消息
  • 你只需要发送字典、列表、字符串和数字的组合,而不是任意类的 Python 对象
  • 关于找到“1-liner”你并不过分直截了当:它是一个非常小的代码量,你可以自由定义你自己的辅助函数。

Python 的标准库有你需要的一切。从 JSON 编码和解码就像

json.dumps()
json.loads()
一样简单。对于发送和接收,我建议遵循 Python wiki 上的示例。您需要先使用

创建套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

无论您是发送者还是接收者。然后接收器需要绑定到本地端口来收听它:

sock.bind(('127.0.0.1', PORT))

然后发送者用

sock.sendto()
发送,接收者用
sock.recvfrom()
接收。


0
投票

如果有人正在寻找实际的解决方案,这是我发现满足所有要求的最简单的解决方案。

import json
import time
import zmq
import os

class ZmqSender():
    def __init__(self, addr="tcp://127.0.0.1:12345"):
        self.ctx = zmq.Context()
        self.sock = self.ctx.socket(zmq.PUB) # TCP Publisher (UDP/RADIO/DISH unavailable)
        self.sock.connect(addr) # Here many publishers can connect, one subscriber will bind
        self.pid = os.getpid()  # Cached for performance

    def send_message(self, *args):
        msg = dict(_pid=self.pid, _qpc=time.perf_counter(), args=args)
        self.sock.send(json.dumps(msg).encode())

class ZmqReceiver():
    def __init__(self, addr="tcp://127.0.0.1:12345"):
        self.ctx = zmq.Context()
        self.sock = self.ctx.socket(zmq.SUB)  # Subscriber
        self.sock.bind(addr)    # Here one subscriber (the stable end) binds the port
        self.sock.subscribe('') # Subscribe to any publishing

    def get_pending_messages(self):
        """Returns a list of all messages in input buffer without blocking."""
        msgs = []
        try:
            while True:  # repeat until no further messages
                msgs.append(json.loads(self.sock.recv(flags=zmq.NOBLOCK))) # JSON decode
        except zmq.Again as e:
            pass    # No further message awaiting
        return msgs

然后对于发件人:

zs = ZmqSender()
zs.send_message("Hello", 12345, dict(key=999))

接收器:

zr = ZmqReceiver()
while True:
    for msg in zr.get_pending_messages():
        print(msg)
    time.sleep(0.01)

如果有人有更简单的解决方案,请分享 ;-)


-1
投票

好的旧管道可能会完成这项工作,但您需要评估需要多大的缓冲区大小(考虑到发送方/接收方的异步性质),并更改默认的管道缓冲区大小。

© www.soinside.com 2019 - 2024. All rights reserved.