我的应用程序会从 RabbitMQ 读取到一个 u8 Vec,我一直在尝试把它送入 appsrc 以便进行转码。
为了测试,我先尝试下面这样一个不做解码的简单管道。
gst-launch-1.0 filesrc location=dip.ts ! tsdemux name=demux demux. ! h264parse ! mpegtsmux ! filesink location=pp.ts
我编写了以下代码,用 Rust 构建这个管道。但是我不断收到“内部数据流错误”。
/// Builds the pipeline `appsrc ! tsdemux ! h264parse ! mpegtsmux ! multifilesink`
/// and returns it. No state change is requested in this function.
///
/// This is the version that the surrounding text reports as failing. The appsrc
/// advertises `video/x-h264` caps, yet it is linked to `tsdemux` and is fed the
/// raw bytes of `./dip.ts`. The follow-up in this document reports that
/// advertising `video/mpegts` with `systemstream` instead resolved the error.
///
/// # Errors
/// Propagates failures from `gst::init`, from creating any of the elements, and
/// from adding or statically linking them.
///
/// # Panics
/// Panics if `h264parse` has no static `sink` pad. The callbacks installed here
/// panic if `./dip.ts` cannot be opened or if linking the demuxer pad fails.
fn create_pipeline() -> Result<gst::Pipeline, Error> {
// Initialise GStreamer; a failure is returned to the caller through `?`.
gst::init()?;
let pipeline = gst::Pipeline::default();
// Caps advertised on the appsrc: H.264 with stream-format=byte-stream.
// NOTE(review): the data pushed in `need_data` below is the content of a .ts
// file, so these caps do not describe what is actually pushed.
let video_info = &gst::Caps::builder("video/x-h264")
.field("stream-format", &"byte-stream")
.build();
let appsrc = gst_app::AppSrc::builder()
.caps(video_info)
.format(gst::Format::Time)
.build();
// gst-launch-1.0 filesrc location=dip.ts ! tsdemux name=demux demux. ! h264parse ! mpegtsmux ! filesink location=pp.ts
let tsdemux = gst::ElementFactory::make("tsdemux").build()?;
let h264parse = gst::ElementFactory::make("h264parse").build()?;
let mpegtsmux = gst::ElementFactory::make("mpegtsmux").build()?;
// Unlike the gst-launch line above, the sink is a multifilesink writing to
// "%d.ts". Presumably next-file=discont starts a new file at each
// discontinuity; confirm against the multifilesink documentation.
let filesink = gst::ElementFactory::make("multifilesink").build()?;
filesink.set_property("location", "%d.ts");
filesink.set_property("post-messages", true);
filesink.set_property_from_str("next-file", "discont");
// Fetched up front so the pad can be moved into the pad-added closure below.
let video_sink_pad = h264parse.static_pad("sink").expect("could not get sink pad from h264parse");
pipeline.add_many(&[appsrc.upcast_ref(), &tsdemux, h264parse.upcast_ref(), &mpegtsmux, &filesink])?;
// Only appsrc -> tsdemux is linked here; the tsdemux -> h264parse link is made
// from the pad-added handler.
gst::Element::link_many(&[appsrc.upcast_ref(), &tsdemux])?;
tsdemux.connect_pad_added(move |_src, src_pad| {
// Only pads whose name starts with "video" are linked; others are ignored.
let is_video = if src_pad.name().starts_with("video") {
true
} else {
false
};
// A link failure panics through `expect`, and this closure always returns
// Ok(()), so the `Err` arm of the match below is never taken.
let connect_demux = || -> Result<(), Error> {
src_pad.link(&video_sink_pad).expect("failed to link tsdemux.video->h264parse.sink");
println!("linked tsdemux->h264parse");
Ok(())
};
if is_video {
match connect_demux() {
Ok(_) => println!("tsdemux->h264 connected"),
Err(e) => println!("could not connect tsdemux->h264parse e:{}", e),
}
}
});
// Static part of the chain downstream of the demuxer.
gst::Element::link_many(&[&h264parse.upcast_ref(), &mpegtsmux, &filesink])?;
appsrc.set_callbacks(
gst_app::AppSrcCallbacks::builder()
// Every invocation of this callback opens ./dip.ts, reads the whole file
// into memory and pushes it as one buffer. Nothing in this function
// signals end-of-stream.
.need_data(move |appsrc, _| {
println!("getting the file");
// Panics if the file cannot be opened.
let mut file = File::open("./dip.ts").unwrap();
let mut buffer = Vec::new();
// A read error is only printed; whatever is in `buffer` is still pushed.
match file.read_to_end(&mut buffer) {
Ok(_) => {
println!("finished reading bytes from file len={}", buffer.len());
}
Err(e) => {
println!("error reading file: {}", e);
}
};
let gst_buffer = gst::Buffer::from_slice(buffer);
println!("buffer size of teh generated gst_buffer={}", gst_buffer.size());
// The result of the push is deliberately discarded.
let _ = appsrc.push_buffer(gst_buffer);
}).build(),
);
Ok(pipeline)
}
是因为 appsrc 无法处理这么大的缓冲区吗?如果是这样,我该如何把它拆分成若干块,再以字节流的形式推送?
谢谢 @弗洛里安,你是对的。将 caps 更改为 video/mpegts,并在其中设置 systemstream 字段之后,问题就解决了。
为了方便其他人参考,下面是更新后的管道代码:
/// Builds the remux pipeline `appsrc ! tsdemux ! h264parse ! mpegtsmux ! multifilesink`
/// and returns it. No state change is requested in this function.
///
/// The appsrc advertises `video/mpegts` caps because it is fed the raw bytes of
/// an MPEG transport stream, which `tsdemux` then splits into elementary
/// streams. Only the demuxer pad whose name starts with `video` is linked on to
/// `h264parse`.
///
/// The first `need-data` request reads `./dip.ts` in full, pushes it as a single
/// buffer and then signals end-of-stream. Later requests are ignored, so the
/// file is delivered exactly once.
///
/// # Errors
/// Propagates failures from `gst::init`, from creating any of the elements, and
/// from adding or statically linking them. Failures inside the callbacks
/// (reading the file, linking the demuxer pad, pushing the buffer) are logged
/// to stdout instead of panicking.
///
/// # Panics
/// Panics if `h264parse` has no static `sink` pad.
fn create_pipeline() -> Result<gst::Pipeline, Error> {
    gst::init()?;

    let pipeline = gst::Pipeline::default();

    // The bytes pushed below are a transport stream, not bare H.264, so the
    // caps have to describe MPEG-TS for tsdemux to accept them.
    let video_info = &gst::Caps::builder("video/mpegts")
        .field("systemstream", true)
        .build();
    let appsrc = gst_app::AppSrc::builder()
        .caps(video_info)
        .format(gst::Format::Time)
        .build();

    // gst-launch-1.0 filesrc location=dip.ts ! tsdemux name=demux demux. ! h264parse ! mpegtsmux ! filesink location=pp.ts
    let tsdemux = gst::ElementFactory::make("tsdemux").build()?;
    let h264parse = gst::ElementFactory::make("h264parse").build()?;
    let mpegtsmux = gst::ElementFactory::make("mpegtsmux").build()?;
    let filesink = gst::ElementFactory::make("multifilesink").build()?;
    filesink.set_property("location", "%d.ts");
    filesink.set_property("post-messages", true);
    filesink.set_property_from_str("next-file", "discont");

    // Fetched up front so the pad can be moved into the pad-added handler.
    let video_sink_pad = h264parse
        .static_pad("sink")
        .expect("could not get sink pad from h264parse");

    pipeline.add_many(&[appsrc.upcast_ref(), &tsdemux, &h264parse, &mpegtsmux, &filesink])?;

    // Static links. The tsdemux -> h264parse link is made in the pad-added
    // handler below.
    gst::Element::link_many(&[appsrc.upcast_ref(), &tsdemux])?;
    gst::Element::link_many(&[&h264parse, &mpegtsmux, &filesink])?;

    tsdemux.connect_pad_added(move |_src, src_pad| {
        // Only the video elementary stream is remuxed; ignore everything else.
        if !src_pad.name().starts_with("video") {
            return;
        }
        // Report a failed link instead of panicking inside the signal handler.
        match src_pad.link(&video_sink_pad) {
            Ok(_) => println!("tsdemux->h264 connected"),
            Err(e) => println!("could not connect tsdemux->h264parse e:{}", e),
        }
    });

    // need-data can be requested more than once, so remember that the file has
    // already been delivered to avoid pushing it again.
    let mut already_pushed = false;
    appsrc.set_callbacks(
        gst_app::AppSrcCallbacks::builder()
            .need_data(move |appsrc, _| {
                if already_pushed {
                    return;
                }
                already_pushed = true;

                println!("getting the file");
                let bytes = match std::fs::read("./dip.ts") {
                    Ok(bytes) => {
                        println!("finished reading bytes from file len={}", bytes.len());
                        bytes
                    }
                    Err(e) => {
                        println!("error reading file: {}", e);
                        // Nothing to push: finish the stream rather than
                        // feeding an empty buffer downstream.
                        let _ = appsrc.end_of_stream();
                        return;
                    }
                };

                let gst_buffer = gst::Buffer::from_slice(bytes);
                println!("buffer size of teh generated gst_buffer={}", gst_buffer.size());
                if let Err(e) = appsrc.push_buffer(gst_buffer) {
                    println!("failed to push buffer into appsrc: {:?}", e);
                }
                // The whole file has been handed over; signal end-of-stream so
                // downstream elements can finish.
                let _ = appsrc.end_of_stream();
            })
            .build(),
    );

    Ok(pipeline)
}