我有一个 GStreamer 管道,通过
Gst.parse_launch
调用时,当前可以正常工作:
rtspsrc tcp-timeout=<timeout> location=<location> is-live=true protocols=tcp name=mysrc
! rtph264depay wait-for-keyframe=true request-keyframe=true
! mpegtsmux name=mpegtsmux
! multifilesink name=filesink next-file=max-duration max-file-duration=<duration> aggregate-gops=true post-messages=true location=<out_location>
我正在尝试将其转换为动态管道,如下所示:
def build_pipeline(self) -> Gst.Pipeline:
    """Assemble the recording pipeline element by element.

    Creates the ``rtspsrc``, ``rtph264depay``, ``mpegtsmux`` and
    ``multifilesink`` elements, stores each one in ``all_data`` under its
    element name, adds them to a new pipeline and links
    depay -> mux -> filesink.  The source is not linked here; that is left
    to the ``pad-added`` callback registered at the end.

    Returns:
        The assembled pipeline (its state is not changed here).
    """
    # NOTE(review): `all_data` is defined outside the visible code; it is
    # assumed to be a dict shared with the pad-added callback.
    video_pipeline = Gst.Pipeline.new("video_pipeline")
    all_data["video_pipeline"] = video_pipeline

    rtsp_source = Gst.ElementFactory.make('rtspsrc', 'mysrc')
    rtsp_source.set_property(...)  # arguments elided
    ...
    all_data["mysrc"] = rtsp_source

    rtph264_depay = Gst.ElementFactory.make('rtph264depay', 'rtp_depay')
    rtph264_depay.set_property(...)  # arguments elided
    ...
    all_data["rtp_depay"] = rtph264_depay

    mpeg_ts_mux = Gst.ElementFactory.make('mpegtsmux', 'mpeg_mux')
    # The key is the element name as a string, like every other entry; the
    # bare name `mpeg_mux` was never defined.
    all_data["mpeg_mux"] = mpeg_ts_mux

    multi_file_sink = Gst.ElementFactory.make('multifilesink', 'filesink')
    multi_file_sink.set_property(...)  # arguments elided
    ...
    all_data["filesink"] = multi_file_sink

    for element in (rtsp_source, rtph264_depay, mpeg_ts_mux, multi_file_sink):
        video_pipeline.add(element)

    if not rtph264_depay.link(mpeg_ts_mux):
        print("Failed to link depay to mux")
    else:
        print("Linked depay to mux")

    if not mpeg_ts_mux.link(multi_file_sink):
        print("Failed to link mux to filesink")
    else:
        print("Linked mux to filesink")

    # The callback looks up user_data[0]["rtp_depay"], so it must receive the
    # same mapping that was populated above.
    rtsp_source.connect("pad-added", VideoStreamer._on_pad_added_callback, all_data)
    return video_pipeline
我的 pad-added 回调定义如下:
@staticmethod
def _on_pad_added_callback(rtsp_source: Gst.Element, new_pad: Gst.Pad, *user_data) -> None:
    """Link a newly added ``recv_rtp_src*`` video pad to the RTP depayloader.

    Pads whose name does not start with ``recv_rtp_src``, pads that are
    already linked, and pads that do not carry video are ignored.

    Args:
        rtsp_source: Element that emitted ``pad-added``.
        new_pad: The pad that was just added.
        *user_data: Extra arguments given to ``connect``; the first one must
            be a mapping containing the depayloader under ``"rtp_depay"``.
    """

    def _check_if_video_pad(pad: Gst.Pad) -> bool:
        """Return True if any caps structure on *pad* has media == "video"."""
        caps = pad.get_current_caps()
        if caps is None:
            # get_current_caps() returns None when the pad has no caps yet;
            # fall back to the caps the pad reports it can handle.
            caps = pad.query_caps(None)
        return any(
            caps.get_structure(index).get_string("media") == "video"
            for index in range(caps.get_size())
        )

    pad_name = new_pad.get_name()
    if not pad_name.startswith("recv_rtp_src"):
        logger.info(f"Ignoring pad with name {pad_name}")
        return
    if new_pad.is_linked():
        logger.info(f"Pad with name {pad_name} is already linked")
        return
    # Right now I only care about grabbing video, in the future I want to differentiate video and audio pipelines
    if not _check_if_video_pad(new_pad):
        logger.info(f"Ignoring pad with name {pad_name} as its not video")
        return

    rtp_depay_element: Gst.Element = user_data[0]["rtp_depay"]
    depay_sink_pad: Gst.Pad = rtp_depay_element.get_static_pad("sink")
    pad_link = new_pad.link(depay_sink_pad)
    # The author observed GST_PAD_LINK_OK here; report any other result
    # instead of discarding it silently.
    if pad_link != Gst.PadLinkReturn.OK:
        logger.error(f"Failed to link pad {pad_name} to depayloader: {pad_link}")
除此之外,我还有如下代码:
# Abstract thread class; its run() method builds the pipeline, starts it and
# then runs a GLib main loop.
class VideoStreamer(ABC, threading.Thread):
def __init__(...):
...
# Held by run() while it switches the pipeline to PLAYING and creates the loop.
self._lock: Final = threading.Lock()
# GLib main loop; stays None until run() creates it.
self._loop: GLib.MainLoop | None = None
...
def run(self) -> None:
    """Build the pipeline, watch its bus, start playback and run the main loop.

    Bus messages are forwarded to ``handle_message``.  The call blocks in
    the GLib main loop until that loop is quit.
    """
    pipeline = self.build_pipeline()

    # The bus has to be fetched from the pipeline before it can be watched.
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    # A "message" handler is called with (bus, message); adapt that to
    # handle_message(message) so its signature stays unchanged.
    bus.connect("message", lambda _bus, message: self.handle_message(message))

    with self._lock:
        pipeline.set_state(Gst.State.PLAYING)
        self._loop = GLib.MainLoop()
    # Run outside the lock so it is not held for as long as the loop blocks.
    self._loop.run()
def handle_message(self, message: Gst.Message) -> None:
    """Handle a bus message, ignoring any whose source is not named "filesink"."""
    source_name = message.src.get_name()
    if source_name != "filesink":
        return
    ...
问题是,当我使用
parse_launch
时,我的代码工作正常,来自 filesink 元素的消息能够到达 handle_message
。
无论是使用 parse_launch 还是动态构造,我都可以通过 Gst.debug_bin_to_dot_file
观察到 3 次状态转换,分别是:
NULL->READY
READY->PAUSED
PAUSED->PLAYING
可视化管道后,我注意到以下差异:
Parse Launch:
`GstRTSPSrc(GstRtpBin(GstRtpSession rtpsession0 -> GstRtpStorage rtpstorage0 -> GstRtpSsrcDemux rtpssrcdemux0 -> GstRtpJitterBuffer rtpjitterbuffer1 -> GstRtpPtDemux rtpptdemux1))->recv_rtp_src_0_... -> GstRtpH264Depay rtph264depay0`.
Dynamic Pipeline:
`GstRTSPSrc(GstRtpBin(GstRtpSession rtpsession0 -> GstRtpStorage rtpstorage0 -> GstRtpSsrcDemux rtpssrcdemux0 -> GstRtpJitterBuffer rtpjitterbuffer1 -> GstRtpPtDemux rtpptdemux1))`.
看起来在动态管道中,
GstRtpPtDemux
元素没有获得用于输出的 src
pad,只有 sink pad。
使用新的动态构造时,我可以处理状态更改消息,并且可以确认管道已启动,状态依次从 READY 到 PAUSED 再到 PLAYING,但我始终没有从 filesink 收到任何消息。我是否遗漏了某个链接,或者错误地链接了 pad?
根据@SeB的建议,我将我的
_on_pad_added_callback
函数修改为:
@staticmethod
def _on_pad_added_callback(rtsp_source: Gst.Element, new_pad: Gst.Pad, *user_data) -> None:
    """Link ``rtspsrc`` to the RTP depayloader once a video pad is added.

    Pads whose name does not start with ``recv_rtp_src``, pads that are
    already linked, and pads that do not carry video are ignored.  The link
    is made at element level through an ``application/x-rtp, media=video``
    caps filter.

    Args:
        rtsp_source: Element that emitted ``pad-added``.
        new_pad: The pad that was just added.
        *user_data: Extra arguments given to ``connect``; the first one must
            be a mapping containing the depayloader under ``_RTP_DEPAY``.
    """

    def _check_if_video_pad(pad: Gst.Pad) -> bool:
        """Return True if any caps structure on *pad* has media == "video"."""
        caps = pad.get_current_caps()
        if caps is None:
            # get_current_caps() returns None when the pad has no caps yet;
            # fall back to the caps the pad reports it can handle.
            caps = pad.query_caps(None)
        return any(
            caps.get_structure(index).get_string("media") == "video"
            for index in range(caps.get_size())
        )

    pad_name = new_pad.get_name()
    if not pad_name.startswith("recv_rtp_src"):
        logger.info(f"Ignoring pad with name {pad_name}")
        return
    if new_pad.is_linked():
        logger.info(f"Pad with name {pad_name} is already linked")
        return
    # Right now I only care about grabbing video, in the future I want to differentiate video and audio pipelines
    if not _check_if_video_pad(new_pad):
        logger.info(f"Ignoring pad with name {pad_name} as its not video")
        return

    rtp_depay_element: Gst.Element = user_data[0][_RTP_DEPAY]
    # The caps type is Gst.Caps; there is no Gst.Cap.
    filter_caps: Gst.Caps = Gst.caps_from_string("application/x-rtp, media=video")
    if not rtsp_source.link_filtered(rtp_depay_element, filter_caps):
        print("Link failed")
    else:
        print("Link worked")
最后我需要设置
multi_file_sink.set_property("post-messages", "true")
,因为我原来的管道声明中也缺少该值。