How to run a process with a timeout and still get stdout while it is running

Question · Votes: 0 · Answers: 4

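Requirements:

  1. Time out after X seconds; if the timeout expires before the process ends normally, kill the process (and every process it started).
  2. Read the ongoing output while the process is running.
  3. Work with processes that produce output, processes that produce no output, and processes that produce output and then stop producing it (e.g. get stuck).
  4. Run on Windows.
  5. Run on Python 3.5.2.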
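
Requirement 1 asks for the whole process tree to go away, which Popen.kill() alone does not guarantee. A minimal sketch of one way to do it, assuming the Windows taskkill command is available on PATH; kill_tree is a hypothetical helper name and does not come from the question:

```python
import subprocess
import sys


def kill_tree(proc):
    """Terminate proc and, on Windows, every process it started."""
    if proc.poll() is not None:
        return  # already exited, nothing to do
    if sys.platform == "win32":
        # taskkill: /T also terminates child processes, /F forces termination.
        subprocess.call(
            ["taskkill", "/F", "/T", "/PID", str(proc.pid)],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    else:
        # Fallback for other platforms: this kills only the direct child.
        proc.kill()
    proc.wait()  # reap the process so returncode gets set


if __name__ == "__main__":
    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
    kill_tree(child)
    print("exit code:", child.returncode)
```

The sketch uses nothing newer than Python 3.5, so it should fit the version constraint in the question.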

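The Python 3 subprocess module has a built-in timeout, and I have also tried implementing the timeout myself with timers and threads, but neither works together with reading the output.

Does readline() block? readlines() definitely waits for the process to end before it spits out all the output, which is not what I need (I need the output continuously).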
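
On the readline() question: it does block until a complete line or end-of-file arrives, so a common workaround is to call it in a helper thread and pass the lines to the main thread through a queue. The sketch below is one possible arrangement, not a method taken from this thread; run_with_timeout and pump are made-up names, and the code sticks to features that exist in Python 3.5.

```python
import queue
import subprocess
import sys
import threading
import time


def run_with_timeout(cmd, timeout):
    """Run cmd, echo stdout line by line, kill it after `timeout` seconds.

    Returns (lines, timed_out).
    """
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    lines_q = queue.Queue()

    def pump():
        # readline() blocks, so it lives in its own thread.
        for raw in iter(proc.stdout.readline, b""):
            lines_q.put(raw)
        lines_q.put(None)  # sentinel: the pipe reached end-of-file

    threading.Thread(target=pump, daemon=True).start()

    deadline = time.monotonic() + timeout
    lines, timed_out = [], False
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            timed_out = True
            break
        try:
            raw = lines_q.get(timeout=remaining)
        except queue.Empty:
            timed_out = True  # silent or stuck process
            break
        if raw is None:
            break  # process finished on its own
        line = raw.decode(errors="replace").rstrip()
        print(line)
        lines.append(line)
    if timed_out:
        proc.kill()  # kills the direct child only
    proc.wait()
    return lines, timed_out


if __name__ == "__main__":
    child_code = (
        "import time\n"
        "for i in range(100):\n"
        "    print('tick', i, flush=True)\n"
        "    time.sleep(0.2)\n"
    )
    print(run_with_timeout([sys.executable, "-c", child_code], 1.0)[1])
```

Because the deadline is checked in the main thread rather than inside the read call, a process that prints nothing, or prints and then hangs, is handled the same way as a chatty one.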

I'm about to switch to Node.js :-(

python python-3.x timeout subprocess stdout
4 Answers
3 votes

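I would use asyncio for this kind of task.

Read the IO from the process as shown in the accepted answer here: How to stream stdout/stderr from a child process using asyncio, and obtain its exit code after?

(I don't want to copy it in full here)

Wrap it in a timeout: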

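import asyncio
import sys

# SubprocessProtocol is the protocol class from the linked answer.

async def killer(trans, timeout):
    await asyncio.sleep(timeout)
    trans.kill()
    print('killed!!')

# On Windows, subprocesses need the proactor event loop.
loop = asyncio.ProactorEventLoop() if sys.platform == 'win32' else asyncio.new_event_loop()
asyncio.set_event_loop(loop)

trans, *other_stuff = loop.run_until_complete(
    loop.subprocess_exec(
        SubprocessProtocol, 'py', '-3', '-c',
        'import time; time.sleep(6); print("Yay!")',
    )
)

asyncio.ensure_future(killer(trans, 5), loop=loop)  # 5 seconds timeout for the kill
loop.run_forever()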

Have fun...
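
For readers on a newer interpreter, the same idea can be written without a custom protocol class. This is a sketch under the assumption of Python 3.7 or later (asyncio.run does not exist in the 3.5.2 the question targets, and on Windows subprocess support in the default loop needs 3.8+); stream_with_timeout is a hypothetical name, not part of the answer above.

```python
import asyncio
import sys


async def stream_with_timeout(cmd, timeout):
    """Print stdout lines as they arrive; kill the child after `timeout` seconds.

    Returns (lines, timed_out).
    """
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    lines = []  # kept outside pump() so it survives cancellation

    async def pump():
        while True:
            raw = await proc.stdout.readline()
            if not raw:
                return  # end-of-file: the child closed its output
            line = raw.decode(errors="replace").rstrip()
            print(line)
            lines.append(line)

    timed_out = False
    try:
        # wait_for cancels pump() once the timeout expires.
        await asyncio.wait_for(pump(), timeout)
    except asyncio.TimeoutError:
        timed_out = True
        proc.kill()  # direct child only
    await proc.wait()
    return lines, timed_out


if __name__ == "__main__":
    child = "import time; print('Yay!', flush=True); time.sleep(6)"
    print(asyncio.run(stream_with_timeout([sys.executable, "-c", child], 2)))
```

The timeout here covers the whole run. Restarting it after every line would instead detect a process that has gone quiet.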


0 votes

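Use the 2 Python scripts below.

  • Master.py starts a new process with Popen and starts a watcher thread that kills the process after 3.0 seconds.

  • If the data written to stdout contains no newline, the slave must call the flush method (on Windows, '\n' also causes a flush).

Be careful: the time module is not a high-precision timer.

In extreme cases (reading the executable from a flash drive over USB 1.0), loading the process can take longer than 3.0 seconds.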

Master.py

import subprocess, threading, time

def watcher(proc, delay):
    # kill the child once the delay has elapsed
    time.sleep(delay)
    proc.kill()

proc = subprocess.Popen('python Slave.py', stdout = subprocess.PIPE)
threading.Thread(target = watcher, args = (proc, 3.0), daemon = True).start()

data = bytearray()

while True:
    chunk = proc.stdout.read(1)  # blocks until one byte arrives or the pipe closes
    if not chunk:
        break                    # EOF: the child exited or was killed
    data.extend(chunk)
    print(data)

Slave.py

import time, sys

while True:
    time.sleep(0.1)
    sys.stdout.write('aaaa')
    sys.stdout.flush()
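
The flush requirement exists because Python normally block-buffers stdout when it is connected to a pipe. If the child script cannot be edited, starting the interpreter with the -u option is another way to get output promptly. The sketch below measures how long the first line takes to arrive; first_line_delay and CHILD are illustrative names, and it assumes the child is itself a Python program.

```python
import subprocess
import sys
import time

# Child that prints one line without flushing, then keeps running.
CHILD = "import time\nprint('tick')\ntime.sleep(4)\n"


def first_line_delay(extra_args):
    """Start the child with extra interpreter args; return (first_line, seconds)."""
    start = time.monotonic()
    proc = subprocess.Popen(
        [sys.executable] + list(extra_args) + ["-c", CHILD],
        stdout=subprocess.PIPE,
    )
    line = proc.stdout.readline()  # blocks until a line or end-of-file
    delay = time.monotonic() - start
    proc.kill()
    proc.wait()
    return line, delay
```

Calling first_line_delay(["-u"]) should return almost immediately, while first_line_delay([]) usually returns only when the child exits and its buffer is flushed. Setting the PYTHONUNBUFFERED environment variable for the child has the same effect as -u.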

0 votes

On Python 3.7+, use subprocess.run() with capture_output=True and timeout=<your_timeout>. If the command has not returned before <your_timeout> seconds have passed, it kills the process and raises a subprocess.TimeoutExpired exception, which has .stdout and .stderr attributes:

import subprocess

try:
    result = subprocess.run(["sleep", "3"], timeout=2, capture_output=True)
except subprocess.TimeoutExpired as e:
    print("process timed out")
    print(e.stdout)
    print(e.stderr)

You may also want to pass text=True (or universal_newlines=True on Python <3.7) so that stdout and stderr are str instead of bytes.

On older versions of Python, you need to replace capture_output=True with stdout=subprocess.PIPE, stderr=subprocess.PIPE, in the call to subprocess.run(); the rest should be the same.

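Edit: this is not what you want, because you have to wait for the process to terminate before you can read the output, but it is exactly what I wanted when I ran into this problem.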
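
Since the sleep command used above does not exist on Windows, here is a variant that uses the running Python interpreter as the child, which should behave the same on both platforms. It is a sketch with made-up names (run_capturing, CHILD) and it illustrates that whatever the child had already written is still available on the exception after the timeout.

```python
import subprocess
import sys

# Child that emits one line, then hangs well past the timeout.
CHILD = "import time; print('partial', flush=True); time.sleep(30)"


def run_capturing(cmd, timeout):
    """Return (timed_out, stdout_bytes) for cmd."""
    try:
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as e:
        # e.stdout may be None if nothing was captured before the timeout.
        return True, e.stdout or b""
    return False, result.stdout


if __name__ == "__main__":
    print(run_capturing([sys.executable, "-c", CHILD], 2))
```

The output is still delivered only after the timeout fires, so this does not cover the streaming requirement in the question.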


0 votes

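I just published this post, which contains an example of a test app runner that waits for an app's output (whether that output is on stdout or stderr) and raises an exception on timeout.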

Example test app:

# app/test_app.py

import os
import sys
import time

APP_READY_STRING = os.environ.get("APP_READY_STRING", "No 'App Ready' string provided")
POST_STARTUP_RUN_TIME = 5
STARTUP_DELAY = 5


def main():
    # log to stdout and stderr in order to test that both can be captured
    print("Hello World!")
    print("And hello again to you, sir!", file=sys.stderr)
    # simulate startup delay
    for i in range(STARTUP_DELAY):
        print(f"{time.strftime('%H:%M:%S')} Test app waiting... {i+1}")
        time.sleep(1)
    # print out the string that's being tested for. it should not matter whether
    # this is printed to stdout or stderr
    print(APP_READY_STRING, flush=True, file=sys.stderr)
    # the app should run for 5 seconds before exiting, this will give enough time
    # to test that killing the app works
    time.sleep(POST_STARTUP_RUN_TIME)
    print("Goodbye World!")

if __name__ == "__main__":
    main()

The app_runner module:

# app_runner/app_runner.py

import os
import subprocess
import sys

from .processes import kill_process, wait_for_process_output

APP_READY_STRING = "App started successfully..."


class AppRunner(object):
    def __init__(self, app: str, cwd: str):
        """This class is used to run an app in a separate process.

        Args:
            app (str): The name of the app to run.
            cwd (str): The path where the app is located."""
        self.app = app
        env = {
            **os.environ,
            "APP_READY_STRING": APP_READY_STRING,
        }
        cmd = [
            sys.executable,
            self.app,
        ]
        # start the app in a separate process. it's important that the stdout and
        # stderr streams are captured so that they can be checked for the expected
        # output, and that the app isn't run with a shell
        self.process = subprocess.Popen(
            cmd,
            cwd=cwd,
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        try:
            wait_for_process_output(self.process, APP_READY_STRING)
        except Exception as e:
            # chain the original error instead of passing it as a second argument
            raise Exception(
                f"AppRunner app '{self.app}' failed to start"
            ) from e

    def __enter__(self):
        return self

    def __exit__(self, *args):
        kill_process(self.app)

The processes module:

# app_runner/processes.py

import random
import string
import sys
import threading
import time
from typing import List, Union

import psutil

DEFAULT_APP_WAIT_TIME_SEC = 8

# This dictionary is used to store the state of the processes that are being
# searched.
processes = {}


def _is_process_match(command: str, process_names: List[str]) -> bool:
    """Identifying a process by its command line is not an exact science."""
    if len(process_names) == 1:
        command_parts = command.split(" ")
        if command_parts[0] == process_names[0]:
            return True
    if len(process_names) > 1 and all(
        [process_name in command for process_name in process_names]
    ):
        return True
    return False


def _find_in_stream(stream, text: str, process_handle: str) -> None:
    while (
        not processes[process_handle]["text_found"].is_set()
        and not processes[process_handle]["timed_out"].is_set()
    ):
        try:
            raw = stream.readline()
            if not raw:
                break  # EOF: the process closed this stream, stop reading
            line = raw.decode("utf-8")
            if text in line:
                processes[process_handle]["text_found"].set()
            _print_log(line)
        except Exception:
            pass


def _print_log(line: str) -> None:
    line = f"{time.strftime('%H:%M:%S')} {line}\n"
    sys.stderr.write(line)
    sys.stderr.flush()


def _print_process_identifier(proc_name: str, cmd_line: str, process_names: List[str]):
    return f"process '{proc_name}' (looking for {','.join(process_names)}) with command line '{cmd_line}'"


def _process_timeout(process_handle, timeout=DEFAULT_APP_WAIT_TIME_SEC) -> None:
    _print_log(
        f"Waiting up to {timeout} seconds to abort search on process {process_handle}..."
    )
    timeout_remaining = timeout
    while (
        timeout_remaining > 0 and not processes[process_handle]["text_found"].is_set()
    ):
        time.sleep(1)
        timeout_remaining -= 1
    if not processes[process_handle]["text_found"].is_set():
        processes[process_handle]["timed_out"].set()


def _random_string(length: int) -> str:
    """Naive random string generator to create process identifiers."""
    return "".join(random.choice(string.ascii_lowercase) for _ in range(length))


def wait_for_process_output(
    process, text: str, timeout=DEFAULT_APP_WAIT_TIME_SEC
) -> None:
    """This function checks if the given text is in the process output within the given time limit."""
    start_time = time.time()

    process_handle = _random_string(10)
    processes[process_handle] = {
        "text_found": threading.Event(),
        "timed_out": threading.Event(),
    }

    # start a new thread to stop searching after the timeout
    threading.Thread(target=_process_timeout, args=(process_handle, timeout)).start()
    # search for the text in the stdout and stderr streams
    threading.Thread(
        target=_find_in_stream, args=(process.stdout, text, process_handle)
    ).start()
    threading.Thread(
        target=_find_in_stream, args=(process.stderr, text, process_handle)
    ).start()

    while True:
        if processes[process_handle]["text_found"].is_set():
            return
        if processes[process_handle]["timed_out"].is_set():
            raise Exception(
                f"Failed to find '{text}' in process output after {time.time() - start_time} seconds."
            )
        time.sleep(0.1)  # poll instead of spinning at 100% CPU


def kill_process(process_names: Union[str, List[str]]) -> None:
    """Kill a Python process identified by the given name or list of names.

    There are easier ways to do this, but this is the most reliable way to kill a
    Python-run process without knowing the exact command line arguments and without
    killing the current process / test runner process (eg. nox).
    """
    if isinstance(process_names, str):
        process_names = [process_names]
    proc_name = "undefined"
    cmd_line = "undefined"
    # Kill all processes with the given name
    for proc in psutil.process_iter(attrs=["pid", "name", "cmdline"], ad_value=None):
        try:
            proc_name = proc.name()
            if proc.status() == psutil.STATUS_ZOMBIE:
                continue
            # Some apps run under their own names, some as `Python` (this also
            # depends on the OS)
            if _is_process_match(proc_name, process_names):
                print(f"Killing process with name {proc_name}...")
                proc.kill()
            elif proc_name.lower().startswith("python"):
                # drop the first argument, which is the python executable
                cmd_line = proc.cmdline()
                python_command_parts = cmd_line[1:]
                if not python_command_parts:
                    continue  # bare interpreter with no script argument
                # the initial command part is the last part of the path
                python_command_parts[0] = python_command_parts[0].split("/")[-1]
                # combine the remaining arguments
                command = " ".join(python_command_parts)
                print(
                    f"Evaluating process with name '{proc_name}' and command '{command}'..."
                )
                if (
                    len(cmd_line) > 1
                    and "nox" not in command # don't kill the test runner process
                    and _is_process_match(command, process_names)
                ):
                    print(
                        f"Killing process with name '{proc_name}' and command '{command}'..."
                    )
                    proc.kill()
        except psutil.ZombieProcess as zp:
            print(
                f"Failed to kill zombie process {_print_process_identifier(proc_name, cmd_line, process_names)}: {str(zp)}"
            )
        except psutil.NoSuchProcess as nsp:
            print(
                f"Failed to kill process {_print_process_identifier(proc_name, cmd_line, process_names)}: {str(nsp)}"
            )

Putting it all together:

# test_app_runner.py

from pathlib import Path

from app_runner.app_runner import AppRunner


def main():
    print("Starting AppRunner test...")
    app = "test_app.py"
    app_location = Path(__file__).parent / "app"
    with AppRunner(app=app, cwd=app_location) as app_runner:
        print("Test app start detected.")
    print("AppRunner test complete.")

if __name__ == "__main__":
    main()
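
The core of the approach above, scanning both streams for a marker string with a deadline, can be condensed by letting threading.Event.wait(timeout) do the waiting instead of a separate timer thread and a polling loop. This is a simplified sketch, not the author's module; wait_for_text is a hypothetical name and it expects the process to have been started with stdout and stderr set to subprocess.PIPE.

```python
import subprocess
import sys
import threading


def wait_for_text(proc, text, timeout):
    """Return True if `text` appears on stdout or stderr within `timeout` seconds."""
    found = threading.Event()

    def scan(stream):
        # readline() blocks, so each stream gets its own daemon thread.
        for raw in iter(stream.readline, b""):
            if text in raw.decode("utf-8", errors="replace"):
                found.set()
                return

    for stream in (proc.stdout, proc.stderr):
        threading.Thread(target=scan, args=(stream,), daemon=True).start()
    # Event.wait returns True once set, or False when the timeout expires.
    return found.wait(timeout)


if __name__ == "__main__":
    child = "import sys, time; print('ready', file=sys.stderr, flush=True); time.sleep(5)"
    proc = subprocess.Popen(
        [sys.executable, "-c", child],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    print("started:", wait_for_text(proc, "ready", 3))
    proc.kill()
    proc.wait()
```

As in the original module, nothing reads the pipes once the marker has been found, so a child that keeps writing a lot of output may eventually block on a full pipe.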