从Python子进程获取最新的流式stdout行

问题描述 投票:0回答:2

我的目标:每M秒从子进程中读取最新的“块”(N行)流式标准输出。

当前代码:

  1. 启动子过程
  2. 读取标准输出
  3. 一旦我有N行大块,将其打印出来(或另存为当前块)
  4. 等待M秒
  5. 重复
  6. 我现在也放置了代码以终止子进程(直到您按下Ctrl-C为止,它是无尽的流)

我想要实现的是等待M秒之后,如果它总是读取latest N行,而不是标准输出中的后续N行(它们可以在我只对最新感兴趣)

我的最终目标是产生一个线程来运行该过程并继续保存最新的行,然后在需要最新的流结果时从主进程中调用。

任何帮助将不胜感激!

#!/usr/bin/env python3
import signal
import time
from subprocess import Popen, PIPE

sig = signal.SIGTERM

N=9
M=5

countlines=0
p = Popen(["myprogram"], stdout=PIPE, bufsize=1, universal_newlines=True)

chunk=[]

for line in p.stdout:
    countlines+=1
    chunk.append(line)

    if len(chunk)==N:
        print(chunk)
        chunk=[]
        time.sleep(M)

    if countlines>100:
        p.send_signal(sig)
        break

print("done")
python stream subprocess
2个回答
0
投票

经过大量搜索,我在这里偶然发现了一个解决方案:

https://eli.thegreenplace.net/2017/interacting-with-a-long-running-child-process-in-python/

Eli的“启动,交互,实时获取输出,终止”代码部分为我工作。到目前为止,这是我找到的最优雅的解决方案。

适应上面的问题,并写在一个类中(此处未显示):

def output_reader(self,proc):
    chunk=[]
    countlines=0
    for line in iter(proc.stdout.readline, b''):
        countlines+=1
        chunk.append(line.decode("utf-8"))
        if countlines==N:
            self.current_chunk = chunk
            chunk=[]
            countlines=0

def main():
    proc = subprocess.Popen(['myprocess'],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)

    t = threading.Thread(target=output_reader, args=(proc,))
    t.start()

    try:
        time.sleep(0.2)
        for i in range(10):
            time.sleep(1) # waits a while before getting latest lines
            print(self.current_chunk)
    finally:
        proc.terminate()
        try:
            proc.wait(timeout=0.2)
            print('== subprocess exited with rc =', proc.returncode)
        except subprocess.TimeoutExpired:
            print('subprocess did not terminate in time')
    t.join()

0
投票

这里是另一种可能的解决方案。该程序将在管道中作为单独的进程运行,并提供一个REST API,它将返回在stdin上读取的最后N行。它在flask中使用run,因此,在外部环境可以访问本地服务器端口进行请求的情况下,不应使用它,尽管可以对此进行调整。

import sys
import time
import threading
import argparse
from flask import Flask, request
from flask_restful import Resource, Api


class Server:

    def __init__(self):
        self.data = {'at_eof': False,
                     'lines_read': 0,
                     'latest_lines': []}
        self.thread = None
        self.args = None
        self.stop = False


    def parse_args(self):
        parser = argparse.ArgumentParser()
        parser.add_argument("num_lines", type=int,
                            help="number of lines to cache")
        parser.add_argument("port", type=int,
                            help="port to serve on")
        self.args = parser.parse_args()


    def start_updater(self):
        def updater():
            lines = self.data['latest_lines']
            while True:
                if self.stop:
                    return
                line = sys.stdin.readline()
                if not line:
                    break
                self.data['lines_read'] += 1
                lines.append(line)
                while len(lines) > self.args.num_lines:
                    lines.pop(0)
            self.data['at_eof'] = True
        self.thread = threading.Thread(target=updater)
        self.thread.start()


    def get_data(self):
        return self.data


    def shutdown(self):
        self.stop = True
        func = request.environ.get('werkzeug.server.shutdown')
        if func:
            func()
            return 'Shutting down'
        else:
            return 'shutdown failed'


    def add_apis(self, app):

        class GetData(Resource):
            get = self.get_data

        class Shutdown(Resource):
            get = self.shutdown            

        api = Api(app)
        api.add_resource(GetData, "/getdata")
        api.add_resource(Shutdown, "/shutdown")


    def run(self):
        self.parse_args()
        self.start_updater()        
        app = Flask(__name__)
        self.add_apis(app)
        app.run(port=self.args.port)


server = Server()
server.run()

示例用法:这是我们要提供其输出的测试程序:

import sys
import time

for i in range(100):
    print("this is line {}".format(i))
    sys.stdout.flush()
    time.sleep(.1)

和一个简单的管道来启动它(在Linux shell提示符下,但是可以通过subprocess.Popen完成,在端口8001上提供最后5行:

python ./writer.py  | python ./server.py 5 8001

一个示例查询,这里使用curl作为客户端,但是可以通过Python requests来完成:

$ curl -s http://localhost:8001/getdata
{"at_eof": false, "lines_read": 30, "latest_lines": ["this is line 25\n", "this is line 26\n", "this is line 27\n", "this is line 28\n", "this is line 29\n"]}

服务器还提供了一个http://localhost:<port>/shutdown URL来终止它。如果您在第一次看到"at_eof": true之前就调用它,那么期望编写器因管道破裂而死亡。

© www.soinside.com 2019 - 2024. All rights reserved.