diff --git a/angler_bringup/launch/bluerov2_heavy_alpha.launch.py b/angler_bringup/launch/bluerov2_heavy_alpha.launch.py
index 2c34a66..55843d6 100644
--- a/angler_bringup/launch/bluerov2_heavy_alpha.launch.py
+++ b/angler_bringup/launch/bluerov2_heavy_alpha.launch.py
@@ -211,9 +211,9 @@ def generate_launch_description() -> LaunchDescription:
),
DeclareLaunchArgument(
"whole_body_controller",
- default_value="tpik_controller",
+ default_value="tpik_joint_trajectory_controller",
description="The whole-body controller to load.",
- choices=["tpik_controller"],
+ choices=["tpik_joint_trajectory_controller"],
),
]
diff --git a/angler_bringup/package.xml b/angler_bringup/package.xml
index 59a020d..48e57f8 100644
--- a/angler_bringup/package.xml
+++ b/angler_bringup/package.xml
@@ -14,17 +14,17 @@
Evan Palmer
- mavros
- mavros_extras
-
ament_copyright
ament_flake8
ament_pep257
python3-pytest
+ mavros
+ mavros_extras
hardware_interface
velocity_controllers
xacro
+ foxglove_bridge
ament_cmake
diff --git a/angler_control/controllers/controller.py b/angler_control/controllers/controller.py
index 9933565..8478134 100644
--- a/angler_control/controllers/controller.py
+++ b/angler_control/controllers/controller.py
@@ -140,14 +140,14 @@ def arm_controller_cb(
response.message = (
"Failed to "
+ ("arm" if request.data else "disarm")
- + "the Angler controller!"
+ + " the Angler controller!"
)
else:
response.success = True
response.message = (
"Successfully "
+ ("armed" if self.armed else "disarmed")
- + "the Angler controller!"
+ + " the Angler controller!"
)
return response
diff --git a/angler_control/controllers/robot_trajectory_controller/base_multidof_joint_trajectory_controller.py b/angler_control/controllers/robot_trajectory_controller/base_multidof_joint_trajectory_controller.py
new file mode 100644
index 0000000..b7cd8ff
--- /dev/null
+++ b/angler_control/controllers/robot_trajectory_controller/base_multidof_joint_trajectory_controller.py
@@ -0,0 +1,307 @@
+# Copyright 2023, Evan Palmer
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+import threading
+from abc import abstractmethod
+
+import controllers.robot_trajectory_controller.utils as controller_utils
+import numpy as np
+from control_msgs.action import FollowJointTrajectory
+from controllers.controller import BaseController
+from controllers.robot_trajectory_controller.trajectory import MultiDOFTrajectory
+from geometry_msgs.msg import Quaternion
+from rclpy.action import ActionServer, GoalResponse
+from rclpy.action.server import ServerGoalHandle
+from rclpy.time import Duration
+from trajectory_msgs.msg import MultiDOFJointTrajectoryPoint
+
+
+class BaseMultiDOFJointTrajectoryController(BaseController):
+ """Base interface for a MultiDOFJointTrajectory controller."""
+
+ def __init__(self, controller_name: str) -> None:
+ """Create a new multi-DOF joint trajectory controller.
+
+ Args:
+ controller_name: The unique controller name.
+ """
+ super().__init__(controller_name)
+
+ self.declare_parameters(
+ namespace="",
+ parameters=[ # type: ignore
+ ("goal_position_tolerance", 0.2),
+ ("goal_orientation_tolerance", 0.1),
+ ("goal_linear_velocity_tolerance", np.inf),
+ ("goal_angular_velocity_tolerance", np.inf),
+ ("goal_time_tolerance", 10.0),
+ ],
+ )
+
+ # Keep track of whether or not a goal is running/preempted to enable transition
+ # between preempted goals
+ self.current_uid = 0
+ self.execute_trajectory_lock = threading.Lock()
+ self._running = False
+
+ # Store a command for the child to execute
+ self.command: MultiDOFJointTrajectoryPoint | None = None
+
+ # Get the tolerances for the controller
+ self.goal_position_tolerance = (
+ self.get_parameter("goal_position_tolerance")
+ .get_parameter_value()
+ .double_value
+ )
+ self.goal_orientation_tolerance = (
+ self.get_parameter("goal_orientation_tolerance")
+ .get_parameter_value()
+ .double_value
+ )
+ self.goal_linear_velocity_tolerance = (
+ self.get_parameter("goal_linear_velocity_tolerance")
+ .get_parameter_value()
+ .double_value
+ )
+ self.goal_angular_velocity_tolerance = (
+ self.get_parameter("goal_angular_velocity_tolerance")
+ .get_parameter_value()
+ .double_value
+ )
+ self.goal_time_tolerance = Duration(
+ seconds=self.get_parameter("goal_time_tolerance") # type: ignore
+ .get_parameter_value()
+ .double_value
+ )
+
+ self.execute_trajectory_client = ActionServer(
+ self,
+ FollowJointTrajectory,
+ f"/angler/{controller_name}/execute_trajectory",
+ goal_callback=self.handle_action_request_cb,
+ execute_callback=self.execute_trajectory_cb,
+ )
+
+ def destroy_node(self) -> None:
+ """Shutdown the node and all clients."""
+ self.execute_trajectory_client.destroy()
+ return super().destroy_node()
+
+ @property
+ @abstractmethod
+ def joint_state(self) -> MultiDOFJointTrajectoryPoint:
+ """Get the current joint state to control.
+
+ Returns:
+ The joint state.
+ """
+ ...
+
+ def handle_action_request_cb(self, goal_handle: ServerGoalHandle) -> GoalResponse:
+ """Accept or reject the action request.
+
+ Args:
+ goal_handle: The action goal.
+
+ Returns:
+ Whether or not the goal has been accepted.
+ """
+ if not self.armed:
+ return GoalResponse.REJECT
+
+ return GoalResponse.ACCEPT
+
+ def check_joint_at_goal(
+ self,
+ current_state: MultiDOFJointTrajectoryPoint,
+ desired_state: MultiDOFJointTrajectoryPoint,
+ index: int,
+ ) -> bool:
+ """Check whether or not the joint is at its goal state.
+
+ Args:
+ current_state: The current joint state.
+ desired_state: The desired joint state.
+ index: The joint index.
+
+ Returns:
+ Whether or not the joint is at its goal.
+ """
+ current_pos = controller_utils.convert_tf_to_array(
+ current_state.transforms[index] # type: ignore
+ )
+ desired_pos = controller_utils.convert_tf_to_array(
+ desired_state.transforms[index] # type: ignore
+ )
+
+ position_at_goal = (
+ np.linalg.norm(desired_pos[:3] - current_pos[:3])
+ < self.goal_position_tolerance
+ )
+
+ def quat_to_array(quat: Quaternion):
+ return np.array([quat.x, quat.y, quat.z, quat.w])
+
+ current_rot = quat_to_array(
+ current_state.transforms[index].rotation # type: ignore
+ )
+ desired_rot = quat_to_array(
+ desired_state.transforms[index].rotation # type: ignore
+ )
+
+ error_eps = (
+ current_rot[3] * desired_rot[:3]
+ - desired_rot[3] * current_rot[:3]
+ + np.cross(current_rot[:3], desired_rot[:3])
+ )
+
+ rotation_at_goal = (
+ np.linalg.norm(np.hstack((error_eps))) < self.goal_orientation_tolerance
+ )
+
+ if (
+ self.trajectory.trajectory.points[0].velocities # type: ignore
+ and self.trajectory.trajectory.points[-1].velocities # type: ignore
+ ):
+ current_vel = controller_utils.convert_twist_to_array(
+ current_state.velocities[index] # type: ignore
+ )
+ desired_vel = controller_utils.convert_twist_to_array(
+ desired_state.velocities[index] # type: ignore
+ )
+
+ vel_error = np.linalg.norm(desired_vel - current_vel) # type: ignore
+
+ vel_at_goal = (
+ vel_error[:3] < self.goal_linear_velocity_tolerance
+ and vel_error[3:] < self.goal_angular_velocity_tolerance
+ )
+
+ return position_at_goal and rotation_at_goal and vel_at_goal
+
+ return position_at_goal and rotation_at_goal
+
+ def on_update(self) -> None:
+ """Return if no goal has been received."""
+ if not self._running:
+ return
+
+ sample = self.trajectory.sample(self.get_clock().now())
+
+ if sample is not None:
+ self.command = sample
+ else:
+ self.get_logger().info(
+ f"Failed to sample trajectory at time {self.get_clock().now()}"
+ )
+
+ async def execute_trajectory_cb(
+ self, goal_handle: ServerGoalHandle
+ ) -> FollowJointTrajectory.Result:
+ """Execute a joint trajectory.
+
+ Args:
+ goal_handle: The desired trajectory to track.
+
+ Returns:
+ The result of the trajectory execution.
+ """
+ # Keep track of the current goal UID to handle preemption
+ uid = self.current_uid + 1
+ self.current_uid += 1
+
+ result = FollowJointTrajectory.Result()
+
+ rate = self.create_rate(1 / self.dt)
+
+ with self.execute_trajectory_lock:
+ # Update the new trajectory
+ self.trajectory = MultiDOFTrajectory(
+ goal_handle.request.multi_dof_trajectory,
+ self.joint_state,
+ self.get_clock().now(),
+ )
+ self._running = True
+
+ while not all(
+ [
+ self.check_joint_at_goal(
+ self.joint_state,
+ self.trajectory.trajectory.points[-1], # type: ignore
+ i,
+ )
+ for i in range(len(self.joint_state.transforms))
+ ]
+ ):
+ # The goal has been flagged as inactive
+ if not goal_handle.is_active:
+ self._running = False
+ result.error_code = FollowJointTrajectory.Result.INVALID_GOAL
+ result.error_string = "Goal aborted!"
+ goal_handle.abort()
+ return result
+
+ end_goal_time = controller_utils.add_ros_time_duration_msg(
+ self.trajectory.starting_time,
+ self.trajectory.trajectory.points[-1].time_from_start, # type: ignore # noqa
+ )
+
+ # Failed to get to the goal waypoint in the tolerance time
+ if self.get_clock().now() - end_goal_time > self.goal_time_tolerance:
+ self._running = False
+ result.error_code = (
+ FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED
+ )
+ result.error_string = (
+ "Failed to complete the goal within the provided time"
+ f" tolerance {self.goal_time_tolerance}"
+ )
+ goal_handle.abort()
+ return result
+
+ # Goal has been cancelled
+ if goal_handle.is_cancel_requested:
+ self._running = False
+ result.error_code = FollowJointTrajectory.Result.INVALID_GOAL
+ result.error_string = (
+ "The current action was cancelled by a client!"
+ )
+ goal_handle.canceled()
+ return result
+
+ # Goal has been preempted by another goal
+ if uid != self.current_uid:
+ self._running = False
+ result.error_code = FollowJointTrajectory.Result.INVALID_GOAL
+ result.error_string = (
+ "The current action was cancelled by a new incoming action"
+ )
+ goal_handle.abort()
+ return result
+
+ rate.sleep()
+
+ # The goal was reached within all tolerances
+ self._running = False
+ result.error_code = FollowJointTrajectory.Result.SUCCESSFUL
+ result.error_string = "Successfully executed the provided trajectory"
+ goal_handle.succeed()
+
+ return result
diff --git a/angler_control/controllers/robot_trajectory_controller/multidof_joint_trajectory_controller.py b/angler_control/controllers/robot_trajectory_controller/multidof_joint_trajectory_controller.py
deleted file mode 100644
index 7582222..0000000
--- a/angler_control/controllers/robot_trajectory_controller/multidof_joint_trajectory_controller.py
+++ /dev/null
@@ -1,171 +0,0 @@
-# Copyright 2023, Evan Palmer
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
-# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-from abc import abstractmethod
-
-import rclpy
-from control_msgs.action import FollowJointTrajectory
-from controllers.controller import BaseController
-from controllers.robot_trajectory_controller.trajectory import MultiDOFTrajectory
-from rclpy.action import ActionServer, GoalResponse
-from rclpy.timer import Timer
-
-
-class MultiDOFJointTrajectoryController(BaseController):
- """Base interface for a MultiDOFJointTrajectory controller."""
-
- def __init__(self, controller_name: str) -> None:
- """Create a new multi-DOF joint trajectory controller.
-
- Args:
- controller_name: The unique controller name.
- """
- super().__init__(controller_name)
-
- self.declare_parameter("goal_tolerance", 0.2)
-
- self.goal_handle: FollowJointTrajectory.Goal | None = None
- self.goal_result: FollowJointTrajectory.Result | None = None
- self._preempted = False
- self._running = False
- self._timer: Timer | None = None
- self.goal_tolerance = (
- self.get_parameter("goal_tolerance").get_parameter_value().double_value
- )
-
- self.execute_trajectory_client = ActionServer(
- self,
- FollowJointTrajectory,
- f"/angler/{controller_name}/execute_trajectory",
- goal_callback=self.handle_action_request_cb,
- execute_callback=self.execute_trajectory_cb,
- )
-
- def handle_action_request_cb(
- self, goal: FollowJointTrajectory.Goal
- ) -> GoalResponse:
- """Accept or reject the action request.
-
- Args:
- goal: The action goal.
-
- Returns:
- Whether or not the goal has been accepted.
- """
- if not self.armed:
- return GoalResponse.REJECT
-
- if self.goal_handle is not None:
- self._preempted = True
- return GoalResponse.ACCEPT
-
- return GoalResponse.ACCEPT
-
- @abstractmethod
- def on_update(self) -> None:
- """Return if no goal has been received."""
- if self.goal_handle is None:
- return
-
- # TODO(evan): execute the trajectory
-
- async def execute_trajectory_cb(
- self, goal: FollowJointTrajectory.Goal
- ) -> FollowJointTrajectory.Result:
- """Execute a trajectory.
-
- Args:
- goal: The goal with the desired trajectory to execute.
-
- Raises:
- RuntimeError: Unhandled state.
-
- Returns:
- The trajectory execution result.
- """
- # Wait for any previous goals to be cleaned up
- timer = self.create_rate(self.dt)
- while self._running and rclpy.ok():
- self.get_logger().info(
- "Waiting for previous joint trajectory goal to exit!"
- )
- timer.sleep()
-
- # Clean things up after the previous action
- self._preempted = False
- self._running = True
- self.goal_handle = goal
- self.goal_result = None
- self.trajectory = MultiDOFTrajectory(
- goal.multi_dof_trajectory,
- self.state.multi_dof_joint_state,
- self.get_clock().now(),
- )
-
- # Spin until we get a response from the controller or the goal is preempted/
- # canceled
- while (
- rclpy.ok()
- and not self._preempted
- and self._running
- and self.goal_result is None
- and not self.goal_handle.is_cancel_requested # type: ignore
- ):
- timer.sleep()
-
- self._running = False
-
- if self._preempted:
- return self.preempt_current_goal()
- elif self.goal_handle.is_cancel_requested: # type: ignore
- return self.cancel_current_goal()
- elif self.goal_result is not None:
- return self.goal_result
-
- # If we get here, something bad happened
- raise RuntimeError(
- f"An error occurred while attempting to execute the goal {goal}"
- )
-
- def preempt_current_goal(self) -> FollowJointTrajectory.Result:
- """Preempt the current goal with a new goal.
-
- Returns:
- The result of the goal that has been preempted.
- """
- result = FollowJointTrajectory.Result()
- result.error_code = FollowJointTrajectory.Result.INVALID_GOAL
- result.error_string = (
- "The current action was cancelled by a new incoming action"
- )
- self.goal_handle.canceled() # type: ignore
- return result
-
- def cancel_current_goal(self) -> FollowJointTrajectory.Result:
- """Cancel the current goal.
-
- Returns:
- The result of the goal that has been cancelled.
- """
- result = FollowJointTrajectory.Result()
- result.error_code = FollowJointTrajectory.Result.INVALID_GOAL
- result.error_string = "The current action was cancelled by a client!"
- self.goal_handle.canceled() # type: ignore
- return result
diff --git a/angler_control/controllers/robot_trajectory_controller/trajectory.py b/angler_control/controllers/robot_trajectory_controller/trajectory.py
index 1453b25..1adf8be 100644
--- a/angler_control/controllers/robot_trajectory_controller/trajectory.py
+++ b/angler_control/controllers/robot_trajectory_controller/trajectory.py
@@ -18,12 +18,12 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
+import controllers.robot_trajectory_controller.utils as controller_utils
import numpy as np
from geometry_msgs.msg import Transform, Twist
from rclpy.time import Duration, Time
from scipy.interpolate import CubicHermiteSpline, interp1d
from scipy.spatial.transform import Rotation as R
-from sensor_msgs.msg import MultiDOFJointState
from trajectory_msgs.msg import MultiDOFJointTrajectory, MultiDOFJointTrajectoryPoint
@@ -38,7 +38,7 @@ class MultiDOFTrajectory:
def __init__(
self,
trajectory: MultiDOFJointTrajectory,
- current_state: MultiDOFJointState,
+ current_state: MultiDOFJointTrajectoryPoint,
current_time: Time,
) -> None:
"""Create a new MultiDOFJointTrajectory interface.
@@ -50,14 +50,7 @@ def __init__(
current_time: The current ROS timestamp.
"""
self.trajectory = trajectory
-
- # Convert the current state into a MultiDOFJointTrajectoryPoint for consistency
- # Note that there is no velocity component in the MultiDOFJointState message
- self.starting_state = MultiDOFJointTrajectoryPoint()
-
- self.starting_state.transforms = current_state.transforms
- self.starting_state.velocities = current_state.twist
-
+ self.starting_state = current_state
self.starting_time = current_time
def sample(self, t: Time) -> MultiDOFJointTrajectoryPoint | None:
@@ -77,7 +70,9 @@ def sample(self, t: Time) -> MultiDOFJointTrajectoryPoint | None:
return None
first_point = self.trajectory.points[0] # type: ignore
- first_point_timestamp = first_point.time_from_start + self.starting_time
+ first_point_timestamp = controller_utils.add_ros_time_duration_msg(
+ self.starting_time, first_point.time_from_start
+ )
if t < first_point_timestamp:
return self.interpolate(
@@ -92,14 +87,19 @@ def sample(self, t: Time) -> MultiDOFJointTrajectoryPoint | None:
point = self.trajectory.points[i] # type: ignore
next_point = self.trajectory.points[i + 1] # type: ignore
- t0 = self.starting_time + point.time_from_start
- t1 = self.starting_time + next_point.time_from_start
+ t0 = controller_utils.add_ros_time_duration_msg(
+ self.starting_time, point.time_from_start
+ )
+ t1 = controller_utils.add_ros_time_duration_msg(
+ self.starting_time, next_point.time_from_start
+ )
if t0 <= t < t1:
return self.interpolate(point, next_point, t0, t1, t)
- # Return None by default because we didn't find a point in the trajectory
- return None
+ # We are sampling past the trajectory now; return the last point in the
+ # trajectory
+ return self.trajectory.points[-1] # type: ignore
def interpolate(
self,
@@ -115,6 +115,9 @@ def interpolate(
velocities. Interpolation between points with accelerations is not yet
supported.
+ For position-only interfaces, linear interpolation is used. For position +
+ velocity interfaces, cubic Hermite spline interpolation is used.
+
Args:
start_point: The segment starting point.
end_point: The segment end point.
@@ -123,7 +126,7 @@ def interpolate(
sample_time: The timestamp at which to sample.
Returns:
- _description_
+ The identified point at the provided sample time.
"""
duration_from_start: Duration = sample_time - start_time
duration_between_points: Duration = end_time - start_time
@@ -141,30 +144,6 @@ def interpolate(
result = MultiDOFJointTrajectoryPoint()
result.time_from_start.nanosec = sample_time
- # Create a few helper functions to clean things up
- def convert_tf_to_array(tf: Transform) -> np.ndarray:
- return np.array(
- [
- tf.translation.x,
- tf.translation.y,
- tf.translation.z,
- *R.from_quat(
- [tf.rotation.x, tf.rotation.y, tf.rotation.z, tf.rotation.w]
- ).as_euler("xyz"),
- ]
- )
-
- def convert_twist_to_array(twist: Twist) -> np.ndarray:
- return np.array(
- [
- twist.linear.x,
- twist.linear.y,
- twist.linear.z,
- twist.angular.x,
- twist.angular.z,
- ]
- )
-
if not has_velocity:
# Perform linear interpolation between points
for i in range(len(start_point.transforms)):
@@ -173,8 +152,11 @@ def convert_twist_to_array(twist: Twist) -> np.ndarray:
end_tf: Transform = end_point.transforms[i] # type: ignore
tfs = np.array(
- [convert_tf_to_array(start_tf), convert_tf_to_array(end_tf)]
- )
+ [
+ controller_utils.convert_tf_to_array(start_tf),
+ controller_utils.convert_tf_to_array(end_tf),
+ ]
+ ).T
t = np.array([start_time.nanoseconds, end_time.nanoseconds])
# Perform linear interpolation
@@ -209,10 +191,16 @@ def convert_twist_to_array(twist: Twist) -> np.ndarray:
end_vel = end_point.velocities[i] # type: ignore
tfs = np.array(
- [convert_tf_to_array(start_tf), convert_tf_to_array(end_tf)]
+ [
+ controller_utils.convert_tf_to_array(start_tf),
+ controller_utils.convert_tf_to_array(end_tf),
+ ]
)
vels = np.array(
- [convert_twist_to_array(start_vel), convert_twist_to_array(end_vel)]
+ [
+ controller_utils.convert_twist_to_array(start_vel),
+ controller_utils.convert_twist_to_array(end_vel),
+ ]
)
t = np.array([start_time.nanoseconds, end_time.nanoseconds])
diff --git a/angler_control/controllers/robot_trajectory_controller/utils.py b/angler_control/controllers/robot_trajectory_controller/utils.py
new file mode 100644
index 0000000..1dc4518
--- /dev/null
+++ b/angler_control/controllers/robot_trajectory_controller/utils.py
@@ -0,0 +1,82 @@
+# Copyright 2023, Evan Palmer
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+import numpy as np
+from builtin_interfaces.msg import Duration as DurationMsg
+from geometry_msgs.msg import Transform, Twist
+from rclpy.time import Time
+from scipy.spatial.transform import Rotation as R
+
+
+def add_ros_time_duration_msg(time: Time, duration: DurationMsg) -> Time:
+ """Add a ROS Time and a Duration message together to create a new Time.
+
+ Args:
+ time: The Time addend.
+ duration: The Duration message addend.
+
+ Returns:
+ The resulting sum as a new Time object.
+ """
+ duration_nanoseconds = duration.sec * 10**9 + duration.nanosec
+ return Time(
+ nanoseconds=time.nanoseconds + duration_nanoseconds, clock_type=time.clock_type
+ )
+
+
+def convert_tf_to_array(tf: Transform) -> np.ndarray:
+ """Convert a Transform message to a numpy array.
+
+ Args:
+ tf: The transform to convert.
+
+ Returns:
+ The resulting numpy array.
+ """
+ return np.array(
+ [
+ tf.translation.x,
+ tf.translation.y,
+ tf.translation.z,
+ *R.from_quat(
+ [tf.rotation.x, tf.rotation.y, tf.rotation.z, tf.rotation.w]
+ ).as_euler("xyz"),
+ ]
+ )
+
+
+def convert_twist_to_array(twist: Twist) -> np.ndarray:
+ """Convert a Twist message to a numpy array.
+
+ Args:
+ twist: The twist to convert.
+
+ Returns:
+ The resulting numpy array.
+ """
+ return np.array(
+ [
+ twist.linear.x,
+ twist.linear.y,
+ twist.linear.z,
+ twist.angular.x,
+ twist.angular.z,
+ ]
+ )
diff --git a/angler_control/controllers/tpik_controller/__init__.py b/angler_control/controllers/tpik_joint_trajectory_controller/__init__.py
similarity index 100%
rename from angler_control/controllers/tpik_controller/__init__.py
rename to angler_control/controllers/tpik_joint_trajectory_controller/__init__.py
diff --git a/angler_control/controllers/tpik_controller/constraints.py b/angler_control/controllers/tpik_joint_trajectory_controller/constraints.py
similarity index 100%
rename from angler_control/controllers/tpik_controller/constraints.py
rename to angler_control/controllers/tpik_joint_trajectory_controller/constraints.py
diff --git a/angler_control/controllers/tpik_controller/hierarchy.py b/angler_control/controllers/tpik_joint_trajectory_controller/hierarchy.py
similarity index 97%
rename from angler_control/controllers/tpik_controller/hierarchy.py
rename to angler_control/controllers/tpik_joint_trajectory_controller/hierarchy.py
index d820b8f..c3f12e8 100644
--- a/angler_control/controllers/tpik_controller/hierarchy.py
+++ b/angler_control/controllers/tpik_joint_trajectory_controller/hierarchy.py
@@ -24,12 +24,12 @@
import numpy as np
import yaml # type: ignore
-from controllers.tpik_controller.constraints import (
+from controllers.tpik_joint_trajectory_controller.constraints import (
Constraint,
EqualityConstraint,
SetConstraint,
)
-from controllers.tpik_controller.tasks import task_library
+from controllers.tpik_joint_trajectory_controller.tasks import task_library
class TaskHierarchy:
diff --git a/angler_control/controllers/tpik_controller/tasks.py b/angler_control/controllers/tpik_joint_trajectory_controller/tasks.py
similarity index 95%
rename from angler_control/controllers/tpik_controller/tasks.py
rename to angler_control/controllers/tpik_joint_trajectory_controller/tasks.py
index 32f2515..6ccfcc4 100644
--- a/angler_control/controllers/tpik_controller/tasks.py
+++ b/angler_control/controllers/tpik_joint_trajectory_controller/tasks.py
@@ -21,7 +21,10 @@
from typing import Any
import numpy as np
-from controllers.tpik_controller.constraints import EqualityConstraint, SetConstraint
+from controllers.tpik_joint_trajectory_controller.constraints import (
+ EqualityConstraint,
+ SetConstraint,
+)
from geometry_msgs.msg import Quaternion, Transform
from scipy.spatial.transform import Rotation as R
@@ -281,7 +284,7 @@ class EndEffectorPoseTask(EqualityConstraint):
name = "end_effector_pose_eq"
def __init__(self, gain: float, priority: float) -> None:
- """Create a new end effector pose task.
+ """Create a new end-effector pose task.
Args:
gain: The task gain.
@@ -308,26 +311,26 @@ def create_task_from_params(
pitch: float | None = None,
yaw: float | None = None,
) -> Any:
- """Create a new end effector pose task from a set of parameters.
+ """Create a new end-effector pose task from a set of parameters.
Args:
gain: The task gain.
priority: The task priority.
- x: The desired end effector x position in the inertial (map) frame. Defaults
+ x: The desired end-effector x position in the inertial (map) frame. Defaults
to None.
- y: The desired end effector y position in the inertial (map) frame. Defaults
+ y: The desired end-effector y position in the inertial (map) frame. Defaults
to None.
- z: The desired end effector z position in the inertial (map) frame. Defaults
+ z: The desired end-effector z position in the inertial (map) frame. Defaults
to None.
- roll: The desired end effector roll in the inertial (map) frame. Defaults
+ roll: The desired end-effector roll in the inertial (map) frame. Defaults
to None.
- pitch: The desired end effector pitch in the inertial (map) frame. Defaults
+ pitch: The desired end-effector pitch in the inertial (map) frame. Defaults
to None.
- yaw: The desired end effector yaw in the inertial (map) frame. Defaults
+ yaw: The desired end-effector yaw in the inertial (map) frame. Defaults
to None.
Returns:
- A new end effector pose task.
+ A new end-effector pose task.
"""
task = EndEffectorPoseTask(gain, priority)
@@ -357,21 +360,21 @@ def update(
serial_chain: Any | None = None,
desired_pose: Transform | None = None,
) -> None:
- """Update the current context of the end effector pose task.
+ """Update the current context of the end-effector pose task.
Args:
joint_angles: The current manipulator joint angles.
current_pose: The current vehicle pose in the inertial (map) frame.
tf_map_to_ee: The transformation from the inertial (map) frame to the
- end effector frame.
+ end-effector frame.
tf_map_to_base: The transformation from the inertial (map) frame to the
vehicle base frame.
tf_base_to_manipulator_base: The transformation from the vehicle base frame
to the manipulator base frame.
tf_manipulator_base_to_ee: The transformation from the manipulator base
- frame to the end effector frame.
+ frame to the end-effector frame.
serial_chain: The manipulator kinpy serial chain. Defaults to None.
- desired_pose: The desired end effector pose. Defaults to None.
+ desired_pose: The desired end-effector pose. Defaults to None.
"""
self.joint_angles = joint_angles
self.current_value = tf_map_to_ee
@@ -406,7 +409,7 @@ def error(self) -> np.ndarray:
"""Calculate the reference signal for the controller.
Returns:
- The reference signal to use to drive the system to the desired end effector
+ The reference signal to use to drive the system to the desired end-effector
pose.
"""
pos_error = np.array(
diff --git a/angler_control/controllers/tpik_controller/tpik_controller.py b/angler_control/controllers/tpik_joint_trajectory_controller/tpik_controller.py
similarity index 87%
rename from angler_control/controllers/tpik_controller/tpik_controller.py
rename to angler_control/controllers/tpik_joint_trajectory_controller/tpik_controller.py
index 18b0b1d..e6f45b2 100644
--- a/angler_control/controllers/tpik_controller/tpik_controller.py
+++ b/angler_control/controllers/tpik_joint_trajectory_controller/tpik_controller.py
@@ -21,10 +21,15 @@
import kinpy
import numpy as np
import rclpy
-from controllers.controller import BaseController
-from controllers.tpik_controller.constraints import EqualityConstraint, SetConstraint
-from controllers.tpik_controller.hierarchy import TaskHierarchy
-from controllers.tpik_controller.tasks import (
+from controllers.robot_trajectory_controller.base_multidof_joint_trajectory_controller import ( # noqa
+ BaseMultiDOFJointTrajectoryController,
+)
+from controllers.tpik_joint_trajectory_controller.constraints import (
+ EqualityConstraint,
+ SetConstraint,
+)
+from controllers.tpik_joint_trajectory_controller.hierarchy import TaskHierarchy
+from controllers.tpik_joint_trajectory_controller.tasks import (
EndEffectorPoseTask,
ManipulatorJointConfigurationTask,
ManipulatorJointLimitTask,
@@ -77,7 +82,7 @@ def construct_augmented_jacobian(jacobians: list[np.ndarray]) -> np.ndarray:
return np.vstack(tuple(jacobians))
-class TpikController(BaseController):
+class TpikController(BaseMultiDOFJointTrajectoryController):
"""Set-based task-priority inverse kinematic (TPIK) controller.
The TPIK controller is responsible for calculating kinematically feasible system
@@ -88,13 +93,18 @@ class TpikController(BaseController):
def __init__(self) -> None:
"""Create a new TPIK node."""
- super().__init__("tpik_controller")
-
- self.declare_parameter("hierarchy_file", "")
- self.declare_parameter("inertial_frame", "map")
- self.declare_parameter("base_frame", "base_link")
- self.declare_parameter("manipulator_base_link", "alpha_base_link")
- self.declare_parameter("manipulator_end_link", "alpha_standard_jaws_tool")
+ super().__init__("tpik_joint_trajectory_controller")
+
+ self.declare_parameters(
+ namespace="",
+ parameters=[ # type: ignore
+ ("hierarchy_file", ""),
+ ("inertial_frame", "map"),
+ ("base_frame", "base_link"),
+ ("manipulator_base_link", "alpha_base_link"),
+ ("manipulator_end_link", "alpha_standard_jaws_tool"),
+ ],
+ )
# Don't run the controller until everything has been properly initialized
self._description_received = False
@@ -140,6 +150,36 @@ def __init__(self) -> None:
RobotTrajectory, "/angler/robot_trajectory", 1
)
+ @property
+ def joint_state(self) -> MultiDOFJointTrajectoryPoint:
+ """Get the end-effector pose in the inertial frame.
+
+ We control the end-effector state with this particular variation of the
+ controller.
+
+ Returns:
+ The end-effector pose in the inertial frame.
+ """
+ point = MultiDOFJointTrajectoryPoint()
+
+ # Control the end-effector pose
+ try:
+ tf_map_to_ee = self.tf_buffer.lookup_transform(
+ self.inertial_frame,
+ self.manipulator_ee_frame,
+ Time(),
+ Duration(nanoseconds=10000000), # 10 ms
+ )
+ except TransformException as e:
+ self.get_logger().warning(
+ "Failed to get the current transformation from the map to end-effector"
+ f"frame: {e}"
+ )
+ return point
+
+ point.transforms.append(tf_map_to_ee.transform) # type: ignore
+ return point
+
def on_arm(self) -> bool:
"""Make sure that the system has been initialized before enabling arming.
@@ -247,8 +287,17 @@ def calculate_system_velocity_rec(
def on_update(self) -> None:
"""Calculate the desired system velocities using set-based TPIK."""
+ # Update the joint command first
+ super().on_update()
+
hierarchies = self.hierarchy.hierarchies
+ # Set the desired trajectory value
+ for hierarchy in hierarchies:
+ for task in hierarchy:
+ if isinstance(task, EndEffectorPoseTask) and self.command is not None:
+ task.desired_value = self.command.transforms[0] # type: ignore
+
# Check whether or not the hierarchies have any set tasks
has_set_tasks = any(
[any([isinstance(y, SetConstraint) for y in x]) for x in hierarchies]
diff --git a/angler_control/launch/control.launch.py b/angler_control/launch/control.launch.py
index db4441a..24fd30d 100644
--- a/angler_control/launch/control.launch.py
+++ b/angler_control/launch/control.launch.py
@@ -49,9 +49,9 @@ def generate_launch_description() -> LaunchDescription:
),
DeclareLaunchArgument(
"controller",
- default_value="tpik_controller",
+ default_value="tpik_joint_trajectory_controller",
description="The whole-body controller to load.",
- choices=["tpik_controller"],
+ choices=["tpik_joint_trajectory_controller"],
),
]
@@ -72,7 +72,9 @@ def generate_launch_description() -> LaunchDescription:
}
],
condition=IfCondition(
- PythonExpression(["'", controller, "' == 'tpik_controller'"])
+ PythonExpression(
+ ["'", controller, "' == 'tpik_joint_trajectory_controller'"]
+ )
),
),
]
diff --git a/angler_control/setup.py b/angler_control/setup.py
index 6dc0bbc..e7ba184 100644
--- a/angler_control/setup.py
+++ b/angler_control/setup.py
@@ -43,7 +43,7 @@
tests_require=["pytest"],
entry_points={
"console_scripts": [
- "tpik_controller = controllers.tpik_controller.tpik_controller:main",
+ "tpik_joint_trajectory_controller = controllers.tpik_joint_trajectory_controller.tpik_controller:main", # noqa
],
},
)
diff --git a/angler_description/config/bluerov2_heavy_alpha/tasks.yaml b/angler_description/config/bluerov2_heavy_alpha/tasks.yaml
index 53bacea..e5fb1cb 100644
--- a/angler_description/config/bluerov2_heavy_alpha/tasks.yaml
+++ b/angler_description/config/bluerov2_heavy_alpha/tasks.yaml
@@ -18,9 +18,12 @@
gain: 0.3
joint: 6 # axis e
+# This task is required for trajectory tracking
- task: "end_effector_pose_eq"
priority: 2
gain: 0.3
+ # Set the initial position to move to
+ # when the controller is armed
x: 0.0
y: 0.0
z: -1.0
diff --git a/angler_mux/demux/single_manipulator_demux.py b/angler_mux/demux/single_manipulator_demux.py
index fcbf11e..7b7bc49 100644
--- a/angler_mux/demux/single_manipulator_demux.py
+++ b/angler_mux/demux/single_manipulator_demux.py
@@ -37,9 +37,12 @@ def __init__(self) -> None:
"""Create a new demuxer for the vehicle and a manipulator."""
super().__init__("single_manipulator_velocity_demux")
- self.declare_parameter("vehicle_control_topic", "/blue/ismc/cmd_vel")
- self.declare_parameter(
- "manipulator_control_topic", "/forward_velocity_controller/commands"
+ self.declare_parameters(
+ namespace="",
+ parameters=[ # type: ignore
+ ("vehicle_control_topic", "/blue/ismc/cmd_vel"),
+ ("manipulator_control_topic", "/forward_velocity_controller/commands"),
+ ],
)
vehicle_control_topic = (
diff --git a/angler_planning/planners/waypoint_planners/preplanned_end_effector_planner/planner.py b/angler_planning/planners/waypoint_planners/preplanned_end_effector_planner/planner.py
index fee3734..6835377 100644
--- a/angler_planning/planners/waypoint_planners/preplanned_end_effector_planner/planner.py
+++ b/angler_planning/planners/waypoint_planners/preplanned_end_effector_planner/planner.py
@@ -37,14 +37,19 @@ def __init__(self) -> None:
"""Create a new planning interface."""
super().__init__("preplanned_end_effector_waypoint_planner")
- self.declare_parameter("trajectory_name", "")
- self.declare_parameter(
- "library_path",
- os.path.join(
- get_package_share_directory("angler_planning"),
- "trajectories",
- "library",
- ),
+ self.declare_parameters(
+ namespace="",
+ parameters=[ # type: ignore
+ ("trajectory_name", ""),
+ (
+ "library_path",
+ os.path.join(
+ get_package_share_directory("angler_planning"),
+ "trajectories",
+ "library",
+ ),
+ ),
+ ],
)
trajectory_name = (
diff --git a/angler_utils/angler_utils/initial_position.py b/angler_utils/angler_utils/initial_position.py
index 9b58101..1a16210 100644
--- a/angler_utils/angler_utils/initial_position.py
+++ b/angler_utils/angler_utils/initial_position.py
@@ -42,20 +42,22 @@ def __init__(self) -> None:
"""Create a new initial position setter."""
super().__init__("initial_position_setter")
- # Parameters
- self.declare_parameter(
- "initial_positions_file",
- os.path.join(
- get_package_share_directory("angler_description"),
- "config",
- "initial_positions.yaml",
- ),
+ self.declare_parameters(
+ namespace="",
+ parameters=[ # type: ignore
+ (
+ "initial_positions_file",
+ os.path.join(
+ get_package_share_directory("angler_description"),
+ "config",
+ "initial_positions.yaml",
+ ),
+ ),
+ ("controller_cmd_topic", "/forward_velocity_controller/commands"),
+ ("position_tol", 0.1),
+ ("joint_velocity", 0.5),
+ ],
)
- self.declare_parameter(
- "controller_cmd_topic", "/forward_velocity_controller/commands"
- )
- self.declare_parameter("position_tol", 0.1)
- self.declare_parameter("joint_velocity", 0.5) # fast af boi
# Get the desired initial positions
with open(