我可以通过以下命令成功地从 videotestsrc 录制视频:
gst-launch-1.0 -e videotestsrc ! videoconvert ! x264enc ! mp4mux ! filesink location=output.mp4
当我运行上面的命令并按 ctrl+c 时,我会得到一个可播放的
output.mp4
文件。
我想通过 Python Gst 绑定复制此行为。但是,我无法正确处理 ctrl+c 行为
import gi
from time import sleep
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib # noqa
def bus_call(bus, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
print("EOS")
loop.quit()
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
loop.quit()
return True
def main():
Gst.init(None)
loop = GLib.MainLoop()
pipeline = Gst.ElementFactory.make("pipeline", None)
src = Gst.ElementFactory.make("videotestsrc", None)
convert = Gst.ElementFactory.make("videoconvert", None)
encode = Gst.ElementFactory.make("x264enc")
mux = Gst.ElementFactory.make("mp4mux")
sink = Gst.ElementFactory.make("filesink", None)
sink.set_property("location", "output.mp4")
pipeline.add(src, convert, encode, mux, sink)
src.link(convert)
convert.link(encode)
encode.link(mux)
mux.link(sink)
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
pipeline.set_state(Gst.State.NULL)
return
try:
loop.run()
except KeyboardInterrupt:
pipeline.send_event(Gst.Event.new_eos())
finally:
pipeline.set_state(Gst.State.NULL)
loop.quit()
if __name__ == "__main__":
main()
每当我运行程序并按 ctrl+c 时,我就无法播放
output.mp4
。我应该如何正确处理 Python 代码中的 ctrl+c 事件?
好吧,过了一段时间我已经成功解决了这个问题。我在一个单独的线程中运行
GLib.MainLoop
并等待它在 Python 代码中的 ctrl+c 事件之后完成:
import gi
from time import sleep
from threading import Thread
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib # noqa
def bus_call(bus, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
loop.quit()
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
loop.quit()
return True
def main():
Gst.init(None)
loop = GLib.MainLoop()
thread = Thread(target=loop.run)
thread.start()
pipeline = Gst.ElementFactory.make("pipeline", None)
src = Gst.ElementFactory.make("videotestsrc", None)
convert = Gst.ElementFactory.make("videoconvert", None)
encode = Gst.ElementFactory.make("x264enc")
mux = Gst.ElementFactory.make("mp4mux")
sink = Gst.ElementFactory.make("filesink", None)
sink.set_property("location", "output.mp4")
pipeline.add(src, convert, encode, mux, sink)
src.link(convert)
convert.link(encode)
encode.link(mux)
mux.link(sink)
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
pipeline.set_state(Gst.State.NULL)
return
try:
while loop.is_running():
sleep(0.01)
except KeyboardInterrupt:
print("interrupted")
pipeline.send_event(Gst.Event.new_eos())
while loop.is_running():
sleep(0.01)
finally:
pipeline.set_state(Gst.State.NULL)
loop.quit()
if __name__ == "__main__":
main()
但是,我想知道这是否是在 Python 代码中处理我的问题的正确方法。