在gstreamer的无缝的录影圈

问题描述 投票:0回答:3

我正在尝试使用gstreamer和它的python绑定循环播放视频。第一次尝试是挂钩EOS消息并为管道生成搜索消息:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import time

if not Gst.init_check()[0]:
    print("gstreamer initialization failed")

source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")

qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None

decoder0 = Gst.ElementFactory.make("nxvideodec", "video_decoder0")
assert decoder0 is not None

def demux0_pad_added(demux, pad):
    if pad.name == 'video_0':  # We expect exactly first one video stream
        pad.link(decoder0.get_static_pad("sink"))

qtdemux0.connect("pad-added", demux0_pad_added)

video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None

pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(video_sink)

source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(video_sink)

######################################################

def main():
    message_bus = pipeline0.get_bus()
    pipeline0.set_state(Gst.State.PLAYING)

    while True:
        if message_bus.have_pending():  # Working without glib mainloop
            message = message_bus.pop()
            if message.type == Gst.MessageType.EOS:  # End-Of-Stream: loop the video, seek to beginning
                pipeline0.seek(1.0,
                              Gst.Format.TIME,
                              Gst.SeekFlags.FLUSH,
                              Gst.SeekType.SET, 0,
                              Gst.SeekType.NONE, 0)
            elif message.type == Gst.MessageType.ERROR:
                print("ERROR", message)
                break
        time.sleep(0.01) # Tried 0.001 - same result

if __name__ == "__main__":
    main()

它实际上工作得很好,除了一件事 - 寻求开始并不是真的无缝。我可以看到微小的故障。因为视频是一个无限的动画,这个微小的故障实际上变得明显。我的第二次尝试是使用队列解码帧并挂钩EOS事件:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import time

if not Gst.init_check()[0]:
    print("gstreamer initialization failed")

source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")

qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None

decoder0 = Gst.ElementFactory.make("nxvideodec", "video_decoder0")
assert decoder0 is not None

def demux0_pad_added(demux, pad):
    if pad.name == 'video_0':  # We expect exactly first one video stream
        pad.link(decoder0.get_static_pad("sink"))

qtdemux0.connect("pad-added", demux0_pad_added)

queue = Gst.ElementFactory.make("queue", "queue")
assert queue is not None
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None

pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(queue)
pipeline0.add(video_sink)

source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(queue)
queue.link(video_sink)

######################################################

def cb_event(pad, info, *user_data):
    event = info.get_event()
    if event is not None and event.type == Gst.EventType.EOS:
        decoder0.seek(1.0,
                      Gst.Format.TIME,
                      Gst.SeekFlags.FLUSH,
                      Gst.SeekType.SET, 0,
                      Gst.SeekType.NONE, 0)
        return Gst.PadProbeReturn.DROP
    return Gst.PadProbeReturn.PASS

def main():
    dec0_src_pad = decoder0.get_static_pad("src")
    dec0_src_pad.add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.EVENT_DOWNSTREAM, cb_event)

    message_bus = pipeline0.get_bus()
    pipeline0.set_state(Gst.State.PLAYING)

    while True:
        # do nothing
        time.sleep(1)

if __name__ == "__main__":
    main()

在第一次EOS事件之后,播放暂停。我尝试了几种不同的方法:传递EOS事件,删除EOS并向解码器的源板添加偏移量,向管道本身和其他人发送搜索事件。但我无法让它发挥作用。

为了理解我还试图启用调试模式,并使用pad探测器编写我自己的管道活动记录器。调试模式不是很有用,日志非常笨重,缺少一些细节。我自己的日志包括上游/下游事件和缓冲时间信息。但是,我仍然无法理解错误以及如何使其发挥作用。

显然,我不仅缺少一些东西,而且还不了解gstreamer管道如何工作的一些基本要素。

所以,问题是:我应该怎么做第二版代码才能使它工作? 附加问题:是否有一些工具或技术可以清楚地了解管道内部及其包含的元素?

我将非常感谢详细的答案。对我来说,理解我做错了什么比让程序工作更重要。

附:程序在NanoPi S2板上的GNU / Linux下运行。视频存储在MP4容器中(无音频)并使用h264进行压缩。请随意以任何语言发布代码示例,不一定是Python。

gstreamer python-gstreamer
3个回答
2
投票

哦,那好吧。我没有得到答案所以我继续研究,最后找到了解决方案。下面我将展示两种不同的方法。首先 - 用工作代码示例直接回答问题。第二种 - 不同的方法,对gstreamer而言似乎更为原生,绝对更简单。两者都给出了理想的结果 - 无缝视频循环

Corrected code (the answer, but not the best approach)

变化:

  1. 添加了视频时长查询。每个循环我们应该增加视频持续时间值的时间偏移。它可以模拟无限连续的流。
  2. 搜索事件发送移动到单独的线程。根据this post,我们不能从流线程中发出搜索事件。另外,看看this file(来自上述帖子的链接)。
  3. 事件回调现在丢弃FLUSH事件(连续流不应该有FLUSH事件)。
  4. 视频解码器从nxvideodec更改为avdec_h264。这与最初的问题无关,并且完成了for a very special reason

码:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import time
import threading

if not Gst.init_check()[0]:
    print("gstreamer initialization failed")

source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")

qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None

decoder0 = Gst.ElementFactory.make("avdec_h264", "video_decoder0")
assert decoder0 is not None

def demux0_pad_added(demux, pad):
    if pad.name == 'video_0':  # We expect exactly first one video stream
        pad.link(decoder0.get_static_pad("sink"))

qtdemux0.connect("pad-added", demux0_pad_added)

queue = Gst.ElementFactory.make("queue", "queue")
assert queue is not None
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None

pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(queue)
pipeline0.add(video_sink)

source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(queue)
queue.link(video_sink)

# UPD: Get video duration
pipeline0.set_state(Gst.State.PAUSED)
assert pipeline0.get_state(Gst.CLOCK_TIME_NONE).state == Gst.State.PAUSED
duration_ok, duration = pipeline0.query_duration(Gst.Format.TIME)
assert duration_ok

######################################################

seek_requested = threading.Event()
# UPD: Seek thread. Wait for seek request from callback and generate seek event
def seek_thread_func(queue_sink_pad):
    cumulative_offset = 0
    while True:
        seek_requested.wait()
        seek_requested.clear()
        decoder0.seek(1.0,
                      Gst.Format.TIME,
                      Gst.SeekFlags.FLUSH,
                      Gst.SeekType.SET, 0,
                      Gst.SeekType.NONE, 0)
        # Add offset. It is important step to ensure that downstream elements will 'see' infinite contiguous stream
        cumulative_offset += duration
        queue_sink_pad.set_offset(cumulative_offset)

def cb_event(pad, info):
    event = info.get_event()
    if event is not None:
        if event.type == Gst.EventType.EOS:  # UPD: Set 'seek_requested' flag
            seek_requested.set()
            return Gst.PadProbeReturn.DROP
        elif event.type == Gst.EventType.FLUSH_START or event.type == Gst.EventType.FLUSH_STOP:  # UPD: Drop FLUSH
            return Gst.PadProbeReturn.DROP
    return Gst.PadProbeReturn.OK

def main():
    queue_sink_pad = queue.get_static_pad("sink")

    # UPD: Create separate 'seek thread'
    threading.Thread(target=seek_thread_func, daemon=True, args=(queue_sink_pad,)).start()

    dec0_src_pad = decoder0.get_static_pad("src")
    dec0_src_pad.add_probe(Gst.PadProbeType.EVENT_DOWNSTREAM | Gst.PadProbeType.EVENT_FLUSH,
                           cb_event)

    pipeline0.set_state(Gst.State.PLAYING)

    while True:
        # do nothing
        time.sleep(1)

if __name__ == "__main__":
    main()

这段代码有效。当队列中的缓冲区仍在播放时,可以有效地执行搜索。但是,我相信它可能包含一些缺陷甚至是错误。例如,SEGMENT事件通过下游传递RESET旗;这似乎不对。实现这种方法的更清晰(可能更正确/可靠)的方法是创建一个gstreamer插件。插件将管理事件并调整事件和缓冲区的时间戳。

但是有一个更简单和原生的解决方案:

Using segment seek and SEGMENT_DONE message

根据documentation

段搜索(使用GST_SEEK_FLAG_SEGMENT)不会在播放段结束时发出EOS,但会在总线上发布SEGMENT_DONE消息。此消息由驱动管道中的回放的元素发布,通常是分路器。在收到消息后,应用程序可以重新连接管道或在管道中发出其他搜索事件。由于消息是在管道中尽早发布的,因此应用程序有一些时间发布新的搜索以使转换无缝。通常,允许的延迟由接收器的缓冲区大小以及管道中任何队列的大小来定义。

消息SEGMENT_DONE确实是在队列变空之前发布的。这提供了足够的时间来执行下一次搜索。所以我们需要做的就是在播放的最开始时发出片段搜索。然后等待SEGMENT_DONE消息并发送下一个非刷新搜索事件。这是工作示例:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import time

if not Gst.init_check()[0]:
    print("gstreamer initialization failed")

source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")

qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None

decoder0 = Gst.ElementFactory.make("nxvideodec", "video_decoder0")
assert decoder0 is not None

def demux0_pad_added(demux, pad):
    if pad.name == 'video_0':  # We expect exactly first one video stream
        pad.link(decoder0.get_static_pad("sink"))

qtdemux0.connect("pad-added", demux0_pad_added)

queue = Gst.ElementFactory.make("queue", "queue")
assert queue is not None
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None

pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(queue)
pipeline0.add(video_sink)

source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(queue)
queue.link(video_sink)

######################################################

def main():
    message_bus = pipeline0.get_bus()
    pipeline0.set_state(Gst.State.PLAYING)
    pipeline0.get_state(Gst.CLOCK_TIME_NONE)
    pipeline0.seek(1.0,
                   Gst.Format.TIME,
                   Gst.SeekFlags.SEGMENT,
                   Gst.SeekType.SET, 0,
                   Gst.SeekType.NONE, 0)

    while True:
        if message_bus.have_pending():  # Working without glib mainloop
            message = message_bus.pop()
            if message.type == Gst.MessageType.SEGMENT_DONE:
                pipeline0.seek(1.0,
                              Gst.Format.TIME,
                              Gst.SeekFlags.SEGMENT,
                              Gst.SeekType.SET, 0,
                              Gst.SeekType.NONE, 0)
            elif message.type == Gst.MessageType.ERROR:
                print("bus ERROR", message)
                break
        time.sleep(0.01)

if __name__ == "__main__":
    main()

使用默认队列配置,SEGMENT_DONE消息比最后一个视频帧播放时间提前约1秒发布。非刷新搜索确保不会丢失任何帧。这样可以提供完美的结果 - 真正无缝的视频循环。

注意:我将管道切换到PLAYING状态,然后执行初始非刷新搜索。或者,我们可以将管道切换到PAUSED状态,执行冲洗段搜索,然后将管道切换到PLAYING状态。

注2:不同来源表明解决方案略有不同。见下面的链接。


相关主题和来源:

  1. http://gstreamer-devel.966125.n4.nabble.com/Flushing-the-data-in-partial-pipeline-tp4681893p4681899.html https://cgit.freedesktop.org/gstreamer/gst-editing-services/tree/plugins/nle/nlesource.c
  2. http://gstreamer-devel.966125.n4.nabble.com/Loop-a-file-using-playbin-without-artefacts-td4671952.html http://gstreamer-devel.966125.n4.nabble.com/attachment/4671975/0/gstseamlessloop.py https://github.com/GStreamer/gst-plugins-good/blob/master/tests/icles/test-segment-seeks.c

0
投票

我建议看看gst-play-1.0应用程序以及GStreamer的playbin元素。

见这里:https://github.com/GStreamer/gst-plugins-base/blob/master/tools/gst-play.c

这个支持--gapless选项,可以无间隙地播放许多文件。它利用了about-to-finish元素的playbin信号。

这个特殊的应用程序使用多个文件而不是相同的文件,但我想你可以尝试多次给同一个文件进行测试,如果它真的是无缝的或者与你的方法有相同的问题1)。

基本上我认为由于解码器的deinit / init /处理和冲洗管道,EOS因为及时准备第一帧而有点太晚了。此外,刷新将重置您的流,管道再次预卷并同步到新时钟。它真的不是来自内部的连续流。

另外,也许GStreamer Editing Services也可以这样做。但这可能适用于多个轨道,这意味着它可能会尝试同时实例化多个解码器实例以进行并行处理 - 这可能是您的电路板上的问题。

最后的办法可能是将MP4解复用到原始的bistream,将这个比特流连续循环到一个套接字并从中解码。然后它将显示为正在播放的无限比特流。

编辑:也许值得一试用它的multifilesrc属性试用loop,看看那个操作无间隙还是必须在文件之间执行刷新。


0
投票

我正在使用SEGMENT_DONE方法:

import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
Gst.init(None)

pipeline = Gst.parse_launch('uridecodebin uri=file://%s name=d d. ! autovideosink d. ! autoaudiosink' %sys.argv[1])
bus = pipeline.get_bus()
bus.add_signal_watch()

def on_segment_done(bus, msg):
    pipeline.seek(1.0,
        Gst.Format.TIME,
        Gst.SeekFlags.SEGMENT,
        Gst.SeekType.SET, 0,
        Gst.SeekType.NONE, 0)
    return True
bus.connect('message::segment-done', on_segment_done)

pipeline.set_state(Gst.State.PLAYING)
pipeline.get_state(Gst.CLOCK_TIME_NONE)
pipeline.seek(1.0,
    Gst.Format.TIME,
    Gst.SeekFlags.SEGMENT,
    Gst.SeekType.SET, 0,
    Gst.SeekType.NONE, 0)

GLib.MainLoop().run()
© www.soinside.com 2019 - 2024. All rights reserved.