ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange
Ask Your Question

Revision history [back]

click to hide/show revision 1
initial version

Hello,

In the class constructor there is a while statement blocking the remaining code execution which is not what you want, I believe. So even if the sintax in your robot_arm_system_test was correct on your code, the functions would not execute.

I believe that what you want is to setup a simple publish subscriber in the same class. To do so, here, and to not only do it once, you should create a thread handling the publisher, and place the in the end of your python main function. However, if your class is driven only by a single publisher and subscriber you can keep your constructor as is (while ros is ok, publish).

Here is an example of how you could do it (WITH THREAD and rospy.spin if nothing is to be done after it):

#!/usr/bin/env python3

import rospy
from std_msgs.msg import Float64
import math
import random
from sensor_msgs.msg import JointState
import numpy as np
import time
import threading

start_time = time.time()


class RobotControl:

    def __init__(self):
        rospy.init_node('robot_control_node', anonymous=True)
        self.talker = rospy.Publisher('/rrbot/joint1_position_controller/command', Float64, queue_size=10)
        rospy.Subscriber('/rrbot/joint_states', JointState, self.joint_callback)
        self.force = None
        self.joint = None
        self.rate = rospy.Rate(10)
        #HERE EITHER USE THE THREAD if you do not want the code to be stuck
        self.pub_thread = threading.Thread(target=self.publisher)
        # start separate entity publishing data
        self.pub_thread.start()
        # SPIN SINCE WE DO NOT NEED TO DO ANYTHING 
        rospy.spin()

    def publisher(self):
        while not rospy.is_shutdown():
            self.give_force()
            self.rate.sleep()

    def give_force(self):
        self.force = random.choice([-2,2])  # Changes force every time, right before publishing
        self.talker.publish(self.force)

    def joint_callback(self, data):
        self.joint = data.position[0]
        reward = None
        if self.observation_check() is True:
            reward = (time.time() - start_time)
        else:
            pass

        print(reward)

    def observation_check(self):
        max_height = -3.2
        if self.joint < max_height:
            done = True
        else:
            done = False
        return done


if __name__ == "__main__":
    # Instantiate the class
    my_robot = RobotControl()

Hello,

In the class constructor there is a while statement blocking the remaining code execution which is not what you want, I believe. So even if the sintax in your robot_arm_system_test was correct on your code, the functions would not execute.

I believe that what you want is to setup a simple publish subscriber in the same class. To do so, here, and to not only do it once, so you should can create a thread handling the publisher, and place the in the end of your python main function. publisher. However, if your class is driven only by a single publisher and subscriber you can keep your constructor as is (while ros is ok, publish).

Here is an example of how you could do it (WITH THREAD and rospy.spin if nothing is to be done after it):

#!/usr/bin/env python3

import rospy
from std_msgs.msg import Float64
import math
import random
from sensor_msgs.msg import JointState
import numpy as np
import time
import threading

start_time = time.time()


class RobotControl:

    def __init__(self):
        rospy.init_node('robot_control_node', anonymous=True)
        self.talker = rospy.Publisher('/rrbot/joint1_position_controller/command', Float64, queue_size=10)
        rospy.Subscriber('/rrbot/joint_states', JointState, self.joint_callback)
        self.force = None
        self.joint = None
        self.rate = rospy.Rate(10)
        #HERE EITHER USE THE THREAD if you do not want the code to be stuck
        self.pub_thread = threading.Thread(target=self.publisher)
        # start separate entity publishing data
        self.pub_thread.start()
        # SPIN SINCE WE DO NOT NEED TO DO ANYTHING 
        rospy.spin()

    def publisher(self):
        while not rospy.is_shutdown():
            self.give_force()
            self.rate.sleep()

    def give_force(self):
        self.force = random.choice([-2,2])  # Changes force every time, right before publishing
        self.talker.publish(self.force)

    def joint_callback(self, data):
        self.joint = data.position[0]
        reward = None
        if self.observation_check() is True:
            reward = (time.time() - start_time)
        else:
            pass

        print(reward)

    def observation_check(self):
        max_height = -3.2
        if self.joint < max_height:
            done = True
        else:
            done = False
        return done


if __name__ == "__main__":
    # Instantiate the class
    my_robot = RobotControl()