Gstreamer appsink and appsrc to publish and subscribe h.265 webcam stream. How?
Hi I am trying to publish h.265 encoded webcam stream and to subscribe the same. I think, I have successfully achieved publishing it, but subscribing and decoding is difficult for me.
For temporary purpose, I am using sensor_msgs::CompressedImage
msg container.
Publisher node in pkg1, this node works well. Inspired by ros-drivers/audio_common/audio_capture/src/audio_capture.cpp
#include "ros/ros.h"
#include <sensor_msgs/CompressedImage.h>
#include <gst/gst.h>
#include <gst/app/gstappsink.h>
#include <boost/thread.hpp>
class videoCapture {
public:
videoCapture() {
_pub = _nh.advertise<sensor_msgs::CompressedImage>("frontCamera", 1, true);
_loop = g_main_loop_new(NULL, false);
_pipeline = gst_pipeline_new("ros_pipeline");
_bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline));
gst_bus_add_signal_watch(_bus);
g_signal_connect(_bus, "message::error", G_CALLBACK(onMessage), this);
g_object_unref(_bus);
_source = gst_element_factory_make ("v4l2src", "source");
g_object_set (_source, "device", "/dev/video0", NULL);
_filter = gst_element_factory_make ("capsfilter", "capsfilter");
{
GstCaps *caps;
caps = gst_caps_from_string("video/x-raw, width=(int)1280, height=(int)720, framerate=(fraction)30/1");
g_object_set( G_OBJECT(_filter), "caps", caps, NULL);
gst_caps_unref(caps);
}
_vaapih265enc = gst_element_factory_make ("vaapih265enc", "vaapih265enc");
_vaapidecode = gst_element_factory_make ("vaapidecode", "vaapidecode");
_sink = gst_element_factory_make("appsink", "sink");
g_object_set(G_OBJECT(_sink), "emit-signals", true, "max-buffers", 1, NULL);
g_signal_connect( G_OBJECT(_sink), "new-sample", G_CALLBACK(onNewBuffer), this);
{
GstCaps *caps;
caps = gst_caps_new_simple("video/x-h265",
"stream-format", G_TYPE_STRING,
"byte-stream",
"alignment", G_TYPE_STRING, "au", NULL);
g_object_set( G_OBJECT(_sink), "caps", caps, NULL);
gst_caps_unref(caps);
}
if (!_pipeline || !_source || !_filter || !_vaapih265enc || !_vaapidecode || !_sink ) {
ROS_ERROR_STREAM("Not all elements could be created.\n");
exitOnMainThread(1);
}
gst_bin_add_many( GST_BIN(_pipeline), _source, _filter, _vaapih265enc, _sink, NULL);
if (!gst_element_link_many( _source, _filter, _vaapih265enc, _sink, NULL)) {
ROS_ERROR_STREAM("Elements can't be linked.");
exitOnMainThread(1);
}
gst_element_set_state(GST_ELEMENT(_pipeline), GST_STATE_PLAYING);
_gst_thread = boost::thread( boost::bind(g_main_loop_run, _loop) );
}
~videoCapture() {
g_main_loop_quit(_loop);
gst_element_set_state(_pipeline, GST_STATE_NULL);
gst_object_unref(_pipeline);
g_main_loop_unref(_loop);
}
void exitOnMainThread(int code)
{
exit(code);
}
void publish( const sensor_msgs::CompressedImage &msg )
{
_pub.publish(msg);
}
static GstFlowReturn onNewBuffer (GstAppSink *appsink, gpointer userData)
{
videoCapture *server = reinterpret_cast<videoCapture*>(userData);
GstMapInfo map;
GstSample *sample;
g_signal_emit_by_name(appsink, "pull-sample", &sample);
GstBuffer *buffer = gst_sample_get_buffer(sample);
sensor_msgs::CompressedImage msg;
gst_buffer_map(buffer, &map, GST_MAP_READ);
msg.data.resize( map.size );
memcpy( &msg.data[0], map.data, map.size );
gst_buffer_unmap(buffer, &map);
gst_sample_unref(sample);
msg.header.stamp = ros::Time::now();
server->publish(msg);
return GST_FLOW_OK;
}
static gboolean onMessage (GstBus *bus, GstMessage *message, gpointer userData)
{
videoCapture *server = reinterpret_cast<videoCapture*>(userData);
GError *err;
gchar *debug;
gst_message_parse_error(message, &err, &debug);
ROS_ERROR_STREAM("gstreamer: " << err->message);
g_error_free(err);
g_free(debug);
g_main_loop_quit(server->_loop);
server->exitOnMainThread(1);
return FALSE;
}
private:
ros::NodeHandle _nh;
ros::Publisher _pub;
boost::thread _gst_thread;
GstElement *_pipeline, *_source, *_filter, *_sink;
GstElement *_vaapih265enc, *_vaapidecode;
GstBus *_bus;
GMainLoop *_loop;
std::string _format;
};
int main(int argc, char **argv)
{
ros::init(argc, argv, "h265stream");
gst_init(&argc, &argv);
videoCapture obj;
ros::spin();
}
Subscriber node in pkg2, this node shows error. Inspired by ros-drivers/audio_common/audio_play/src/audio_play.cpp
#include <ros/ros.h>
#include <sensor_msgs/CompressedImage.h>
#include <gst/gst.h>
#include <gst/app/gstappsrc.h>
#include <boost/thread.hpp>
class videoShow {
public:
videoShow() {
_sub = _nh.subscribe("frontCamera", 1, &videoShow::getBuffer, this);
_loop = g_main_loop_new(NULL, false);
_pipeline = gst_pipeline_new("appsrc_pipeline");
_bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline));
gst_bus_add_signal_watch(_bus);
g_signal_connect(_bus, "message::error", G_CALLBACK(onMessage), this);
g_object_unref(_bus);
_source = gst_element_factory_make ("appsrc", "appsrc");
_vaapidecode = gst_element_factory_make ("vaapidecode", "vaapidecode");
_sink = gst_element_factory_make("vaapisink", "sink");
if (!_pipeline || !_source || !_vaapidecode || !_sink ...