我正在尝试使用gstreamer和它的python绑定循环播放视频。第一次尝试是挂钩EOS消息并为管道生成搜索消息:
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst
import time
if not Gst.init_check()[0]:
print("gstreamer initialization failed")
source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")
qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None
decoder0 = Gst.ElementFactory.make("nxvideodec", "video_decoder0")
assert decoder0 is not None
def demux0_pad_added(demux, pad):
if pad.name == 'video_0': # We expect exactly first one video stream
pad.link(decoder0.get_static_pad("sink"))
qtdemux0.connect("pad-added", demux0_pad_added)
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None
pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(video_sink)
source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(video_sink)
######################################################
def main():
message_bus = pipeline0.get_bus()
pipeline0.set_state(Gst.State.PLAYING)
while True:
if message_bus.have_pending(): # Working without glib mainloop
message = message_bus.pop()
if message.type == Gst.MessageType.EOS: # End-Of-Stream: loop the video, seek to beginning
pipeline0.seek(1.0,
Gst.Format.TIME,
Gst.SeekFlags.FLUSH,
Gst.SeekType.SET, 0,
Gst.SeekType.NONE, 0)
elif message.type == Gst.MessageType.ERROR:
print("ERROR", message)
break
time.sleep(0.01) # Tried 0.001 - same result
if __name__ == "__main__":
main()
它实际上工作得很好,除了一件事 - 寻求开始并不是真的无缝。我可以看到微小的故障。因为视频是一个无限的动画,这个微小的故障实际上变得明显。我的第二次尝试是使用队列解码帧并挂钩EOS事件:
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst
import time
if not Gst.init_check()[0]:
print("gstreamer initialization failed")
source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")
qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None
decoder0 = Gst.ElementFactory.make("nxvideodec", "video_decoder0")
assert decoder0 is not None
def demux0_pad_added(demux, pad):
if pad.name == 'video_0': # We expect exactly first one video stream
pad.link(decoder0.get_static_pad("sink"))
qtdemux0.connect("pad-added", demux0_pad_added)
queue = Gst.ElementFactory.make("queue", "queue")
assert queue is not None
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None
pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(queue)
pipeline0.add(video_sink)
source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(queue)
queue.link(video_sink)
######################################################
def cb_event(pad, info, *user_data):
event = info.get_event()
if event is not None and event.type == Gst.EventType.EOS:
decoder0.seek(1.0,
Gst.Format.TIME,
Gst.SeekFlags.FLUSH,
Gst.SeekType.SET, 0,
Gst.SeekType.NONE, 0)
return Gst.PadProbeReturn.DROP
return Gst.PadProbeReturn.PASS
def main():
dec0_src_pad = decoder0.get_static_pad("src")
dec0_src_pad.add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.EVENT_DOWNSTREAM, cb_event)
message_bus = pipeline0.get_bus()
pipeline0.set_state(Gst.State.PLAYING)
while True:
# do nothing
time.sleep(1)
if __name__ == "__main__":
main()
在第一次EOS事件之后,播放暂停。我尝试了几种不同的方法:传递EOS事件,删除EOS并向解码器的源板添加偏移量,向管道本身和其他人发送搜索事件。但我无法让它发挥作用。
为了理解我还试图启用调试模式,并使用pad探测器编写我自己的管道活动记录器。调试模式不是很有用,日志非常笨重,缺少一些细节。我自己的日志包括上游/下游事件和缓冲时间信息。但是,我仍然无法理解错误以及如何使其发挥作用。
显然,我不仅缺少一些东西,而且还不了解gstreamer管道如何工作的一些基本要素。
所以,问题是:我应该怎么做第二版代码才能使它工作? 附加问题:是否有一些工具或技术可以清楚地了解管道内部及其包含的元素?
我将非常感谢详细的答案。对我来说,理解我做错了什么比让程序工作更重要。
附:程序在NanoPi S2板上的GNU / Linux下运行。视频存储在MP4容器中(无音频)并使用h264进行压缩。请随意以任何语言发布代码示例,不一定是Python。
哦,那好吧。我没有得到答案所以我继续研究,最后找到了解决方案。下面我将展示两种不同的方法。首先 - 用工作代码示例直接回答问题。第二种 - 不同的方法,对gstreamer而言似乎更为原生,绝对更简单。两者都给出了理想的结果 - 无缝视频循环
变化:
FLUSH
事件(连续流不应该有FLUSH
事件)。码:
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst
import time
import threading
if not Gst.init_check()[0]:
print("gstreamer initialization failed")
source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")
qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None
decoder0 = Gst.ElementFactory.make("avdec_h264", "video_decoder0")
assert decoder0 is not None
def demux0_pad_added(demux, pad):
if pad.name == 'video_0': # We expect exactly first one video stream
pad.link(decoder0.get_static_pad("sink"))
qtdemux0.connect("pad-added", demux0_pad_added)
queue = Gst.ElementFactory.make("queue", "queue")
assert queue is not None
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None
pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(queue)
pipeline0.add(video_sink)
source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(queue)
queue.link(video_sink)
# UPD: Get video duration
pipeline0.set_state(Gst.State.PAUSED)
assert pipeline0.get_state(Gst.CLOCK_TIME_NONE).state == Gst.State.PAUSED
duration_ok, duration = pipeline0.query_duration(Gst.Format.TIME)
assert duration_ok
######################################################
seek_requested = threading.Event()
# UPD: Seek thread. Wait for seek request from callback and generate seek event
def seek_thread_func(queue_sink_pad):
cumulative_offset = 0
while True:
seek_requested.wait()
seek_requested.clear()
decoder0.seek(1.0,
Gst.Format.TIME,
Gst.SeekFlags.FLUSH,
Gst.SeekType.SET, 0,
Gst.SeekType.NONE, 0)
# Add offset. It is important step to ensure that downstream elements will 'see' infinite contiguous stream
cumulative_offset += duration
queue_sink_pad.set_offset(cumulative_offset)
def cb_event(pad, info):
event = info.get_event()
if event is not None:
if event.type == Gst.EventType.EOS: # UPD: Set 'seek_requested' flag
seek_requested.set()
return Gst.PadProbeReturn.DROP
elif event.type == Gst.EventType.FLUSH_START or event.type == Gst.EventType.FLUSH_STOP: # UPD: Drop FLUSH
return Gst.PadProbeReturn.DROP
return Gst.PadProbeReturn.OK
def main():
queue_sink_pad = queue.get_static_pad("sink")
# UPD: Create separate 'seek thread'
threading.Thread(target=seek_thread_func, daemon=True, args=(queue_sink_pad,)).start()
dec0_src_pad = decoder0.get_static_pad("src")
dec0_src_pad.add_probe(Gst.PadProbeType.EVENT_DOWNSTREAM | Gst.PadProbeType.EVENT_FLUSH,
cb_event)
pipeline0.set_state(Gst.State.PLAYING)
while True:
# do nothing
time.sleep(1)
if __name__ == "__main__":
main()
这段代码有效。当队列中的缓冲区仍在播放时,可以有效地执行搜索。但是,我相信它可能包含一些缺陷甚至是错误。例如,SEGMENT
事件通过下游传递RESET
旗;这似乎不对。实现这种方法的更清晰(可能更正确/可靠)的方法是创建一个gstreamer插件。插件将管理事件并调整事件和缓冲区的时间戳。
但是有一个更简单和原生的解决方案:
SEGMENT_DONE
message段搜索(使用
GST_SEEK_FLAG_SEGMENT
)不会在播放段结束时发出EOS
,但会在总线上发布SEGMENT_DONE
消息。此消息由驱动管道中的回放的元素发布,通常是分路器。在收到消息后,应用程序可以重新连接管道或在管道中发出其他搜索事件。由于消息是在管道中尽早发布的,因此应用程序有一些时间发布新的搜索以使转换无缝。通常,允许的延迟由接收器的缓冲区大小以及管道中任何队列的大小来定义。
消息SEGMENT_DONE
确实是在队列变空之前发布的。这提供了足够的时间来执行下一次搜索。所以我们需要做的就是在播放的最开始时发出片段搜索。然后等待SEGMENT_DONE
消息并发送下一个非刷新搜索事件。这是工作示例:
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst
import time
if not Gst.init_check()[0]:
print("gstreamer initialization failed")
source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")
qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None
decoder0 = Gst.ElementFactory.make("nxvideodec", "video_decoder0")
assert decoder0 is not None
def demux0_pad_added(demux, pad):
if pad.name == 'video_0': # We expect exactly first one video stream
pad.link(decoder0.get_static_pad("sink"))
qtdemux0.connect("pad-added", demux0_pad_added)
queue = Gst.ElementFactory.make("queue", "queue")
assert queue is not None
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None
pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(queue)
pipeline0.add(video_sink)
source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(queue)
queue.link(video_sink)
######################################################
def main():
message_bus = pipeline0.get_bus()
pipeline0.set_state(Gst.State.PLAYING)
pipeline0.get_state(Gst.CLOCK_TIME_NONE)
pipeline0.seek(1.0,
Gst.Format.TIME,
Gst.SeekFlags.SEGMENT,
Gst.SeekType.SET, 0,
Gst.SeekType.NONE, 0)
while True:
if message_bus.have_pending(): # Working without glib mainloop
message = message_bus.pop()
if message.type == Gst.MessageType.SEGMENT_DONE:
pipeline0.seek(1.0,
Gst.Format.TIME,
Gst.SeekFlags.SEGMENT,
Gst.SeekType.SET, 0,
Gst.SeekType.NONE, 0)
elif message.type == Gst.MessageType.ERROR:
print("bus ERROR", message)
break
time.sleep(0.01)
if __name__ == "__main__":
main()
使用默认队列配置,SEGMENT_DONE
消息比最后一个视频帧播放时间提前约1秒发布。非刷新搜索确保不会丢失任何帧。这样可以提供完美的结果 - 真正无缝的视频循环。
注意:我将管道切换到PLAYING状态,然后执行初始非刷新搜索。或者,我们可以将管道切换到PAUSED状态,执行冲洗段搜索,然后将管道切换到PLAYING状态。
注2:不同来源表明解决方案略有不同。见下面的链接。
相关主题和来源:
http://gstreamer-devel.966125.n4.nabble.com/Flushing-the-data-in-partial-pipeline-tp4681893p4681899.html
https://cgit.freedesktop.org/gstreamer/gst-editing-services/tree/plugins/nle/nlesource.c
我建议看看gst-play-1.0
应用程序以及GStreamer的playbin
元素。
见这里:https://github.com/GStreamer/gst-plugins-base/blob/master/tools/gst-play.c
这个支持--gapless
选项,可以无间隙地播放许多文件。它利用了about-to-finish
元素的playbin
信号。
这个特殊的应用程序使用多个文件而不是相同的文件,但我想你可以尝试多次给同一个文件进行测试,如果它真的是无缝的或者与你的方法有相同的问题1)。
基本上我认为由于解码器的deinit / init /处理和冲洗管道,EOS因为及时准备第一帧而有点太晚了。此外,刷新将重置您的流,管道再次预卷并同步到新时钟。它真的不是来自内部的连续流。
另外,也许GStreamer Editing Services也可以这样做。但这可能适用于多个轨道,这意味着它可能会尝试同时实例化多个解码器实例以进行并行处理 - 这可能是您的电路板上的问题。
最后的办法可能是将MP4解复用到原始的bistream,将这个比特流连续循环到一个套接字并从中解码。然后它将显示为正在播放的无限比特流。
编辑:也许值得一试用它的multifilesrc
属性试用loop
,看看那个操作无间隙还是必须在文件之间执行刷新。
我正在使用SEGMENT_DONE方法:
import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
Gst.init(None)
pipeline = Gst.parse_launch('uridecodebin uri=file://%s name=d d. ! autovideosink d. ! autoaudiosink' %sys.argv[1])
bus = pipeline.get_bus()
bus.add_signal_watch()
def on_segment_done(bus, msg):
pipeline.seek(1.0,
Gst.Format.TIME,
Gst.SeekFlags.SEGMENT,
Gst.SeekType.SET, 0,
Gst.SeekType.NONE, 0)
return True
bus.connect('message::segment-done', on_segment_done)
pipeline.set_state(Gst.State.PLAYING)
pipeline.get_state(Gst.CLOCK_TIME_NONE)
pipeline.seek(1.0,
Gst.Format.TIME,
Gst.SeekFlags.SEGMENT,
Gst.SeekType.SET, 0,
Gst.SeekType.NONE, 0)
GLib.MainLoop().run()