sot-talos-balance  2.0.5
Collection of dynamic-graph entities aimed at implementing balance control on talos.
gazebo_utils.py
Go to the documentation of this file.
1 """This module contains utilities for interacting with Gazebo."""
2 
3 import threading
4 
5 import rospy
6 from gazebo_msgs.srv import ApplyBodyWrench, GetLinkState
7 from geometry_msgs.msg import Wrench
8 from tf.transformations import euler_from_quaternion
9 
10 from dynamic_graph_bridge_msgs.msg import Vector as VectorMsg
11 
12 
14  list_of_topics = rospy.get_published_topics()
15  for a_topic in list_of_topics:
16  if a_topic[0].startswith("/gazebo"):
17  return True
18  return False
19 
20 
21 def apply_force(force, duration, body_name="talos::torso_2_link"):
22  """Gazebo service call for applying a force on a body."""
23  rospy.wait_for_service("/gazebo/apply_body_wrench")
24  apply_body_wrench_proxy = rospy.ServiceProxy(
25  "/gazebo/apply_body_wrench", ApplyBodyWrench
26  )
27  wrench = Wrench()
28  wrench.force.x = force[0]
29  wrench.force.y = force[1]
30  wrench.force.z = force[2]
31  wrench.torque.x = 0
32  wrench.torque.y = 0
33  wrench.torque.z = 0
35  body_name=body_name, wrench=wrench, duration=rospy.Duration(duration)
36  )
37 
38 
39 def vec2list(v):
40  return [v.x, v.y, v.z]
41 
42 
43 def quat2list(v):
44  return [v.x, v.y, v.z, v.w]
45 
46 
47 class GazeboLinkStatePublisher(threading.Thread):
48  """Utility class reading the state of a given link from Gazebo and publishing."""
49 
50  def __init__(self, link_name, rate, euler="sxyz", local_frame=False, prefix="/sot"):
51  super(GazeboLinkStatePublisher, self).__init__(name=link_name + "_publisher")
52  self.daemon = True
53  self.link_name = link_name
54  if local_frame:
55  self.reference_frame = link_name
56  else:
57  self.reference_frame = ""
58  self.rate = rate
59  self.prefix = prefix
60  self.euler = euler
61  self._stop = False
62  rospy.init_node(self.name, anonymous=True)
63 
64  def stop(self):
65  self._stop = True
66 
67  def stopped(self):
68  return self._stop
69 
70  def run(self):
71  topic_pos = self.prefix + "/" + self.link_name + "/position"
72  topic_vel = self.prefix + "/" + self.link_name + "/velocity"
73 
74  pub_pos = rospy.Publisher(topic_pos, VectorMsg, queue_size=10)
75  pub_vel = rospy.Publisher(topic_vel, VectorMsg, queue_size=10)
76 
77  rospy.wait_for_service("/gazebo/get_link_state")
78  get_link_state = rospy.ServiceProxy("/gazebo/get_link_state", GetLinkState)
79 
80  rate = rospy.Rate(self.rate)
81  while (not self.stopped()) and (not rospy.is_shutdown()):
82  link_state_msg = get_link_state(
83  link_name=self.link_name, reference_frame=self.reference_frame
84  )
85  link_state = link_state_msg.link_state
86 
87  cartesian = vec2list(link_state.pose.position)
88  orientation = quat2list(link_state.pose.orientation)
89  if self.euler:
90  orientation = list(euler_from_quaternion(orientation, self.euler))
91  position = cartesian + orientation
92 
93  velocity = vec2list(link_state.twist.linear) + vec2list(
94  link_state.twist.angular
95  )
96  pub_pos.publish(position)
97  pub_vel.publish(velocity)
98 
99  rate.sleep()
sot_talos_balance.utils.gazebo_utils.GazeboLinkStatePublisher.stopped
def stopped(self)
Definition: gazebo_utils.py:67
sot_talos_balance.utils.gazebo_utils.GazeboLinkStatePublisher
Definition: gazebo_utils.py:47
sot_talos_balance.utils.gazebo_utils.is_gazebo_present
def is_gazebo_present()
Definition: gazebo_utils.py:13
sot_talos_balance.utils.gazebo_utils.GazeboLinkStatePublisher.prefix
prefix
Definition: gazebo_utils.py:59
sot_talos_balance.utils.gazebo_utils.apply_force
def apply_force(force, duration, body_name="talos::torso_2_link")
Definition: gazebo_utils.py:21
sot_talos_balance.utils.gazebo_utils.GazeboLinkStatePublisher.__init__
def __init__(self, link_name, rate, euler="sxyz", local_frame=False, prefix="/sot")
Definition: gazebo_utils.py:50
sot_talos_balance.utils.gazebo_utils.GazeboLinkStatePublisher.daemon
daemon
Definition: gazebo_utils.py:52
sot_talos_balance.utils.gazebo_utils.quat2list
def quat2list(v)
Definition: gazebo_utils.py:43
sot_talos_balance.test.script_test_end_effector.apply_body_wrench_proxy
apply_body_wrench_proxy
Definition: script_test_end_effector.py:6
sot_talos_balance.utils.gazebo_utils.GazeboLinkStatePublisher.reference_frame
reference_frame
Definition: gazebo_utils.py:55
sot_talos_balance.utils.gazebo_utils.GazeboLinkStatePublisher._stop
_stop
Definition: gazebo_utils.py:61
sot_talos_balance.utils.gazebo_utils.GazeboLinkStatePublisher.euler
euler
Definition: gazebo_utils.py:60
sot_talos_balance.utils.gazebo_utils.GazeboLinkStatePublisher.link_name
link_name
Definition: gazebo_utils.py:53
sot_talos_balance.utils.gazebo_utils.GazeboLinkStatePublisher.run
def run(self)
Definition: gazebo_utils.py:70
sot_talos_balance.utils.gazebo_utils.vec2list
def vec2list(v)
Definition: gazebo_utils.py:39
sot_talos_balance.utils.gazebo_utils.GazeboLinkStatePublisher.stop
def stop(self)
Definition: gazebo_utils.py:64
sot_talos_balance.utils.gazebo_utils.GazeboLinkStatePublisher.rate
rate
Definition: gazebo_utils.py:58