Gstreamer appsink and appsrc to publish and subscribe h.265 webcam stream. How?

asked 2019-10-04 12:10:42 -0600

JeyP4 gravatar image

updated 2019-10-04 12:14:28 -0600

Hi I am trying to publish h.265 encoded webcam stream and to subscribe the same. I think, I have successfully achieved publishing it, but subscribing and decoding is difficult for me.

For temporary purpose, I am using sensor_msgs::CompressedImage msg container.

Publisher node in pkg1, this node works well. Inspired by ros-drivers/audio_common/audio_capture/src/audio_capture.cpp

#include "ros/ros.h"
#include <sensor_msgs/CompressedImage.h>
#include <gst/gst.h>
#include <gst/app/gstappsink.h>
#include <boost/thread.hpp>

class videoCapture {
public:
    videoCapture() {
        _pub = _nh.advertise<sensor_msgs::CompressedImage>("frontCamera", 1, true);

        _loop = g_main_loop_new(NULL, false);
        _pipeline = gst_pipeline_new("ros_pipeline");
        _bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline));
        gst_bus_add_signal_watch(_bus);
        g_signal_connect(_bus, "message::error", G_CALLBACK(onMessage), this);
        g_object_unref(_bus);

        _source = gst_element_factory_make ("v4l2src", "source");
        g_object_set (_source, "device", "/dev/video0", NULL);
        _filter = gst_element_factory_make ("capsfilter", "capsfilter");
        {
            GstCaps *caps;
            caps = gst_caps_from_string("video/x-raw, width=(int)1280, height=(int)720, framerate=(fraction)30/1");
            g_object_set( G_OBJECT(_filter), "caps", caps, NULL);
            gst_caps_unref(caps);
        }
        _vaapih265enc = gst_element_factory_make ("vaapih265enc", "vaapih265enc");
        _vaapidecode = gst_element_factory_make ("vaapidecode", "vaapidecode");
        _sink = gst_element_factory_make("appsink", "sink");
        g_object_set(G_OBJECT(_sink), "emit-signals", true, "max-buffers", 1, NULL);

        g_signal_connect( G_OBJECT(_sink), "new-sample", G_CALLBACK(onNewBuffer), this);

        {
            GstCaps *caps;
            caps = gst_caps_new_simple("video/x-h265",
                                       "stream-format", G_TYPE_STRING,
                                       "byte-stream",
                                       "alignment", G_TYPE_STRING, "au", NULL);
            g_object_set( G_OBJECT(_sink), "caps", caps, NULL);
            gst_caps_unref(caps);
        }

        if (!_pipeline || !_source || !_filter || !_vaapih265enc || !_vaapidecode || !_sink ) {
            ROS_ERROR_STREAM("Not all elements could be created.\n");
            exitOnMainThread(1);
        }

        gst_bin_add_many( GST_BIN(_pipeline), _source, _filter, _vaapih265enc, _sink, NULL);

        if (!gst_element_link_many( _source, _filter, _vaapih265enc, _sink, NULL)) {
            ROS_ERROR_STREAM("Elements can't be linked.");
            exitOnMainThread(1);
        }

        gst_element_set_state(GST_ELEMENT(_pipeline), GST_STATE_PLAYING);

        _gst_thread = boost::thread( boost::bind(g_main_loop_run, _loop) );

    }
    ~videoCapture() {
        g_main_loop_quit(_loop);
        gst_element_set_state(_pipeline, GST_STATE_NULL);
        gst_object_unref(_pipeline);
        g_main_loop_unref(_loop);
    }

    void exitOnMainThread(int code)
    {
        exit(code);
    }

    void publish( const sensor_msgs::CompressedImage &msg )
    {
        _pub.publish(msg);
    }

    static GstFlowReturn onNewBuffer (GstAppSink *appsink, gpointer userData)
    {
        videoCapture *server = reinterpret_cast<videoCapture*>(userData);
        GstMapInfo map;

        GstSample *sample;
        g_signal_emit_by_name(appsink, "pull-sample", &sample);

        GstBuffer *buffer = gst_sample_get_buffer(sample);

        sensor_msgs::CompressedImage msg;
        gst_buffer_map(buffer, &map, GST_MAP_READ);
        msg.data.resize( map.size );

        memcpy( &msg.data[0], map.data, map.size );

        gst_buffer_unmap(buffer, &map);
        gst_sample_unref(sample);

        msg.header.stamp = ros::Time::now();
        server->publish(msg);

        return GST_FLOW_OK;
    }

    static gboolean onMessage (GstBus *bus, GstMessage *message, gpointer userData)
    {
        videoCapture *server = reinterpret_cast<videoCapture*>(userData);
        GError *err;
        gchar *debug;

        gst_message_parse_error(message, &err, &debug);
        ROS_ERROR_STREAM("gstreamer: " << err->message);
        g_error_free(err);
        g_free(debug);
        g_main_loop_quit(server->_loop);
        server->exitOnMainThread(1);
        return FALSE;
    }

private:
    ros::NodeHandle _nh;
    ros::Publisher _pub;

    boost::thread _gst_thread;

    GstElement *_pipeline, *_source, *_filter, *_sink;
    GstElement *_vaapih265enc, *_vaapidecode;

    GstBus *_bus;
    GMainLoop *_loop;
    std::string _format;
};

int main(int argc, char **argv)
{
    ros::init(argc, argv, "h265stream");
    gst_init(&argc, &argv);
    videoCapture obj;
    ros::spin();
}

Subscriber node in pkg2, this node shows error. Inspired by ros-drivers/audio_common/audio_play/src/audio_play.cpp

#include <ros/ros.h>
#include <sensor_msgs/CompressedImage.h>
#include <gst/gst.h>
#include <gst/app/gstappsrc.h>
#include <boost/thread.hpp>

class videoShow {
public:
    videoShow() {

        _sub = _nh.subscribe("frontCamera", 1, &videoShow::getBuffer, this);

        _loop = g_main_loop_new(NULL, false);
        _pipeline = gst_pipeline_new("appsrc_pipeline");

        _bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline));
        gst_bus_add_signal_watch(_bus);
        g_signal_connect(_bus, "message::error", G_CALLBACK(onMessage), this);
        g_object_unref(_bus);

        _source = gst_element_factory_make ("appsrc", "appsrc");

        _vaapidecode = gst_element_factory_make ("vaapidecode", "vaapidecode");

        _sink = gst_element_factory_make("vaapisink", "sink");

        if (!_pipeline || !_source || !_vaapidecode || !_sink ...
(more)
edit retag flag offensive close merge delete