fet.atas

Reputation: 303

Confusion about deploying Drake-based software to a real robot

I have limited experience with Drake and am currently trying to learn it and implement some simple algorithms. The code I have is based on an example from the Underactuated Robotics course by R. Tedrake. It is a slightly modified version of this example here, where the goal is to design a Lyapunov control function for the unicycle robot model.

To simulate the robot model, an instance of SymbolicVectorSystem from Drake is used.

As I understand it, SymbolicVectorSystem is just a building block for prototyping and designing a suitable controller. Is this correct? Is SymbolicVectorSystem still needed once there is a real robot?

Now that I am happy with the controller, I would like to deploy it to a ROS2 robot. For that, I have written a simple script that subscribes to the current robot state from the localization system that I have. I am not really sure how to proceed from here: I think I need to connect the received robot state messages to the input of the controller, then publish the output of the controller back to the robot. The problem is that I couldn't find any examples that suit this use case.

One approach I can think of is to create another class that inherits from VectorSystem and wraps the ROS side, subscribing to and publishing ROS messages. In the overridden DoCalcVectorOutput function of this wrapper class, I would read the latest received state and publish the retrieved control commands. Finally, I would connect the I/O ports of the controller and the wrapper class with a DiagramBuilder.

I am not sure whether this will work, or whether it is the recommended approach (a rough sketch of the idea follows below). I would appreciate your help.
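
Here is that rough, untested sketch, using the PointTrackingController from my full script further down. I wrapped only the subscription side and publish from the main loop, because I suspect a single wrapper system sitting both upstream and downstream of the controller would form an algebraic loop; the RosPoseSource name, the /cmd_vel topic, and the Twist mapping are placeholders for my robot's actual command interface:

import numpy
import rclpy
import rclpy.node
import rclpy.qos
from geometry_msgs.msg import PoseWithCovarianceStamped, Twist
from pydrake.all import DiagramBuilder, VectorSystem


class RosPoseSource(VectorSystem):
    # 0 inputs, 3 outputs: reports the latest pose received over ROS.
    def __init__(self, node):
        VectorSystem.__init__(self, 0, 3)
        self.latest_state = numpy.zeros(3)
        node.create_subscription(
            PoseWithCovarianceStamped, "/vox_nav/cupoch/icp_base_to_map_pose",
            self.pose_callback, rclpy.qos.qos_profile_sensor_data)

    def pose_callback(self, msg):
        # Cache x and y; extracting yaw from the quaternion is omitted here.
        self.latest_state[0] = msg.pose.pose.position.x
        self.latest_state[1] = msg.pose.pose.position.y

    def DoCalcVectorOutput(self, context, unused_input, unused_state, output):
        output[:] = self.latest_state


def run_controller():
    rclpy.init()
    node = rclpy.node.Node("controller_node")

    builder = DiagramBuilder()
    source = builder.AddSystem(RosPoseSource(node))
    controller = builder.AddSystem(PointTrackingController())
    builder.Connect(source.get_output_port(0), controller.get_input_port(0))
    diagram = builder.Build()
    context = diagram.CreateDefaultContext()

    publisher = node.create_publisher(Twist, "/cmd_vel", 1)
    while rclpy.ok():
        rclpy.spin_once(node, timeout_sec=0.05)
        # Bump the context time so Drake recomputes the output instead of
        # serving a cached value.
        context.SetTime(context.get_time() + 0.05)
        u = controller.get_output_port(0).Eval(
            controller.GetMyContextFromRoot(context))
        cmd = Twist()
        cmd.linear.x = float(u[0])
        cmd.angular.z = float(u[1])
        publisher.publish(cmd)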

Here is what the current code looks like:

#!/usr/bin/env python3

import rclpy
import rclpy.node
from geometry_msgs.msg import PoseWithCovarianceStamped
import rclpy.qos
from rclpy.executors import MultiThreadedExecutor

import pydrake
from pydrake.all import (VectorSystem, DiagramBuilder, SymbolicVectorSystem, LogVectorOutput,
                         Variable, Simulator, cos, sin, tan)
import numpy

L = 0.6
K_us = 1.0
g = 9.81
# the control law is implemented below in
# PointTrackingController.lyapunov_controller


class PointTrackingController(VectorSystem):

    def __init__(self):
        # 3 inputs (robot state)
        # 2 outputs (robot inputs)
        VectorSystem.__init__(self, 3, 2)

    def euclidean_dist(self, x, y, x_obs, y_obs):
        # x and y are the vehicle position
        return numpy.sqrt((x - x_obs)**2 + (y - y_obs)**2)

    def clip(self, val, min_, max_):
        return min_ if val < min_ else max_ if val > max_ else val

    def lyapunov_controller(self, f_x, f_y, theta, f_theta, theta_d):
        k1 = 1E-4
        k2 = 1E-1
        u1 = k1 * numpy.copysign(1, (f_x*cos(theta) + f_y *
                                     sin(theta))) * (f_x**2 + f_y ** 2 + f_theta**2)

        p = u1*(f_x * cos(theta) + f_y * sin(theta)) + \
            k2 * (theta_d - theta) * f_theta
        if p >= 0.0:
            u2 = k2 * (theta_d - theta) * ((L + K_us/g * u1**2)/u1)
        else:
            u2 = -u1 / f_theta * (f_x * cos(theta) + f_y *
                                  sin(theta)) * ((L + K_us/g * u1**2)/u1)

        return -self.clip(u1, -1.5, 1.5), -self.clip(u2, -0.6, 0.6)

    def DoCalcVectorOutput(
        self,
        context,           # not used
        cartesian_state,   # input of the controller
        controller_state,  # not used
        input              # output of the controller
    ):

        kLAMBDA = 0.0
        kK = 0.05
        kPULL = 10

        # unpack the state of the robot: x, y, theta
        x, y, theta = cartesian_state

        target_x, target_y = 0, 0
        dist = self.euclidean_dist(x, y, target_x, target_y)
        beta = kPULL*dist

        if x >= 0.0:
            f_x = 3 * (x**2) * (beta**(1.0/kK)) + (y**2)*(beta**(1.0/kK)) + kLAMBDA * (theta**2) * (beta**(1.0/kK)) + \
                (x*(x - target_x)*(x**2 + y**2 + kLAMBDA*(theta**2)) * (beta **
                 ((-kK+1.0)/kK))) / kK * numpy.sqrt((x-target_x)**2 + (y - target_y)**2)

            f_y = x * (((y - target_y)*(x**2 + y**2 + kLAMBDA*(theta**2)) * (beta**((-kK+1.0)/kK))
                        ) / kK * numpy.sqrt((x-target_x)**2 + (y - target_y)**2) + 2 * y * (beta**(1.0/kK)))
            f_theta = 2 * kLAMBDA * theta * (beta**(1.0/kK))
        else:
            f_x = -3 * (x**2) * (beta**(1/kK)) - (y**2)*(beta**(1/kK)) - kLAMBDA * (theta**2) * (beta**(1.0/kK)) - \
                (x*(x - target_x)*(x**2 + y**2 + kLAMBDA*(theta**2)) * (beta **
                 ((-kK+1.0)/kK))) / kK * numpy.sqrt((x-target_x)**2 + (y - target_y)**2)

            f_y = -x * (((y - target_y)*(x**2 + y**2 + kLAMBDA*(theta**2)) * (beta**((-kK+1)/kK))) /
                        kK * numpy.sqrt((x-target_x)**2 + (y - target_y)**2) + 2 * y * (beta**(1.0/kK)))
            f_theta = - 2 * kLAMBDA * theta * (beta**(1.0/kK))

        theta_d = numpy.arctan2(-numpy.copysign(1, x)*f_y,
                                -numpy.copysign(1, x)*f_x)

        # evaluate the control law and write the result to the output
        input[:] = self.lyapunov_controller(f_x, f_y, theta, f_theta, theta_d)


class RosNode(rclpy.node.Node):
    def __init__(self, *args):
        super(RosNode, self).__init__("rosnode")
        self.get_logger().info("Starting rosnode")

        self.create_subscription(
            PoseWithCovarianceStamped, "/vox_nav/cupoch/icp_base_to_map_pose",
            self.robot_pose_callback, rclpy.qos.qos_profile_sensor_data)

    def robot_pose_callback(self, msg):
        self.get_logger().info("recieved pose %s" % str(msg.pose.pose))


def main():

    print("Drake is installed under: ")
    print(pydrake.getDrakePath())

    x = Variable("x")
    y = Variable("y")
    theta = Variable("theta")        # vehicle orientation
    cartesian_state = [x, y, theta]
    v = Variable("v")                # driving velocity input
    delta = Variable("delta")        # steering angle input
    input = [v, delta]

    # nonlinear dynamics, the whole state is measured (output = state)
    dynamics = [v * cos(theta),
                v * sin(theta),
                v * tan(delta)/L]

    robot = SymbolicVectorSystem(
        state=cartesian_state,
        input=input,
        output=cartesian_state,
        dynamics=dynamics,
    )

    # construction site for our closed-loop system
    builder = DiagramBuilder()

    # add the robot to the diagram
    # the method .AddSystem() simply returns a pointer to the system
    # we passed as input, so it's ok to give it the same name
    robot = builder.AddSystem(robot)

    # add the controller
    controller = builder.AddSystem(PointTrackingController())

    # wire the controller with the system
    builder.Connect(robot.get_output_port(0), controller.get_input_port(0))
    builder.Connect(controller.get_output_port(0), robot.get_input_port(0))

    # add a logger to the diagram
    # this will store the state trajectory
    logger = LogVectorOutput(robot.get_output_port(0), builder)

    # complete the construction of the diagram
    diagram = builder.Build()

    # set up a simulation environment
    simulator = Simulator(diagram)

    # set the initial cartesian state
    # try initial_state = numpy.random.randn(3) for a random initial state
    initial_state = [10, 16, 1.57]
    context = simulator.get_mutable_context()
    context.SetContinuousState(initial_state)

    # simulate from zero to sim_time
    # the trajectory will be stored in the logger
    integrator = simulator.get_mutable_integrator()
    target_accuracy = 1E-4
    integrator.set_target_accuracy(target_accuracy)
    maximum_step_size = 0.1
    integrator.set_maximum_step_size(maximum_step_size)
    minimum_step_size = 2E-5
    integrator.set_requested_minimum_step_size(minimum_step_size)
    integrator.set_throw_on_minimum_step_size_violation(True)
    integrator.set_fixed_step_mode(True)
    simulator.set_target_realtime_rate(1.0)

    rclpy.init()
    node = RosNode()
    executor = MultiThreadedExecutor()
    executor.add_node(node)

    while rclpy.ok():
        executor.spin_once(timeout_sec=0.05)
        simulator.AdvanceTo(context.get_time() + 0.1)
        node.get_logger().info("Another step, time is: %s" % str(context.get_time()))


if __name__ == '__main__':
    main()

Upvotes: 2

Views: 638

Answers (1)

Eric Cousineau

Reputation: 2004

The code is a slightly modified version of this example here [...]

Unfortunately, this is not accessible to me (I'm not sure if this is your drive, or if the transition to DeepNote changed things). Can you post a link directly to the course notes? e.g. http://underactuated.csail.mit.edu/acrobot.html#cart_pole

As I understand it, SymbolicVectorSystem is just a building block for prototyping and designing a suitable controller. Is this correct? Is SymbolicVectorSystem still needed once there is a real robot?

The choice is ultimately up to you and your application. SymbolicVectorSystem may express what you need initially, but your needs may extend beyond what it can (easily) express.
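
For instance, if you eventually need discrete state, abstract-valued ports, or event-driven publishing, you would write the model as a hand-rolled LeafSystem instead. Here is a minimal sketch of that translation for your unicycle dynamics (same behavior as your SymbolicVectorSystem; UnicyclePlant is just a name I picked):

import numpy as np
from pydrake.systems.framework import BasicVector, LeafSystem

L = 0.6  # wheelbase, as in your script


class UnicyclePlant(LeafSystem):
    # Same dynamics as the SymbolicVectorSystem version, but hand-written,
    # so it is no longer limited to what symbolic expressions can capture.
    def __init__(self):
        LeafSystem.__init__(self)
        self.DeclareContinuousState(3)  # x, y, theta
        self.DeclareVectorInputPort("u", BasicVector(2))  # v, delta
        self.DeclareVectorOutputPort("state", BasicVector(3),
                                     self.CopyStateOut)

    def DoCalcTimeDerivatives(self, context, derivatives):
        x, y, theta = context.get_continuous_state_vector().CopyToVector()
        v, delta = self.get_input_port(0).Eval(context)
        derivatives.get_mutable_vector().SetFromVector(
            [v * np.cos(theta),
             v * np.sin(theta),
             v * np.tan(delta) / L])

    def CopyStateOut(self, context, output):
        output.SetFromVector(
            context.get_continuous_state_vector().CopyToVector())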

Now that I am happy with the controller, I would like to deploy this controller to a ROS2 robot. [...]

There are a few examples of this using the Systems Framework; some are in Drake proper, others in downstream projects, but understandably they may be hard to locate.

Some examples in Drake itself, though using LCM rather than ROS2:

Some examples in downstream projects; also primarily using LCM:


As a small side note, some of us at TRI, with help from Open Robotics, are (slowly) developing ROS2 interfaces for Drake; you can see some of the progress here: https://github.com/RobotLocomotion/drake-ros/tree/develop
Some caveats:

  • This is very experimental and has not yet been vetted (to any extent) on our side.
  • Our primary customer is ourselves in this case, so it may not fit your use case well at this point in time.
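
In the meantime, nothing forces you to put the ROS plumbing inside a System at all: since your controller is stateless, you can evaluate it like a function from inside an rclpy node. Here is a minimal sketch, not drake-ros API; it reuses your PointTrackingController and pose topic, while ControllerNode, the /cmd_vel topic, the Twist mapping, and the skipped yaw extraction are my assumptions:

import rclpy
import rclpy.node
import rclpy.qos
from geometry_msgs.msg import PoseWithCovarianceStamped, Twist


class ControllerNode(rclpy.node.Node):
    def __init__(self, controller):
        super().__init__("drake_controller")
        self.controller = controller
        self.controller_context = controller.CreateDefaultContext()
        self.cmd_pub = self.create_publisher(Twist, "/cmd_vel", 1)
        self.create_subscription(
            PoseWithCovarianceStamped, "/vox_nav/cupoch/icp_base_to_map_pose",
            self.pose_callback, rclpy.qos.qos_profile_sensor_data)

    def pose_callback(self, msg):
        # Extracting yaw from the quaternion is omitted for brevity.
        state = [msg.pose.pose.position.x, msg.pose.pose.position.y, 0.0]
        # Fix the measured state on the controller's input port, then
        # evaluate its output port. No Simulator is involved: the real
        # robot now plays the role of the simulated plant.
        self.controller.get_input_port(0).FixValue(
            self.controller_context, state)
        u = self.controller.get_output_port(0).Eval(self.controller_context)
        cmd = Twist()
        cmd.linear.x = float(u[0])
        cmd.angular.z = float(u[1])
        self.cmd_pub.publish(cmd)


def main():
    rclpy.init()
    rclpy.spin(ControllerNode(PointTrackingController()))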

Upvotes: 2
