GStreamer:在 filesink 中处理 EOS 消息,以便在运行时动态更改输出文件位置

问题描述 投票:0回答:4

尝试即时切换输出文件,但无法处理 EOS。

http://gstreamer-devel.966125.n4.nabble.com/Dynamically-updating-filesink-location-at-run-time-on-the-fly-td4660569.html

引用:

假设您有一个如下所示的管道:
audiosrc --> encoder --> mux --> filesink

那么你需要将其更改为:
audiosrc --> encoder --> queue --> muxsink_bin
其中 muxsink_bin 是一个 bin,其内部结构为:
ghostpad --> mux --> filesink

那么程序是:
1 - 使用 gst_pad_set_blocked_async() 阻止队列 srcpad
2 - 在被阻止的回调中:
2a - 使用 gst_pad_unlink() 取消链接 muxsink_bin
2b - 使用 gst_pad_send_event() 将 EOS 事件发送到 muxsink_bin 的 sink pad
2c - 创建一个新的 muxsink_bin
2d - 设置 filesink 的 location 属性
2e - 使用 gst_bin_add() 将新 bin 添加到管道
2f - 使用 gst_element_sync_state_with_parent() 与父级同步状态
2g - 使用 gst_pad_link() 将其链接到队列 srcpad
2h - 使用 gst_pad_set_blocked_async() 解除对队列 srcpad 的阻塞。当解除阻塞的回调触发时,您将恢复录制并且不会丢失任何数据。解除阻塞的回调中无需执行任何操作

3 - 处理 EOS 并删除旧的 muxsink_bin。我有一个消息处理程序,使用“gstbin_class->handle_message = GST_DEBUG_FUNCPTR(msg_handler)”安装在 bin_init() 函数中并在处理程序中:
3a - 使用 gst_element_set_locked_state() 锁定 bin 状态
3b - 使用 gst_element_set_state() 将状态设置为 NULL
3c - 使用 gst_bin_remove() 将其从管道中删除

就是这样。唯一需要注意的是数据必须流经管道才能发挥作用。

Paddy

除了旧管道的最终确定之外,主序列正常工作。

困难在于第3点:我可以将EOS发送到ghostpad,并且filesink收到它。但如何抓住那个EOS呢?

“使用

gstbin_class->handle_message = GST_DEBUG_FUNCPTR(msg_handler)
安装消息处理程序”是什么意思?

gstreamer python-gstreamer
4个回答
4
投票

有消息转发。

必须在 bin 上启用(“message-forward”是 GstBin 的属性):

// g_object_set() takes a NULL-terminated list of property name/value pairs.
// A bare 0 is an int and is not a reliable pointer sentinel in a varargs call.
g_object_set(G_OBJECT(bin), "message-forward", TRUE, NULL);

处理:

case GST_MESSAGE_ELEMENT:
{
    // A bin with "message-forward" enabled wraps messages from its children
    // in an element message whose structure is named "GstBinForwarded".
    const GstStructure *s = gst_message_get_structure (msg);

    if (s != NULL && gst_structure_has_name (s, "GstBinForwarded"))
    {
        GstMessage *forward_msg = NULL;

        gst_structure_get (s, "message", GST_TYPE_MESSAGE, &forward_msg, NULL);
        // The "message" field may be missing; never dereference or unref NULL.
        if (forward_msg != NULL)
        {
            if (GST_MESSAGE_TYPE (forward_msg) == GST_MESSAGE_EOS)
            {
                g_print ("EOS from element %s\n",
                        GST_OBJECT_NAME (GST_MESSAGE_SRC (forward_msg)));
                DestroyBin();
                CreateNewBin();
                RemovePad();
            }
            // gst_structure_get() handed us our own reference.
            gst_message_unref (forward_msg);
        }
    }
    // Do not fall through into whatever case follows in the switch.
    break;
}

完整代码:

#include <gst/gst.h>
#include <iostream>
#include <cstring>
#include <cstdio>
// NOTE(review): opt_effects, DEFAULT_EFFECTS and effects are not referenced
// anywhere else in the visible code; they look like leftovers from another
// example - confirm before removing.
static gchar *opt_effects = NULL;

#define DEFAULT_EFFECTS "identity,exclusion,navigationtest," \
        "agingtv,videoflip,vertigotv,gaussianblur,shagadelictv,edgetv"

// Top-level pipeline; created in main().
static GstElement *pipeline;
// Muxer and file sink of the current output bin; reassigned by CreateNewBin().
static GstElement * muxer;
static GstElement * sink;
// Queue whose "src" pad feeds the current output bin and carries the blocking probe.
static GstElement * q2;
// Counter used by CreateNewBin() to number the output files.
static int i=0;
// Current output bin (muxer + filesink); replaced on every file switch.
GstElement * bin;
// Request pad obtained from the muxer in CreateNewBin().
GstPad * muxerSinkPad;

// Id of the probe installed by timeout_cb(); RemovePad() uses it to remove the probe.
gulong probeId;

static GQueue effects = G_QUEUE_INIT;

// Forward declarations of the helpers defined after main().
void CreateNewBin();
void DestroyBin();
// NOTE(review): ChangeLocation() is declared but no definition is visible here.
void ChangeLocation();
void RemovePad();

/**
 * Probe callback installed on the "src" pad of q2 by timeout_cb().
 *
 * Unlinks the current output bin from the probed pad and sends an EOS event
 * into the bin's "sink" pad. Returns GST_PAD_PROBE_OK; the probe itself is
 * removed later by RemovePad() using the stored probeId.
 *
 * @param pad        the probed pad (q2's src pad)
 * @param info       probe info (unused)
 * @param user_data  user data passed to gst_pad_add_probe() (unused)
 */
static GstPadProbeReturn
pad_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
    GstPad *oldBinSink = gst_element_get_static_pad (bin, "sink");

    // Detach the bin first, then push EOS into it.
    gst_pad_unlink (pad, oldBinSink);

    GstEvent *eos = gst_event_new_eos ();
    gst_pad_send_event (oldBinSink, eos);

    // Drop the reference obtained from gst_element_get_static_pad().
    gst_object_unref (oldBinSink);

    return GST_PAD_PROBE_OK;
}

/**
 * Timeout callback: installs a downstream-blocking probe on q2's src pad,
 * with pad_probe_cb as its callback, and stores the probe id in probeId.
 *
 * The local static `i` is never modified, so the guard below never fires and
 * the function always returns TRUE.
 *
 * @param user_data  forwarded unchanged to pad_probe_cb
 */
static gboolean
timeout_cb (gpointer user_data)
{
    static int i = 0;

    if (i != 0)
        return FALSE;

    GstPad *queueSrc = gst_element_get_static_pad (q2, "src");
    std::cout << "Timeout: " << queueSrc << std::endl;

    probeId = gst_pad_add_probe (queueSrc,
            GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
            pad_probe_cb,
            user_data,
            NULL);

    // Drop the reference obtained from gst_element_get_static_pad().
    gst_object_unref (queueSrc);

    return TRUE;
}

/**
 * Bus watch callback for the pipeline.
 *
 * - ERROR: reports the error and quits the main loop.
 * - EOS: logs that an EOS message arrived.
 * - ELEMENT: when the message is a "GstBinForwarded" wrapper around an EOS
 *   message, replaces the output bin (DestroyBin / CreateNewBin) and removes
 *   the probe on q2 (RemovePad).
 *
 * @param bus        the bus the message arrived on (unused)
 * @param msg        the message to handle
 * @param user_data  the GMainLoop to quit on error
 * @return TRUE always, so the watch stays installed
 */
static gboolean
bus_cb (GstBus * bus, GstMessage * msg, gpointer user_data)
{
    GMainLoop *loop = (GMainLoop*)user_data;

    switch (GST_MESSAGE_TYPE (msg)) {
    case GST_MESSAGE_ERROR:{
        GError *err = NULL;
        gchar *dbg = NULL;

        gst_message_parse_error (msg, &err, &dbg);
        gst_object_default_error (msg->src, err, dbg);
        g_error_free (err);
        g_free (dbg);
        g_main_loop_quit (loop);
        break;
    }
    // This switch is over GstMessageType, so the label must be
    // GST_MESSAGE_EOS; GST_EVENT_EOS belongs to GstEventType and has a
    // different value, which made this case unreachable.
    case GST_MESSAGE_EOS:
        std::cout << "EOS message is got" << std::endl;
        break;

    case GST_MESSAGE_ELEMENT:
    {
        const GstStructure *s = gst_message_get_structure (msg);

        if (s != NULL && gst_structure_has_name (s, "GstBinForwarded"))
        {
            GstMessage *forward_msg = NULL;

            gst_structure_get (s, "message", GST_TYPE_MESSAGE, &forward_msg, NULL);
            // The "message" field may be missing; never dereference or unref NULL.
            if (forward_msg != NULL)
            {
                if (GST_MESSAGE_TYPE (forward_msg) == GST_MESSAGE_EOS)
                {
                    g_print ("EOS from element %s\n",
                            GST_OBJECT_NAME (GST_MESSAGE_SRC (forward_msg)));
                    DestroyBin();
                    CreateNewBin();
                    RemovePad();
                }
                // gst_structure_get() handed us our own reference.
                gst_message_unref (forward_msg);
            }
        }
        break;
    }

    default:
        break;
    }
    return TRUE;
}

int
main (int argc, char **argv)
{

    GError *err = NULL;
    GMainLoop *loop;
    GstElement *src, *q1,/* *q2,*/ /**effect,*/ /**filter1*//*, *filter2*/ *encoder;/*, *sink*/;

    gst_init(&argc, &argv);

    pipeline = gst_pipeline_new ("pipeline");

    src = gst_element_factory_make ("videotestsrc", NULL);

    //Create a caps filter between videosource videoconvert
    std::string capsString = "video/x-raw,format=YV12,width=320,height=240,framerate=30/1";
    GstCaps * dataFilter = gst_caps_from_string(capsString.c_str());

    q1 = gst_element_factory_make ("queue", NULL);

    encoder = gst_element_factory_make ("x264enc", NULL);

    q2 = gst_element_factory_make("queue", NULL);

    gst_bin_add_many(GST_BIN(pipeline), src, q1, encoder, q2, 0);
    gboolean link = gst_element_link_filtered(src, q1, dataFilter);
    link &= gst_element_link(q1, encoder);
    link &= gst_element_link(encoder, q2);

    CreateNewBin();

    gst_element_set_state (pipeline, GST_STATE_PLAYING);

    loop = g_main_loop_new (NULL, FALSE);

    gst_bus_add_watch (GST_ELEMENT_BUS (pipeline), bus_cb, loop);

    g_timeout_add_seconds (10, timeout_cb, loop);

    g_main_loop_run (loop);

    gst_element_set_state (pipeline, GST_STATE_NULL);
    gst_object_unref (pipeline);

    return 0;
}

void RemovePad()
{
    GstPad * q2SrcPad;
    q2SrcPad = gst_element_get_static_pad(q2, "src");
    gst_pad_remove_probe(q2SrcPad, probeId);
    gst_object_unref(q2SrcPad);
}

void DestroyBin()
{
    gst_element_set_state(bin, GST_STATE_NULL);
    gst_bin_remove(GST_BIN(pipeline), bin);
}

void CreateNewBin()
{

    static std::string fileLocPattern = "deneme%d.mkv";
    char buffer[12];
    memset(buffer, 0, sizeof(buffer));
    sprintf(buffer, fileLocPattern.c_str(), i++);

    //Create Muxer Element
    muxer = gst_element_factory_make("matroskamux", "MatroskaMuxer");

    //Create File Sink Element
    sink = gst_element_factory_make("filesink", buffer);
    g_object_set(G_OBJECT(sink), "location", buffer, 0);

    //Create muxsinkBin
    bin = gst_bin_new(buffer);
    g_object_set(G_OBJECT(bin), "message-forward", TRUE, 0);
    //Add a src pad to the bin
    gst_bin_add_many(GST_BIN(bin), muxer, sink, 0);

    gboolean linkState = TRUE;
    //Connect elements within muxsink_bin
    //Link: matroskamuxer -> filesink
    linkState &= gst_element_link_many(muxer, sink, 0);

    //Add this bin to pipeline
    gst_bin_add(GST_BIN(pipeline), bin);

    //Create ghostpad and manually link muxsinkBin and remaining part of the pipeline
    {
        GstPadTemplate * muxerSinkPadTemplate;


        if( !(muxerSinkPadTemplate = gst_element_class_get_pad_template(GST_ELEMENT_GET_CLASS(muxer), "video_%u")) )
        {
            std::cout << "Unable to get source pad template from muxing element" << std::endl;
        }

        //Obtain dynamic pad from element
        muxerSinkPad = gst_element_request_pad(muxer, muxerSinkPadTemplate, 0, 0);

        //Add ghostpad
        GstPad * ghostPad = gst_ghost_pad_new("sink", muxerSinkPad);
        gst_element_add_pad(bin, ghostPad);
        gst_object_unref(GST_OBJECT(muxerSinkPad));

        gst_element_sync_state_with_parent(bin);

        //Get src pad from queue element
        GstPad * queueBeforeBinSrcPad = gst_element_get_static_pad(q2, "src");

        //Link queuebeforebin to ghostpad
        if (gst_pad_link(queueBeforeBinSrcPad, ghostPad) != GST_PAD_LINK_OK )
        {

            std::cout << "QueueBeforeBin cannot be linked to MuxerSinkPad." << std::endl;
        }
        gst_object_unref(queueBeforeBinSrcPad);
    }
}

http://gstreamer-devel.966125.n4.nabble.com/Listening-on-EOS-events-for-GstBin-td4669126.html

http://gstreamer-devel.966125.n4.nabble.com/file/n4669476/main.cpp


1
投票

根据您的用例,您可以使用

multifilesink
元素。它将在某些事件发生时动态切换文件。每个缓冲区一个文件,每个段一个文件...检查其属性,看看是否有任何适合您的内容。

如果您想编写类似的东西(或者扩展它?),它也可以作为一个很好的代码库


0
投票

我将发布实际自定义 GstBin 又名“muxsink_bin”的代码,我最终实现了该代码来为管道的“可拆卸接收器部分”进行转发和 EOS 处理。

plisolatedbin.h:

#pragma once

#include <gst/gst.h>
#include <gst/gstbin.h>

G_BEGIN_DECLS

/* GObject type macros for PlIsolatedBin: type id, instance/class type checks,
 * class lookup, and checked / unchecked casts. */
#define PL_TYPE_ISOLATED_BIN             (pl_isolated_bin_get_type ())
#define PL_IS_ISOLATED_BIN(obj)          (G_TYPE_CHECK_INSTANCE_TYPE ((obj), PL_TYPE_ISOLATED_BIN))
#define PL_IS_ISOLATED_BIN_CLASS(klass)  (G_TYPE_CHECK_CLASS_TYPE ((klass), PL_TYPE_ISOLATED_BIN))
#define PL_ISOLATED_BIN_GET_CLASS(obj)   (G_TYPE_INSTANCE_GET_CLASS ((obj), PL_TYPE_ISOLATED_BIN, PlIsolatedBinClass))
#define PL_ISOLATED_BIN(obj)             (G_TYPE_CHECK_INSTANCE_CAST ((obj), PL_TYPE_ISOLATED_BIN, PlIsolatedBin))
#define PL_ISOLATED_BIN_CLASS(klass)     (G_TYPE_CHECK_CLASS_CAST ((klass), PL_TYPE_ISOLATED_BIN, PlIsolatedBinClass))
#define PL_ISOLATED_BIN_CAST(obj)        ((PlIsolatedBin*)(obj))

typedef struct _PlIsolatedBin PlIsolatedBin;
typedef struct _PlIsolatedBinClass PlIsolatedBinClass;

/**
 * Instance structure of PlIsolatedBin, a GstBin subclass with no fields of
 * its own.
 *
 * Does not forward EOS to parent by default: EOS messages from children are
 * re-posted wrapped in a "PlIsolatedBinForwarded" element message instead.
 */
struct _PlIsolatedBin
{
    /* Parent instance; must be the first member. */
    GstBin bin;
};

/**
 * Class structure of PlIsolatedBin; adds no virtual methods of its own.
 */
struct _PlIsolatedBinClass
{
    /* Parent class; must be the first member. */
    GstBinClass parent_class;
};

/* Returns the GType of PlIsolatedBin. */
GType pl_isolated_bin_get_type(void);
/* Creates a new PlIsolatedBin instance.
 * In C an empty parameter list leaves the parameters unspecified, so both
 * prototypes use (void) to get real argument checking. */
GstElement* pl_isolated_bin_new(void);

G_END_DECLS

plisolatedbin.c:

#include "plisolatedbin.h"

#include <assert.h>

G_DEFINE_TYPE(PlIsolatedBin, pl_isolated_bin, GST_TYPE_BIN)

/* Instance initializer referenced by G_DEFINE_TYPE; PlIsolatedBin has no
 * per-instance state, so there is nothing to set up. */
static void pl_isolated_bin_init(PlIsolatedBin *plisolatedbin)
{
}

/**
 * GstBin::handle_message override.
 *
 * Every message except EOS is passed to the parent class implementation.
 * An EOS message is not passed on; it is wrapped in an element message whose
 * structure is named "PlIsolatedBinForwarded" (field "message") and posted
 * on behalf of this bin.
 *
 * @param bin      the bin receiving the message
 * @param message  the message from a child; this function owns the reference
 */
static void pl_isolated_bin_handle_message_func(GstBin *bin, GstMessage *message)
{
    if (GST_MESSAGE_TYPE(message) != GST_MESSAGE_EOS)
    {
        /* The parent implementation takes over ownership of the message. */
        GST_BIN_CLASS(pl_isolated_bin_parent_class)->handle_message(bin, message);
        return;
    }

    /* gst_structure_new() stores its own reference to the boxed message. */
    GstStructure *wrapper = gst_structure_new("PlIsolatedBinForwarded",
            "message", GST_TYPE_MESSAGE, message, NULL);
    GstMessage *forwarded = gst_message_new_element(GST_OBJECT_CAST(bin), wrapper);
    gst_element_post_message(GST_ELEMENT_CAST(bin), forwarded);

    /* Release the reference handed to this vfunc; without this the original
     * EOS message is leaked on every forward. */
    gst_message_unref(message);
}

/* Class initializer: installs the custom handle_message implementation on
 * the GstBinClass part of the class structure. */
static void pl_isolated_bin_class_init(PlIsolatedBinClass *klass)
{
    GstBinClass *bin_class = &klass->parent_class;

    bin_class->handle_message = GST_DEBUG_FUNCPTR(pl_isolated_bin_handle_message_func);
}

/* Creates a new PlIsolatedBin with default properties.
 * (void) makes the definition a proper prototype in C. */
GstElement* pl_isolated_bin_new(void)
{
    return g_object_new(PL_TYPE_ISOLATED_BIN, NULL);
}

0
投票

以下是如何在 python 中添加和捕获消息转发。

在 bin 上启用消息转发:

from gi.repository import Gst
# Create the bin that will hold the sink part of the pipeline.
# NOTE(review): the handler shown later compares the message source name with
# "your-bin-name" - give this bin that name or adapt the comparison.
your_sink_bin = Gst.Bin.new()
# Enable message forwarding on the bin; the handler looks for messages whose
# structure is named "GstBinForwarded".
your_sink_bin.set_property("message-forward", True)

捕获转发的消息:

from gi.repository import Gst

def bus_call(self, _bus, message: Gst.Message, _loop: GLib.MainLoop) -> Literal[True]:
    """Bus handler that detects an EOS forwarded by the sink bin.

    Sets ``self.srt_message_handled`` to True when a "GstBinForwarded"
    message posted by the bin named "your-bin-name" wraps an EOS message
    whose source name contains "your-sink".

    Always returns True, as the return annotation promises. The original
    fell off the end and returned None, which is falsy and would remove a
    bus watch after the first message.
    """
    message_source: Optional[str] = message.src.get_name()

    if message_source == "your-bin-name" and message.has_name("GstBinForwarded"):
        # The wrapped child message is stored in the "message" field.
        forwarded_message: Gst.Message = message.get_structure().get_value("message")

        if "your-sink" in forwarded_message.src.get_name() and forwarded_message.type == Gst.MessageType.EOS:
            self.srt_message_handled = True

    return True
© www.soinside.com 2019 - 2024. All rights reserved.