ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange |
1 | initial version |
Hello,
In the class constructor there is a while statement blocking the remaining code execution which is not what you want, I believe. So even if the sintax in your robot_arm_system_test was correct on your code, the functions would not execute.
I believe that what you want is to setup a simple publish subscriber in the same class. To do so, here, and to not only do it once, you should create a thread handling the publisher, and place the in the end of your python main function. However, if your class is driven only by a single publisher and subscriber you can keep your constructor as is (while ros is ok, publish).
Here is an example of how you could do it (WITH THREAD and rospy.spin if nothing is to be done after it):
#!/usr/bin/env python3
import rospy
from std_msgs.msg import Float64
import math
import random
from sensor_msgs.msg import JointState
import numpy as np
import time
import threading
start_time = time.time()
class RobotControl:
def __init__(self):
rospy.init_node('robot_control_node', anonymous=True)
self.talker = rospy.Publisher('/rrbot/joint1_position_controller/command', Float64, queue_size=10)
rospy.Subscriber('/rrbot/joint_states', JointState, self.joint_callback)
self.force = None
self.joint = None
self.rate = rospy.Rate(10)
#HERE EITHER USE THE THREAD if you do not want the code to be stuck
self.pub_thread = threading.Thread(target=self.publisher)
# start separate entity publishing data
self.pub_thread.start()
# SPIN SINCE WE DO NOT NEED TO DO ANYTHING
rospy.spin()
def publisher(self):
while not rospy.is_shutdown():
self.give_force()
self.rate.sleep()
def give_force(self):
self.force = random.choice([-2,2]) # Changes force every time, right before publishing
self.talker.publish(self.force)
def joint_callback(self, data):
self.joint = data.position[0]
reward = None
if self.observation_check() is True:
reward = (time.time() - start_time)
else:
pass
print(reward)
def observation_check(self):
max_height = -3.2
if self.joint < max_height:
done = True
else:
done = False
return done
if __name__ == "__main__":
# Instantiate the class
my_robot = RobotControl()
2 | No.2 Revision |
Hello,
In the class constructor there is a while statement blocking the remaining code execution which is not what you want, I believe. So even if the sintax in your robot_arm_system_test was correct on your code, the functions would not execute.
I believe that what you want is to setup a simple publish subscriber in the same class. To do so, here, and to not only do it once, so you should can create a thread handling the publisher, and place the in the end of your python main function. publisher. However, if your class is driven only by a single publisher and subscriber you can keep your constructor as is (while ros is ok, publish).
Here is an example of how you could do it (WITH THREAD and rospy.spin if nothing is to be done after it):
#!/usr/bin/env python3
import rospy
from std_msgs.msg import Float64
import math
import random
from sensor_msgs.msg import JointState
import numpy as np
import time
import threading
start_time = time.time()
class RobotControl:
def __init__(self):
rospy.init_node('robot_control_node', anonymous=True)
self.talker = rospy.Publisher('/rrbot/joint1_position_controller/command', Float64, queue_size=10)
rospy.Subscriber('/rrbot/joint_states', JointState, self.joint_callback)
self.force = None
self.joint = None
self.rate = rospy.Rate(10)
#HERE EITHER USE THE THREAD if you do not want the code to be stuck
self.pub_thread = threading.Thread(target=self.publisher)
# start separate entity publishing data
self.pub_thread.start()
# SPIN SINCE WE DO NOT NEED TO DO ANYTHING
rospy.spin()
def publisher(self):
while not rospy.is_shutdown():
self.give_force()
self.rate.sleep()
def give_force(self):
self.force = random.choice([-2,2]) # Changes force every time, right before publishing
self.talker.publish(self.force)
def joint_callback(self, data):
self.joint = data.position[0]
reward = None
if self.observation_check() is True:
reward = (time.time() - start_time)
else:
pass
print(reward)
def observation_check(self):
max_height = -3.2
if self.joint < max_height:
done = True
else:
done = False
return done
if __name__ == "__main__":
# Instantiate the class
my_robot = RobotControl()