GStreamer Python 动态管道构建不起作用

问题描述 投票:0回答:1

如果我调用

Gst.parse_launch
,我有一个 gstreamer 管道,当前可以正常工作:

rtspsrc tcp-timeout=<timeout> location=<location> is-live=true protocols=tcp name=mysrc "
! rtph264depay wait-for-keyframe=true request-keyframe=true "
! mpegtsmux name=mpegtsmux "
! multifilesink name=filesink next-file=max-duration max-file-duration=<duration> aggregate-gops=true post-messages=true location=<out_location>

我正在尝试将其转换为动态管道,如下所示:

def build_pipeline(self) -> str:
    video_pipeline = Gst.Pipeline.new("video_pipeline")
    all_data["video_pipeline"] = video_pipeline
    rtsp_source = Gst.ElementFactory.make('rtspsrc', 'mysrc')
    rtsp_source.set_property(...
    ...
    all_data["mysrc"] = rtsp_source

    rtph264_depay = Gst.ElementFactory.make('rtph264depay', 'rtp_depay')
    rtph264_depay.set_property(....
    ...
    all_data["rtp_depay"] = rtph264_depay
    
    mpeg_ts_mux = Gst.ElementFactory.make('mpegtsmux', 'mpeg_mux')
    all_data[mpeg_mux] = mpeg_ts_mux

    multi_file_sink = Gst.ElementFactory.make('multifilesink', 'filesink')
    multi_file_sink.set_property(...
    ...
    all_data["filesink"] = multi_file_sink

    video_pipeline.add(rtsp_source)
    video_pipeline.add(rtph264_depay)
    video_pipeline.add(mpeg_ts_mux)
    video_pipeline.add(multi_file_sink)
    if not rtph264_depay.link(mpeg_ts_mux): 
        print("Failed to link depay to mux")
    else:
        print("Linked depay to mux")
    if not mpeg_ts_mux.link(multi_file_sink): 
        print("Failed to link mux to filesink")
    else:
        print("Linked mux to filesink")
    rtsp_source.connect("pad-added", VideoStreamer._on_pad_added_callback, all_pipeline_data)
    return video_pipeline 

我像这样定义我的填充添加回调:

    @staticmethod
    def _on_pad_added_callback(rtsp_source: Gst.Element, new_pad: Gst.Pad, *user_data) -> None:
        def _check_if_video_pad(pad: Gst.Pad):
            current_caps = pad.get_current_caps()
            for cap_index in range(current_caps.get_size()):
                current_structure = current_caps.get_structure(cap_index)
                media_type = current_structure.get_string("media")
                if media_type == "video":
                    return True
            return False
     if not new_pad.get_name().startswith("recv_rtp_src"):
          logger.info(f"Ignoring pad with name {new_pad.get_name()}")
          return
     if new_pad.is_linked():
          logger.info(f"Pad with name {new_pad.get_name()} is already linked")
          return
     # Right now I only care about grabbing video, in the future I want to differentiate video and audio pipelines
     if not _check_if_video_pad(new_pad):
          logger.info(f"Ignoring pad with name {new_pad.get_name()} as its not video")
          return
     
     rtp_depay_element: Gst.Element = user_data[0]["rtp_depay"]
     depay_sink_pad: Gst.Pad = rtp_depay_element.get_static_pad("sink")
     pad_link = new_pad.link(depay_sink_pad) # Returns <enum GST_PAD_LINK_OK of type Gst.PadLinkReturn>

除此之外我做:

class VideoStreamer(ABC, threading.Thread):
    def __init__(...):
        ...
        self._lock: Final = threading.Lock()
        self._loop: GLib.MainLoop | None = None
        ...
    def run(self) -> None:
        pipeline = self.build_pipeline()
        bus.add_signal_watch()
        bus.connect("message", self.handle_message)
        with self._lock:
            pipeline.set_state(Gst.State.PLAYING)
            self._loop = GLib.MainLoop()
            self._loop.run()
        
    def handle_message(self, message: Gst.Message) -> None:
        if message.src.get_name() != "filesink":
             return
        ...

问题是,当我使用

parse_launch
时,我的代码工作正常。来自文件接收器元素的消息使其变得
handle_message
。 无论是使用 parse_launch 还是使用动态构造,我可以使用
Gst.debug_bin_to_dot_file
观察到 3 种状态转换,它们是:

NULL->READY
READY->PAUSED
PAUSED->PLAYING

可视化管道后,我注意到以下差异:

Parse Launch:
GstRTSPSrc(GstRtpBin(GstRtpSession rtpsession0 -> GstRtpStorage rtpstorage0 -> GstRtpSsrcDemux rtpssrcdemux0 -> GstRtpJitterBuffer rtpjitterbuffer1 -> GstRtpPtDemux rtpptdemux1))->recv_rtp_src_0_... -> GstRtpH264Depay rtph264depay0`.

Dynamic Pipeline:

`GstRTSPSrc(GstRtpBin(GstRtpSession rtpsession0 -> GstRtpStorage rtpstorage0 -> GstRtpSsrcDemux rtpssrcdemux0 -> GstRtpJitterBuffer rtpjitterbuffer1 -> GstRtpPtDemux rtpptdemux1)).

看起来

GstRtpPtDemux
元素没有获得
src
pad 用于输出,只有动态输出中的接收器。 通过新的动态构造,我可以处理状态更改的消息,并且可以验证管道是否已启动,状态更改从就绪到暂停再到播放,但是我从未从文件接收器收到任何消息。我是否缺少链接或错误地链接了打击垫?

python gstreamer
1个回答
0
投票

根据@SeB的建议,我将我的

_on_pad_added_callback
函数修改为:

@staticmethod
def _on_pad_added_callback(rtsp_source: Gst.Element, new_pad: Gst.Pad, *user_data) -> None:
    def _check_if_video_pad(pad: Gst.Pad):
        current_caps = pad.get_current_caps()
        for cap_index in range(current_caps.get_size()):
            current_structure = current_caps.get_structure(cap_index)
            media_type = current_structure.get_string("media")
            if media_type == "video":
                return True
            return False
    
    if not new_pad.get_name().startswith("recv_rtp_src"):
        logger.info(f"Ignoring pad with name {new_pad.get_name()}")
        return
    if new_pad.is_linked():
        logger.info(f"Pad with name {new_pad.get_name()} is already linked")
        return
    # Right now I only care about grabbing video, in the future I want to differentiate video and audio pipelines
    if not _check_if_video_pad(new_pad):
        logger.info(f"Ignoring pad with name {new_pad.get_name()} as its not video")
        return
     
    rtp_depay_element: Gst.Element = user_data[0][_RTP_DEPAY]
    filter_cap: Gst.Cap = Gst.caps_from_string("application/x-rtp, media=video")
    if not Gst.Element.link_filtered(rtsp_source, rtp_depay_element, filter_cap):
        print("Link failed")
    else:
        print("Link worked")
    return

最后我需要设置

multi_file_sink.set_property("post-messages", "true")
,因为我原来的管道声明中也缺少该值。

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.