I believe I need multithreading help! Process has died [pid 31330, exit code -11...
I've been trying to make use of rosbag::Recorder
to programmatically record bag files through an Rviz panel widget. I decided the best way to accomplish this was to separate the GUI code from the actual collection of data, and subsequent translation into a .csv file to be analyzed offline, so I used an action to communicate between nodes. Originally I tried to just use rosbag.write()
in the /joint_states callback, and in the /wrench callback, but ran into some race conditions (an error relating to free() popped up, seemed like fighting for the same memory block). I wasn't sure how to solve this... (maybe mutex..?), so I tried to save the incoming messages directly to a .csv file, which had problems of its own, notably the time stamps were difficult to decipher since the messages aren't published or received simultaneously.
After all this, I found rosbag::Recorder. The problem I've run into is that rosbag::Recorder
has no way to stop recording, you just need to close the node. However, I can't put the recorder.run()
command directly into my executeCallback since it's blocking, and only exits when the node is closed. So I have attempted multithreading, which seems to work fine except when I close the node, I get a strange message, which only appears when I have thread_group.join_all()
, but without this command, the .bag file stays as .bag.active. The error which appears, and the code are below.
[ INFO] [1568812221.925664598]: bag server started
Execute cb reached!
Record Bag
[ INFO] [1568812235.810963832]: Subscribing to /joint_states
[ INFO] [1568812235.814695031]: Subscribing to /wrench
[ INFO] [1568812235.817165194]: Recording to 2019-09-18-09-10-35.bag.
Execute cb reached!
Stopped recording
[bag_server-1] process has died [pid 20260, exit code -11, cmd /home/joshua/catkin_ws/devel/lib/prop_data_collector/prop_data_collector_bag_server __name:=bag_server __log:=/home/joshua/.ros/log/8b15a9b2-da15-11e9-92b3-b8ca3a9f2c71/bag_server-1.log].
log file: /home/joshua/.ros/log/8b15a9b2-da15-11e9-92b3-b8ca3a9f2c71/bag_server-1*.log
[bag_server-1] restarting process
process[bag_server-1]: started with pid [20288]
[ INFO] [1568812241.256460079]: bag server started
And the code:
class recordNode{
protected:
int argc;
char** argv;
ros::NodeHandle nh;
public:
recordNode(int argc, char** argv){
argc = argc;
argv = argv;
ros::init(argc,argv,"RecordNode");
//options.prefix = "~/catkin_ws/";
}
virtual ~recordNode(){
};
void closenode(){
ros::requestShutdown();
//ros::shutdown();
}
void record(int argc, char** argv){
rosbag::RecorderOptions options;
options.topics.push_back("/joint_states");
options.topics.push_back("/wrench");
rosbag::Recorder recorder(options);
recorder.run();
}
};
class bagHandler{
protected:
ros::NodeHandle nh_;
actionlib::SimpleActionServer<prop_data_collector::bag_serverAction> as_;
std::string action_name_;
// messages
prop_data_collector::bag_serverFeedback feedback_;
prop_data_collector::bag_serverResult result_;
rosbag::Bag bag;
bool record = 0;
std::string prop_location;
bool wdone = 0;
bool jdone = 0;
sensor_msgs::JointState curJS;
geometry_msgs::WrenchStamped curW;
boost::shared_ptr<recordNode> recNode;
boost::thread_group thread_group;
int argc;
char** argv;
public:
bagHandler(std::string name,int argc, char** argv):
as_(nh_, name, boost::bind(&bagHandler::executeCB, this, _1), false), action_name_(name)
{
argc = argc;
argv = argv;
as_.start();
ROS_INFO("bag server started");
prop_location = ros::package::getPath("prop_data_collector");
recNode.reset(new recordNode(argc,argv));
}
~bagHandler(){
}
void executeCB(const prop_data_collector::bag_serverGoalConstPtr &goal){
std::cout << "Execute cb reached!"<<std::endl;
if(goal->bagOperation == 1){// then record!
std::cout << "Record Bag"<<std::endl;
record = 1;
thread_group.add_thread(new boost::thread(&recordNode::record,recNode,argc,argv));
//thread_group.create_thread(recNode->record,argc,argv);
// below,attemped to use lambda function, doesn't work
//thread_group.create_thread([this]{recNode->record(argc,argv)});
//thread_group.add_thread(new boost::thread(recNode->record,argc,argv));
result_.success=true;
as_.setSucceeded(result_);
}else if(goal->bagOperation == 2){ // stop recording
std::cout << "Stopped recording" <<std::endl;
record == 2;
recNode->closenode();
//recNode->~recordNode();
// NOTE: there's maybe no need to joint the threads, when we shutdown the node, execution
// of that thread is effectively stopped. There's nothing to join as that thread shares
// nothing with the main...?
thread_group.join_all();
ros::requestShutdown();
//bag.close();
}else if(goal->bagOperation == 3){ // save
saveBagToCSV();
std::cout << "Save bag to csv" <<std::endl;
}
}
//void jointStatesCallback(const ros::MessageEvent<sensor_msgs::JointState>& event){
void jointStatesCallback(sensor_msgs::JointState jsmsg){
if(record ==1){
//bag.write("joint_states",event);
curJS = jsmsg;
//std::cout << "JS at: "<< jsmsg.header.stamp.toSec() <<std::endl;
//std::cout << "JS rec"<<std::endl;
jdone = 1;
}
}
//void wrenchCallback(const ros::MessageEvent<geometry_msgs::WrenchStamped>& event)
void wrenchCallback(geometry_msgs::WrenchStamped wmsg){
if(record == 1){
//bag.write("wrench",event);
curW = wmsg;
//std::cout << "RECORDED WRENCH"<< std::endl;
wdone = 1;
}
}
void saveBagToCSV(){
}
};
int main(int argc, char** argv){
ros::init(argc, argv, "bag_server");
ros::NodeHandle nh;
ros::AsyncSpinner spinner(0);
spinner.start();
bagHandler baghandler("bagHandler",argc,argv);
ros::Subscriber sub = nh.subscribe("joint_states",1000,&bagHandler::jointStatesCallback,&baghandler);
ros::Subscriber sub2 = nh.subscribe("wrench",1000,&bagHandler::wrenchCallback,&baghandler);
ros::waitForShutdown();
return 0;
}
any assistance in figuring out where this error comes from would be very helpful, as well as any guidance in terms of whether I've taken a reasonable approach. I should note that the code seems to work as desired, but the error isn't confidence inspiring.
Thanks in advance, Josh
Asked by jbeck28 on 2019-09-18 08:46:27 UTC
Comments
Not an answer, but perhaps #q318076 is related, as well as #q216866. And the linked osrf/nodelet_rosbag has an action server interface that seems to be able to do what you want.
Asked by gvdhoorn on 2019-09-18 09:01:03 UTC
I looked into that nodelet, however I couldn't figure out how to use it as it's undocumented. I understand it's an action server... but I can't seem to send it goals or a subscriber list... or a mode. I'm not entirely sure how it works. Any tips?
Asked by jbeck28 on 2019-09-18 13:02:52 UTC