Python 中的单向通信

问题描述 投票:0回答:1

最后一天,我一直在尝试在两个 python 脚本之间进行一些单向连续通信,作为更大项目的练习。但是, subprocess.communicate 只允许一次性消息,到目前为止我还无法让第二个脚本在写入时侦听和打印 stdin 数据。我在 stackoverflow 上进行了广泛的查找,但还没有找到有效的答案(我只想使用 subprocess、sys 或 os 模块)。

具体来说,我希望定期(例如每秒)将数据写入标准输入,并使用另一个 python 脚本连续侦听,并在将某些内容写入标准输入时打印数据。

我得到的最接近的是:

## script1.py (the script run)

from subprocess import Popen, PIPE
import time

inp1 = 'string1 /n'.encode()
inp2 = 'string2 /n'.encode()
inp3 = 'string3 /n'.encode()

p=Popen(["python","script2.py"], stdin=PIPE)

for i in [inp1, inp2, inp3]:
   p.stdin.write(i)
   p.stdin.flush()
   time.sleep(1)

p.stdin.close()


## script2.py

import sys

while True:
   try:
      for line in sys.stdin:
         print(line)
   except EOFError:
      pass

这曾经有效,但现在只是在数据全部写入后批量输出数据。我也不知道这是否只是因为它在 Raspberry Pi 4 上。

python subprocess stdout stdin communication
1个回答
0
投票

在 script1.py 中,您需要发送带有某些字符的编码数据,表明“行”已完成。 ' ' 非常适合这个。

所以:

# script1.py

from subprocess import Popen, PIPE
import time

PYTHON = '/Library/Frameworks/Python.framework/Versions/3.12/bin/python3'

inp1 = 'string1'
inp2 = 'string2'
inp3 = 'string3'

if (p := Popen([PYTHON, "script2.py"], stdin=PIPE)):
    if p.stdin is not None:
        for i in [inp1, inp2, inp3]:
            p.stdin.write(i.encode())
            p.stdin.write(b'\n')
            p.stdin.flush()
            time.sleep(1)

然后,在 script2.py 中,您需要一次读取一个字节。 EOF 不是以异常的方式表示,而是以零值表示。

因此:

# script2.py

import sys

b = bytearray()

while True:
    if (c := sys.stdin.buffer.read(1)) == 0:
        if b:
            print(b.decode())
        break
    if c == b'\n':
        print(b.decode())
        b = bytearray()
    else:
        b += c
© www.soinside.com 2019 - 2024. All rights reserved.