Ask Your Question
1

How to check callback queue?

asked 2021-09-10 09:27:51 -0500

Ariel gravatar image

updated 2021-09-27 12:26:00 -0500

Hello,

I have a node that subscribes to 3 topics that are being published by individual nodes. In the subscriber node, I have 3 callbacks, one for each. I would like to know if it is possible to know whether there are messages in callback queue. What I am trying to do is to avoid that the same topic triggers its callback twice in a row if there are messages from the other 2 topics available in the callback queue.

This is the code I have now:

#include "ros/ros.h"
#include "sensor_msgs/Image.h"
#include "sensor_msgs/LaserScan.h"
#include "geometry_msgs/TwistStamped.h"

// Callbacks
void pc_pub_img_callback(const sensor_msgs::Image::ConstPtr& msg){
      //ROS_INFO("Received from pc_pub_img");
      ROS_INFO("Img:Callback(%d)", msg->header.seq);
      ros::Duration(0.2).sleep();
      // Serialization
      sensor_msgs::Image Image;
      uint32_t serial_size = ros::serialization::serializationLength(*msg);
      boost::shared_array<uint8_t> buffer(new uint8_t[serial_size]);
      ROS_INFO("Img:Serialized");
}

void pc_pub_laser_callback(const sensor_msgs::LaserScan::ConstPtr& msg){
      //ROS_INFO("Received from pc_pub_laser");
      ROS_INFO("Laser:Callback(%d)", msg->header.seq);

      // Serialization
      sensor_msgs::Image LaserScan;
      uint32_t serial_size = ros::serialization::serializationLength(*msg);
      boost::shared_array<uint8_t> buffer(new uint8_t[serial_size]);
      ROS_INFO("Laser:Serialized");
}

void pc_pub_twist_callback(const geometry_msgs::TwistStamped::ConstPtr& msg){
      //ROS_INFO("Received from pc_pub_img");
      ROS_INFO("Twist:Callback(%d)", msg->header.seq);

      // Serialization
      geometry_msgs::Twist Twist;
      uint32_t serial_size = ros::serialization::serializationLength(*msg);
      boost::shared_array<uint8_t> buffer(new uint8_t[serial_size]);
      ROS_INFO("Twist:Serialized");
}

int main(int argc, char **argv)
{

    ros::init(argc, argv, "arm_to_fpga");
    ros::NodeHandle n;

    ros::Subscriber sub_img = n.subscribe("/pc_pub_img", 1000, pc_pub_img_callback);
    ros::Subscriber sub_laser = n.subscribe("/pc_pub_laser", 1000, pc_pub_laser_callback);
    ros::Subscriber sub_twist = n.subscribe("/pc_pub_twist", 1000, pc_pub_twist_callback);

    ros::spin();

    return 0;
}

Thanks.

EDIT: I've changed the frequencies for the 3 publishers at 30, 60 and 60 respectively and removed the serialization part to make the output simpler. Then, I got the following:

[ INFO] [1632761598.718118783]: Twist:Callback
[ INFO] [1632761598.720308913]: Laser:Callback
[ INFO] [1632761598.735116602]: Img:Callback
[ INFO] [1632761598.736248825]: Twist:Callback
[ INFO] [1632761598.736324787]: Laser:Callback
[ INFO] [1632761598.750847497]: Twist:Callback
[ INFO] [1632761598.753741510]: Laser:Callback
[ INFO] [1632761598.762706367]: Img:Callback
[ INFO] [1632761598.764424037]: Twist:Callback
[ INFO] [1632761598.766623670]: Laser:Callback
[ INFO] [1632761598.778012548]: Img:Callback
[ INFO] [1632761598.781003259]: Twist:Callback
[ INFO] [1632761598.783278070]: Laser:Callback
[ INFO] [1632761598.798151580]: Twist:Callback
[ INFO] [1632761598.800334432]: Laser:Callback
[ INFO] [1632761598.817158784]: Img:Callback
[ INFO] [1632761598.817276101]: Twist:Callback
[ INFO] [1632761598.817368705]: Laser:Callback
[ INFO] [1632761598.831502251]: Twist:Callback
[ INFO] [1632761598.833594552]: Laser:Callback
[ INFO] [1632761598.848181492]: Twist:Callback
[ INFO] [1632761598.850259481]: Laser:Callback
[ INFO] [1632761598.853909133]: Img:Callback
[ INFO] [1632761598.864712495]: Twist:Callback
[ INFO] [1632761598.866926431]: Laser:Callback
[ INFO] [1632761598.881360624]: Twist:Callback
[ INFO] [1632761598.885404560]: Img:Callback
[ INFO] [1632761598.885498932]: Laser:Callback
[ INFO] [1632761598.898164827]: Twist:Callback
[ INFO] [1632761598.900324819]: Laser:Callback
[ INFO] [1632761598.914681449]: Twist:Callback
[ INFO] [1632761598.918684480]: Img:Callback
[ INFO] [1632761598.918797223]: Laser:Callback
[ INFO] [1632761598.931412780]: Twist:Callback
[ INFO] [1632761598.953476288]: Img:Callback
[ INFO] [1632761598.953735208]: Twist:Callback
[ INFO] [1632761598.960459469]: Laser:Callback
[ INFO] [1632761598.960576034]: Laser:Callback
[ INFO] [1632761598.965029298]: Twist:Callback
[ INFO] [1632761598.980162791]: Img:Callback
[ INFO] [1632761598.981067892]: Twist:Callback
[ INFO] [1632761598.997919124]: Twist ...
(more)
edit retag flag offensive close merge delete

Comments

I think you'll need to write your own CallbackQueue and spinner like mentioned here. Unfortunately, I haven't done that in ROS 1 so I can't provide more help than that but I thought I'd mention it.

Jaron gravatar image Jaron  ( 2021-09-11 21:55:43 -0500 )edit

it seems this is the way to go. Unfortunately I haven't found any example that shows how to check which queue for the messages has data available. I would assume that at some point, all queues for the 3 messages have received at least one and I would like to know at a given frequency, which ones has something in their queue. I found something similar here but that only gives priority to one node. I would first like to know whether there is a message in each callback queue and then decide myself from which queue to read first.

Ariel gravatar image Ariel  ( 2021-09-27 10:19:57 -0500 )edit

3 Answers

Sort by ยป oldest newest most voted
0

answered 2021-09-29 11:07:07 -0500

Ariel gravatar image

updated 2021-09-30 07:23:38 -0500

According to the documentation (as suggested by @gvdhoorn) the code should look something like this:

#include "ros/ros.h"
#include "ros/callback_queue.h"
#include "sensor_msgs/Image.h"
#include "sensor_msgs/LaserScan.h"
#include "geometry_msgs/TwistStamped.h"

// Callbacks
void pc_pub_img_callback(const sensor_msgs::Image::ConstPtr& msg){
      ROS_INFO("Img:Callback");
}

void pc_pub_laser_callback(const sensor_msgs::LaserScan::ConstPtr& msg){
      ROS_INFO("Laser:Callback");
}

void pc_pub_twist_callback(const geometry_msgs::TwistStamped::ConstPtr& msg){
      ROS_INFO("Twist:Callback");
}

int main(int argn, char* args[])
{
  ros::init(argn, args, "arm_to_fpga");
  ros::NodeHandle n_img;
  ros::NodeHandle n_laser;
  ros::NodeHandle n_twist;

  ros::CallbackQueue Queue_img;
  ros::CallbackQueue Queue_laser;
  ros::CallbackQueue Queue_twist;

  n_img.setCallbackQueue(&Queue_img);
  n_laser.setCallbackQueue(&Queue_laser);
  n_twist.setCallbackQueue(&Queue_twist);

  ros::Subscriber s_img = n_img.subscribe("/FPGA_ROS_Scheduler/pc_pub_img", 1, pc_pub_img_callback);
  ros::Subscriber s_laser = n_laser.subscribe("/FPGA_ROS_Scheduler/pc_pub_laser", 1, pc_pub_laser_callback);
  ros::Subscriber s_twist = n_twist.subscribe("/FPGA_ROS_Scheduler/pc_pub_twist", 1, pc_pub_twist_callback);

  while(ros::ok())
  {
    if(!Queue_img.isEmpty() && !Queue_laser.isEmpty() && !Queue_twist.isEmpty())
    {
      Queue_img.callOne(ros::WallDuration(1.0));
      Queue_laser.callOne(ros::WallDuration(1.0));
      Queue_twist.callOne(ros::WallDuration(1.0));
    }
    ros::spinOnce();
  }    
  return 0;
}

EDIT: Here is the working example. It waits until all 3 queues have at least one message to invoke the callbacks.

edit flag offensive delete link more

Comments

This is not an answer I believe, as you report yourself it doesn't seem to work.

gvdhoorn gravatar image gvdhoorn  ( 2021-09-29 11:21:05 -0500 )edit

True, it is not fully working like posted there, but seems to be the way to go with respect to the queues. I believe the issue is how to spin. That's why I posted it as an answer rather than editing the original question. Should I do that and delete this question?

EDIT: I got it working and edited the answer.

Ariel gravatar image Ariel  ( 2021-09-29 11:23:03 -0500 )edit

I am skeptical that the code shown works well, because I see no ros::spinOnce() in there, and no rate limiting in the main loop. Also, the code does not implement a solution for the problem that is described.

A far simpler soultion is: If you don't want messages going out close together in time, then use a global variable to store the time-of-day you last published a message, then discard any new messages that arrive shortly after that. However, this approach has its own problems: it assumes you are willing to ignore the effect of message delivery delays, and to ignore the header.stamp in the arriving messages.

Mike Scheutzow gravatar image Mike Scheutzow  ( 2021-09-30 07:14:27 -0500 )edit

Removing the commented things during tested I deleted the ros::spinOnce(); (edited). The code now implements a solution to check callback queues individually which was my main question.

Ariel gravatar image Ariel  ( 2021-09-30 07:23:12 -0500 )edit
0

answered 2021-09-28 02:13:15 -0500

gvdhoorn gravatar image

updated 2021-09-28 02:15:41 -0500

You can certainly check what the state is of a CallbackQueue.

See the API documentation, specifically: CallbackQueue::isEmpty().

Whether this will let you do what you want I can't say. I don't really understand what you're trying to do.

Are you trying to implement some sort of round-robin scheduling in your msg forwarder (I assume you are sending those buffers off to somewhere else over a socket)?

edit flag offensive delete link more

Comments

Yes, exactly. I want to implement a round-robin scheduling to forward the incoming messages over a single hardware resource (for all incoming messages). Based on what I understand, I should have one CallbackQueue for each message (like the last figure here) and then check whether all queues are empty. Those that aren't, come into a scheduling scheme to decide which one should send its message over the single hw resource and avoid that the same message is sent more than once when there are other available messages in the queue. Besides the API documentation, I couldn't find any examples. There is only one thread here but he was checking isEmpty from the callback returning always empty. I don't believe isEmpty should be called from within the callback but from the main and depending on the outcome of the scheduler, invoke the corresponding callback.

Ariel gravatar image Ariel  ( 2021-09-28 02:26:28 -0500 )edit

Thing is, you're doing things which aren't normally done. That's why it's hard to find examples.

I'm not sure, but I don't believe there's even a need to create typed subscriptions with callbacks in your case. Have you looked into using a generic subscriber (ie: ShapeShifter (perhaps combined with ros_type_introspection and/or StefanFabian/ros_babel_fish))? They either already do what you're trying to do, or should make it all much easier.

Why reserialise a message when you're only interested in the bytes anyway, and not the actual structured message? That's a huge amount of overhead.

gvdhoorn gravatar image gvdhoorn  ( 2021-09-28 02:30:43 -0500 )edit

Example of a (set of) package(s) which does something similar to what I believe you're doing: AIS-Bonn/nimbro_network (but there are more).

In this case: single UDP or TCP socket == "a single hardware resource".

gvdhoorn gravatar image gvdhoorn  ( 2021-09-28 02:35:32 -0500 )edit

Those are quite interesting points. You are right, there is no need to reserialize to increase the overhead. However, the key thing I am looking for is how to handle the callback queue (better multiple queues) and decide when to read each one so I guess it should be a combination of isEmpty and the generic subscribers you mentioned.

Ariel gravatar image Ariel  ( 2021-09-28 02:41:04 -0500 )edit

However, the key thing I am looking for is how to handle the callback queue

I would again suggest to take a look at existing solutions.

gvdhoorn gravatar image gvdhoorn  ( 2021-09-28 05:05:13 -0500 )edit
0

answered 2021-09-22 05:22:34 -0500

ljaniec gravatar image

Maybe a package called message_filters could help you solve your problem:

http://wiki.ros.org/message_filters

Especially 6. Cache, you could implement some kind of analyzer of last N messages..?

edit flag offensive delete link more

Your Answer

Please start posting anonymously - your entry will be published after you log in or create a new account.

Add Answer

Question Tools

3 followers

Stats

Asked: 2021-09-10 09:27:51 -0500

Seen: 188 times

Last updated: Sep 30