sot-talos-balance  2.0.5
Collection of dynamic-graph entities aimed at implementing balance control on talos.
gazebo_utils.py
Go to the documentation of this file.
1 """This module contains utilities for interacting with Gazebo."""
2 
3 import threading
4 
5 import rospy
6 from gazebo_msgs.srv import ApplyBodyWrench, GetLinkState
7 from geometry_msgs.msg import Wrench
8 from tf.transformations import euler_from_quaternion
9 
10 from dynamic_graph_bridge_msgs.msg import Vector as VectorMsg
11 
12 
14  list_of_topics = rospy.get_published_topics()
15  for a_topic in list_of_topics:
16  if a_topic[0].startswith("/gazebo"):
17  return True
18  return False
19 
20 
21 def apply_force(force, duration, body_name="talos::torso_2_link"):
22  """Gazebo service call for applying a force on a body."""
23  rospy.wait_for_service("/gazebo/apply_body_wrench")
24  apply_body_wrench_proxy = rospy.ServiceProxy(
25  "/gazebo/apply_body_wrench", ApplyBodyWrench
26  )
27  wrench = Wrench()
28  wrench.force.x = force[0]
29  wrench.force.y = force[1]
30  wrench.force.z = force[2]
31  wrench.torque.x = 0
32  wrench.torque.y = 0
33  wrench.torque.z = 0
35  body_name=body_name, wrench=wrench, duration=rospy.Duration(duration)
36  )
37 
38 
39 def vec2list(v):
40  return [v.x, v.y, v.z]
41 
42 
43 def quat2list(v):
44  return [v.x, v.y, v.z, v.w]
45 
46 
47 class GazeboLinkStatePublisher(threading.Thread):
48  """Utility class reading the state of a given link from Gazebo and publishing."""
49 
50  def __init__(self, link_name, rate, euler="sxyz", local_frame=False, prefix="/sot"):
51  super(GazeboLinkStatePublisher, self).__init__(name=link_name + "_publisher")
52  self.daemondaemon = True
53  self.link_namelink_name = link_name
54  if local_frame:
55  self.reference_framereference_frame = link_name
56  else:
57  self.reference_framereference_frame = ""
58  self.raterate = rate
59  self.prefixprefix = prefix
60  self.eulereuler = euler
61  self._stop_stop = False
62  rospy.init_node(self.name, anonymous=True)
63 
64  def stop(self):
65  self._stop_stop = True
66 
67  def stopped(self):
68  return self._stop_stop
69 
70  def run(self):
71  topic_pos = self.prefixprefix + "/" + self.link_namelink_name + "/position"
72  topic_vel = self.prefixprefix + "/" + self.link_namelink_name + "/velocity"
73 
74  pub_pos = rospy.Publisher(topic_pos, VectorMsg, queue_size=10)
75  pub_vel = rospy.Publisher(topic_vel, VectorMsg, queue_size=10)
76 
77  rospy.wait_for_service("/gazebo/get_link_state")
78  get_link_state = rospy.ServiceProxy("/gazebo/get_link_state", GetLinkState)
79 
80  rate = rospy.Rate(self.raterate)
81  while (not self.stoppedstopped()) and (not rospy.is_shutdown()):
82  link_state_msg = get_link_state(
83  link_name=self.link_namelink_name, reference_frame=self.reference_framereference_frame
84  )
85  link_state = link_state_msg.link_state
86 
87  cartesian = vec2list(link_state.pose.position)
88  orientation = quat2list(link_state.pose.orientation)
89  if self.eulereuler:
90  orientation = list(euler_from_quaternion(orientation, self.eulereuler))
91  position = cartesian + orientation
92 
93  velocity = vec2list(link_state.twist.linear) + vec2list(
94  link_state.twist.angular
95  )
96  pub_pos.publish(position)
97  pub_vel.publish(velocity)
98 
99  rate.sleep()
def __init__(self, link_name, rate, euler="sxyz", local_frame=False, prefix="/sot")
Definition: gazebo_utils.py:50
def apply_force(force, duration, body_name="talos::torso_2_link")
Definition: gazebo_utils.py:21