Robotics StackExchange | Archived questions

I believe I need multithreading help! Process has died [pid 31330, exit code -11...

I've been trying to make use of rosbag::Recorder to programmatically record bag files through an Rviz panel widget. I decided the best way to accomplish this was to separate the GUI code from the actual collection of data, and subsequent translation into a .csv file to be analyzed offline, so I used an action to communicate between nodes. Originally I tried to just use rosbag.write() in the /joint_states callback, and in the /wrench callback, but ran into some race conditions (an error relating to free() popped up, seemed like fighting for the same memory block). I wasn't sure how to solve this... (maybe mutex..?), so I tried to save the incoming messages directly to a .csv file, which had problems of its own, notably the time stamps were difficult to decipher since the messages aren't published or received simultaneously.

After all this, I found rosbag::Recorder. The problem I've run into is that rosbag::Recorder has no way to stop recording, you just need to close the node. However, I can't put the recorder.run() command directly into my executeCallback since it's blocking, and only exits when the node is closed. So I have attempted multithreading, which seems to work fine except when I close the node, I get a strange message, which only appears when I have thread_group.join_all(), but without this command, the .bag file stays as .bag.active. The error which appears, and the code are below.

[ INFO] [1568812221.925664598]: bag server started
Execute cb reached!
Record Bag
[ INFO] [1568812235.810963832]: Subscribing to /joint_states
[ INFO] [1568812235.814695031]: Subscribing to /wrench
[ INFO] [1568812235.817165194]: Recording to 2019-09-18-09-10-35.bag.
Execute cb reached!
Stopped recording
[bag_server-1] process has died [pid 20260, exit code -11, cmd /home/joshua/catkin_ws/devel/lib/prop_data_collector/prop_data_collector_bag_server __name:=bag_server __log:=/home/joshua/.ros/log/8b15a9b2-da15-11e9-92b3-b8ca3a9f2c71/bag_server-1.log].
log file: /home/joshua/.ros/log/8b15a9b2-da15-11e9-92b3-b8ca3a9f2c71/bag_server-1*.log
[bag_server-1] restarting process
process[bag_server-1]: started with pid [20288]
[ INFO] [1568812241.256460079]: bag server started

And the code:

class recordNode{
protected:
    int argc;
    char** argv;
    ros::NodeHandle nh; 
public:
    recordNode(int argc, char** argv){
        argc = argc;
        argv = argv;
        ros::init(argc,argv,"RecordNode");

        //options.prefix = "~/catkin_ws/";

    }
    virtual ~recordNode(){

    };
    void closenode(){
        ros::requestShutdown();
        //ros::shutdown();
    }   
    void record(int argc, char** argv){

        rosbag::RecorderOptions options;
        options.topics.push_back("/joint_states");
        options.topics.push_back("/wrench");
        rosbag::Recorder recorder(options);
        recorder.run();
    }
};
class bagHandler{
protected:
    ros::NodeHandle nh_;
    actionlib::SimpleActionServer<prop_data_collector::bag_serverAction> as_;
    std::string action_name_;
    // messages 
    prop_data_collector::bag_serverFeedback feedback_;
    prop_data_collector::bag_serverResult result_;
    rosbag::Bag bag;
    bool record = 0;
    std::string prop_location;
    bool wdone = 0;
    bool jdone = 0;
    sensor_msgs::JointState curJS;
    geometry_msgs::WrenchStamped curW;
    boost::shared_ptr<recordNode> recNode;
    boost::thread_group thread_group;
    int argc;
    char** argv;


public:
    bagHandler(std::string name,int argc, char** argv):
    as_(nh_, name, boost::bind(&bagHandler::executeCB, this, _1), false), action_name_(name)
    {
        argc = argc;
        argv = argv;
        as_.start();
        ROS_INFO("bag server started");
        prop_location = ros::package::getPath("prop_data_collector");
        recNode.reset(new recordNode(argc,argv));
    }
    ~bagHandler(){

    }

    void executeCB(const prop_data_collector::bag_serverGoalConstPtr &goal){
        std::cout << "Execute cb reached!"<<std::endl;
        if(goal->bagOperation == 1){// then record!
            std::cout << "Record Bag"<<std::endl;
            record = 1;

            thread_group.add_thread(new boost::thread(&recordNode::record,recNode,argc,argv));
            //thread_group.create_thread(recNode->record,argc,argv);
            // below,attemped to use lambda function, doesn't work
            //thread_group.create_thread([this]{recNode->record(argc,argv)});
            //thread_group.add_thread(new boost::thread(recNode->record,argc,argv));
            result_.success=true;
            as_.setSucceeded(result_);

        }else if(goal->bagOperation == 2){ // stop recording
            std::cout << "Stopped recording" <<std::endl;
            record == 2;
            recNode->closenode();
            //recNode->~recordNode();
            // NOTE: there's maybe no need to joint the threads, when we shutdown the node, execution
            // of that thread is effectively stopped. There's nothing to join as that thread shares
            // nothing with the main...?  
            thread_group.join_all();
            ros::requestShutdown();

            //bag.close();
        }else if(goal->bagOperation == 3){ // save
            saveBagToCSV();
            std::cout << "Save bag to csv" <<std::endl;
        }
    }

    //void jointStatesCallback(const ros::MessageEvent<sensor_msgs::JointState>& event){
    void jointStatesCallback(sensor_msgs::JointState jsmsg){
        if(record ==1){
            //bag.write("joint_states",event);
            curJS = jsmsg;
            //std::cout << "JS at: "<< jsmsg.header.stamp.toSec() <<std::endl;
            //std::cout << "JS rec"<<std::endl;
            jdone = 1;
        }
    }
    //void wrenchCallback(const ros::MessageEvent<geometry_msgs::WrenchStamped>& event)
    void wrenchCallback(geometry_msgs::WrenchStamped wmsg){
        if(record == 1){
            //bag.write("wrench",event);
            curW = wmsg;
            //std::cout << "RECORDED WRENCH"<< std::endl;
            wdone = 1;
        }
    }
    void saveBagToCSV(){
}

};

int main(int argc, char** argv){
    ros::init(argc, argv, "bag_server");
    ros::NodeHandle nh;
    ros::AsyncSpinner spinner(0);
    spinner.start();

    bagHandler baghandler("bagHandler",argc,argv);

    ros::Subscriber sub = nh.subscribe("joint_states",1000,&bagHandler::jointStatesCallback,&baghandler);
    ros::Subscriber sub2 = nh.subscribe("wrench",1000,&bagHandler::wrenchCallback,&baghandler);
    ros::waitForShutdown();
    return 0;
}

any assistance in figuring out where this error comes from would be very helpful, as well as any guidance in terms of whether I've taken a reasonable approach. I should note that the code seems to work as desired, but the error isn't confidence inspiring.

Thanks in advance, Josh

Asked by jbeck28 on 2019-09-18 08:46:27 UTC

Comments

Not an answer, but perhaps #q318076 is related, as well as #q216866. And the linked osrf/nodelet_rosbag has an action server interface that seems to be able to do what you want.

Asked by gvdhoorn on 2019-09-18 09:01:03 UTC

I looked into that nodelet, however I couldn't figure out how to use it as it's undocumented. I understand it's an action server... but I can't seem to send it goals or a subscriber list... or a mode. I'm not entirely sure how it works. Any tips?

Asked by jbeck28 on 2019-09-18 13:02:52 UTC

Answers