GStreamer:在 ctrl+c 上正确发送 EOS 事件

问题描述 投票:0回答:1

我可以通过以下命令成功地从 videotestsrc 录制视频:

gst-launch-1.0 -e videotestsrc ! videoconvert ! x264enc ! mp4mux ! filesink location=output.mp4

当我运行上面的命令并按 ctrl+c 时,我会得到一个可播放的 output.mp4 文件。

我想通过 Python Gst 绑定复制此行为。但是,我无法正确处理 ctrl+c 行为:

import gi
from time import sleep

gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib  # noqa


def bus_call(bus, message, loop):
    """Handle a message from the pipeline bus and stop the loop when done.

    Args:
        bus: The bus that emitted the message (unused).
        message: The Gst.Message delivered by the bus signal watch.
        loop: The GLib.MainLoop to quit on end-of-stream or on error.

    Returns:
        Always True.
    """
    t = message.type
    if t == Gst.MessageType.EOS:
        print("EOS")
        loop.quit()
    elif t == Gst.MessageType.ERROR:
        import sys

        # Report the failure instead of discarding it; otherwise the program
        # just exits with no hint of what went wrong in the pipeline.
        err, debug = message.parse_error()
        print(f"Error: {err}: {debug}", file=sys.stderr)
        loop.quit()
    return True


def main():
    """Record videotestsrc to output.mp4 and try to finish cleanly on Ctrl+C.

    Builds videotestsrc -> videoconvert -> x264enc -> mp4mux -> filesink,
    watches the bus with ``bus_call``, runs a GLib main loop, and on
    KeyboardInterrupt sends an EOS event before setting the pipeline to NULL.
    """
    Gst.init(None)
    loop = GLib.MainLoop()

    # Create the pipeline container and the five elements of the chain.
    pipeline = Gst.ElementFactory.make("pipeline", None)
    src = Gst.ElementFactory.make("videotestsrc", None)
    convert = Gst.ElementFactory.make("videoconvert", None)
    encode = Gst.ElementFactory.make("x264enc")
    mux = Gst.ElementFactory.make("mp4mux")
    sink = Gst.ElementFactory.make("filesink", None)
    sink.set_property("location", "output.mp4")

    # Add everything to the pipeline, then link the elements in order.
    pipeline.add(src, convert, encode, mux, sink)
    src.link(convert)
    convert.link(encode)
    encode.link(mux)
    mux.link(sink)

    # Deliver bus messages to bus_call, which quits the loop on EOS or ERROR.
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    ret = pipeline.set_state(Gst.State.PLAYING)
    if ret == Gst.StateChangeReturn.FAILURE:
        # Could not start: reset the pipeline and give up.
        pipeline.set_state(Gst.State.NULL)
        return

    try:
        loop.run()
    except KeyboardInterrupt:
        # NOTE(review): the EOS event is sent here, but the ``finally`` block
        # below runs immediately afterwards and sets the pipeline to NULL.
        # Nothing waits for the EOS message to reach the bus, which is
        # presumably why the resulting output.mp4 is not finalized -- confirm.
        pipeline.send_event(Gst.Event.new_eos())
    finally:
        pipeline.set_state(Gst.State.NULL)
        loop.quit()


# Run the recording pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()

每当我运行程序并按 ctrl+c 时,生成的 output.mp4 都无法播放。我应该如何在 Python 代码中正确处理 ctrl+c 事件?

python gstreamer
1个回答
0
投票

好吧,过了一段时间我已经成功解决了这个问题。我在一个单独的线程中运行 GLib.MainLoop,并在 Python 代码中捕获 ctrl+c 事件之后等待它结束:

import gi
from time import sleep
from threading import Thread

gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib  # noqa


def bus_call(bus, message, loop):
    """Handle a message from the pipeline bus and stop the loop when done.

    Args:
        bus: The bus that emitted the message (unused).
        message: The Gst.Message delivered by the bus signal watch.
        loop: The GLib.MainLoop to quit on end-of-stream or on error.

    Returns:
        Always True.
    """
    t = message.type
    if t == Gst.MessageType.EOS:
        loop.quit()
    elif t == Gst.MessageType.ERROR:
        import sys

        # Report the failure instead of discarding it; otherwise the program
        # just exits with no hint of what went wrong in the pipeline.
        err, debug = message.parse_error()
        print(f"Error: {err}: {debug}", file=sys.stderr)
        loop.quit()
    return True


def main():
    """Record videotestsrc to output.mp4 until Ctrl+C, then stop after EOS.

    The GLib main loop runs in a worker thread so that the main thread stays
    free to receive KeyboardInterrupt. On Ctrl+C an EOS event is sent into
    the pipeline, and the main thread waits for the loop thread to end
    (``bus_call`` quits the loop on EOS or ERROR) before the pipeline is set
    to NULL.
    """
    Gst.init(None)
    loop = GLib.MainLoop()

    # Create the pipeline container and the five elements of the chain.
    pipeline = Gst.ElementFactory.make("pipeline", None)
    src = Gst.ElementFactory.make("videotestsrc", None)
    convert = Gst.ElementFactory.make("videoconvert", None)
    encode = Gst.ElementFactory.make("x264enc")
    mux = Gst.ElementFactory.make("mp4mux")
    sink = Gst.ElementFactory.make("filesink", None)
    sink.set_property("location", "output.mp4")

    pipeline.add(src, convert, encode, mux, sink)
    src.link(convert)
    convert.link(encode)
    encode.link(mux)
    mux.link(sink)

    # Deliver bus messages to bus_call, which quits the loop on EOS or ERROR.
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    ret = pipeline.set_state(Gst.State.PLAYING)
    if ret == Gst.StateChangeReturn.FAILURE:
        pipeline.set_state(Gst.State.NULL)
        return

    # Start the loop thread only after the pipeline is playing, so the
    # failure path above never leaves a stray thread keeping the process
    # alive. Messages posted in the meantime stay queued on the bus.
    thread = Thread(target=loop.run, daemon=True)
    thread.start()

    def wait_for_loop():
        # Wait on the thread itself rather than loop.is_running(): the
        # thread is alive from start() onwards, so there is no window in
        # which a not-yet-started loop looks finished. The short join
        # timeout keeps the main thread responsive to Ctrl+C.
        while thread.is_alive():
            thread.join(0.1)

    try:
        wait_for_loop()
    except KeyboardInterrupt:
        print("interrupted")
        # Ask the pipeline to finish, then wait until bus_call sees the EOS
        # (or an error) and quits the loop.
        pipeline.send_event(Gst.Event.new_eos())
        wait_for_loop()
    finally:
        pipeline.set_state(Gst.State.NULL)
        # Normally the thread is already finished here. On an abnormal exit
        # keep asking the loop to quit until the thread has really ended, in
        # case a quit() lands before the thread has entered run().
        while thread.is_alive():
            loop.quit()
            thread.join(0.1)
        bus.remove_signal_watch()


# Run the recording pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()

但是,我想知道这是否是在 Python 代码中处理我的问题的正确方法。

© www.soinside.com 2019 - 2024. All rights reserved.