Unverified Commit 4e727991 authored by Mayank Mittal's avatar Mayank Mittal Committed by GitHub

Adds command generator to manage different command types (#68)

# Description

This PR introduces the concept of command generators that can be used
for goal-conditioned environments. The idea is that these classes can be
used for task specification and the same environment can be configured
for different task logics (position-based locomotion vs velocity based
control).

Currently, the included command generators are specific to locomotion
(SE(2) control). They have their own visualization schemes (arrows,
boxes etc.) that can be useful for debugging.

## Type of change

- New feature (non-breaking change which adds functionality)
- This change requires a documentation update

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./orbit.sh --format`
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [x] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
parent f4bb9875
......@@ -18,6 +18,7 @@ omni.isaac.orbit extension
orbit.robots
orbit.sensors
orbit.terrains
orbit.command_generators
orbit.utils
orbit.utils.assets
orbit.utils.kit
......
omni.isaac.orbit.command_generators
====================================
.. automodule:: omni.isaac.orbit.command_generators
:members:
:show-inheritance:
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.6.1"
version = "0.6.2"
# Description
title = "ORBIT framework for Robot Learning"
......
Changelog
---------
0.6.2 (2023-07-21)
~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added the :mod:`omni.isaac.orbit.command_generators` to generate different commands based on the desired task.
It allows the user to generate commands for different tasks in the same environment without having to write
custom code for each task.
0.6.1 (2023-07-16)
~~~~~~~~~~~~~~~~~~
......
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
This submodule provides command generators for goal-conditioned tasks.
The command generators are used to generate commands for the agent to execute. The command generators act
as utility classes to make it convenient to switch between different command generation strategies within
the same environment. For instance, in an environment consisting of a quadrupedal robot, the command to it
could be a velocity command or position command. By keeping the command generation logic separate from the
environment, it is easy to switch between different command generation strategies.
The command generators are implemented as classes that inherit from the :class:`CommandGeneratorBase` class.
Each command generator class should also have a corresponding configuration class that inherits from the
:class:`CommandGeneratorBaseCfg` class.
"""
from .command_generator_base import CommandGeneratorBase
from .command_generator_cfg import (
CommandGeneratorBaseCfg,
NormalVelocityCommandGeneratorCfg,
TerrainBasedPositionCommandGeneratorCfg,
UniformVelocityCommandGeneratorCfg,
)
from .position_command_generator import TerrainBasedPositionCommandGenerator
from .velocity_command_generator import NormalVelocityCommandGenerator, UniformVelocityCommandGenerator
__all__ = [
"CommandGeneratorBase",
"CommandGeneratorBaseCfg",
# velocity command generators
"UniformVelocityCommandGenerator",
"UniformVelocityCommandGeneratorCfg",
"NormalVelocityCommandGenerator",
"NormalVelocityCommandGeneratorCfg",
# position command generators
"TerrainBasedPositionCommandGenerator",
"TerrainBasedPositionCommandGeneratorCfg",
]
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Base class for command generators.
This class defines an interface for command generators that can be used for goal-conditioned
tasks. Each command generator class should inherit from this class and implement the abstract
methods.
"""
import torch
from abc import ABC, abstractmethod
from typing import Dict, Optional, Sequence
from .command_generator_cfg import CommandGeneratorBaseCfg
class CommandGeneratorBase(ABC):
"""The base class for implementing a command generator.
A command generator is used to generate commands for goal-conditioned tasks. For example,
in the case of a goal-conditioned navigation task, the command generator can be used to
generate a target position for the robot to navigate to.
The command generator implements a resampling mechanism that allows the command to be
resampled at a fixed frequency. The resampling frequency can be specified in the
configuration object. Additionally, it is possible to assign a visualization function
to the command generator that can be used to visualize the command in the simulator.
"""
def __init__(self, cfg: CommandGeneratorBaseCfg, env: object):
"""Initialize the command generator class.
Args:
cfg (CommandGeneratorBaseCfg): The configuration parameters for the command generator.
env (object): The environment object.
"""
# store the inputs
self.cfg = cfg
# extract the environment parameters
self.dt = env.dt
self.num_envs = env.num_envs
self.device = env.device
# create buffers to store the command
# -- metrics that can be used for logging
self.metrics = dict()
# -- time left before resampling
self.time_left = torch.zeros(self.num_envs, device=self.device)
# -- counter for the number of times the command has been resampled within the current episode
self.command_counter = torch.zeros(self.num_envs, device=self.device, dtype=torch.long)
"""
Properties
"""
@property
@abstractmethod
def command(self) -> torch.Tensor:
"""The command tensor. Shape is (num_envs, command_dim)."""
raise NotImplementedError
"""
Operations.
"""
def reset(self, env_ids: Optional[Sequence[int]] = None):
"""Reset the command generator.
This function resets the command counter and resamples the command. It should be called
at the beginning of each episode.
Args:
env_ids (Optional[Sequence[int]], optional): The list of environment IDs to reset. Defaults to None.
"""
# resolve the environment IDs
if env_ids is None:
env_ids = ...
# set the command counter to zero
self.command_counter[env_ids] = 0
# resample the command
self._resample(env_ids)
def compute(self):
"""Compute the command."""
# update the metrics based on current state
self._update_metrics()
# reduce the time left before resampling
self.time_left -= self.dt
# resample the command if necessary
resample_env_ids = (self.time_left <= 0.0).nonzero().flatten()
if len(resample_env_ids) > 0:
self._resample(resample_env_ids)
# update the command
self._update_command()
def log_info(self, env_ids: Sequence[int]) -> Dict[str, float]:
"""Log information such as metrics.
Args:
env_ids (Sequence[int]): The list of environment IDs to log the information for.
Returns:
Dict[str, float]: A dictionary containing the information to log under the "Metrics/{name}" key.
"""
extras = {}
for metric_name, metric_value in self.metrics.items():
# compute the mean metric value
extras[f"Metrics/{metric_name}"] = torch.mean(metric_value[env_ids]).item()
# reset the metric value
metric_value[env_ids] = 0.0
return extras
def debug_vis(self):
"""Visualize the command in the simulator.
This is an optional function that can be used to visualize the command in the simulator.
"""
if self.cfg.debug_vis:
pass
"""
Helper functions.
"""
def _resample(self, env_ids: Sequence[int]):
"""Resample the command.
This function resamples the command and time for which the command is applied for the
specified environment indices.
Args:
env_ids (Sequence[int]): The list of environment IDs to resample.
"""
# resample the time left before resampling
self.time_left[env_ids] = self.time_left[env_ids].uniform_(*self.cfg.resampling_time_range)
# increment the command counter
self.command_counter[env_ids] += 1
# resample the command
self._resample_command(env_ids)
"""
Implementation specific functions.
"""
@abstractmethod
def _resample_command(self, env_ids: Sequence[int]):
"""Resample the command for the specified environments."""
raise NotImplementedError
@abstractmethod
def _update_command(self):
"""Update the command based on the current state."""
raise NotImplementedError
@abstractmethod
def _update_metrics(self):
"""Update the metrics based on the current state."""
raise NotImplementedError
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from dataclasses import MISSING
from typing import ClassVar, Tuple
from omni.isaac.orbit.utils import configclass
"""
Base command generator.
"""
@configclass
class CommandGeneratorBaseCfg:
"""Configuration for the base command generator."""
class_name: ClassVar[str] = MISSING
"""Name of the command generator class."""
resampling_time_range: Tuple[float, float] = MISSING
"""Time before commands are changed [s]."""
debug_vis: bool = False
"""Whether to visualize debug information. Defaults to False."""
"""
Locomotion-specific command generators.
"""
@configclass
class UniformVelocityCommandGeneratorCfg(CommandGeneratorBaseCfg):
"""Configuration for the uniform velocity command generator."""
class_name = "UniformVelocityCommandGenerator"
robot_attr: str = MISSING
"""Name of the robot attribute from the environment."""
heading_command: bool = MISSING
"""Whether to use heading command or angular velocity command.
If True, the angular velocity command is computed from the heading error, where the
target heading is sampled uniformly from provided range. Otherwise, the angular velocity
command is sampled uniformly from provided range.
"""
rel_standing_envs: float = MISSING
"""Probability threshold for environments where the robots that are standing still."""
rel_heading_envs: float = MISSING
"""Probability threshold for environments where the robots follow the heading-based angular velocity command
(the others follow the sampled angular velocity command)."""
@configclass
class Ranges:
"""Uniform distribution ranges for the velocity commands."""
lin_vel_x: Tuple[float, float] = MISSING # min max [m/s]
lin_vel_y: Tuple[float, float] = MISSING # min max [m/s]
ang_vel_z: Tuple[float, float] = MISSING # min max [rad/s]
heading: Tuple[float, float] = MISSING # [rad]
ranges: Ranges = MISSING
"""Distribution ranges for the velocity commands."""
@configclass
class NormalVelocityCommandGeneratorCfg(UniformVelocityCommandGeneratorCfg):
"""Configuration for the normal velocity command generator."""
class_name = "NormalVelocityCommandGenerator"
heading_command: bool = False # --> we don't use heading command for normal velocity command.
@configclass
class Ranges:
"""Normal distribution ranges for the velocity commands."""
mean_vel: Tuple[float, float, float] = MISSING
"""Mean velocity for the normal distribution.
The tuple contains the mean linear-x, linear-y, and angular-z velocity.
"""
std_vel: Tuple[float, float, float] = MISSING
"""Standard deviation for the normal distribution.
The tuple contains the standard deviation linear-x, linear-y, and angular-z velocity.
"""
zero_prob: Tuple[float, float, float] = MISSING
"""Probability of zero velocity for the normal distribution.
The tuple contains the probability of zero linear-x, linear-y, and angular-z velocity.
"""
ranges: Ranges = MISSING
"""Distribution ranges for the velocity commands."""
@configclass
class TerrainBasedPositionCommandGeneratorCfg(CommandGeneratorBaseCfg):
"""Configuration for the terrain-based position command generator."""
class_name = "TerrainBasedPositionCommandGenerator"
robot_attr: str = MISSING
"""Name of the robot attribute from the environment."""
rel_standing_envs: float = MISSING
"""Probability threshold for environments where the robots that are standing still."""
simple_heading: bool = MISSING
"""Whether to use simple heading or not.
If True, the heading is in the direction of the target position.
"""
@configclass
class Ranges:
"""Uniform distribution ranges for the velocity commands."""
heading: Tuple[float, float] = MISSING
"""Heading range for the position commands (in rad).
Used only if :attr:`simple_heading` is False.
"""
ranges: Ranges = MISSING
"""Distribution ranges for the position commands."""
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Sub-module containing command generators for the position-based locomotion task."""
import torch
from typing import Sequence
from omni.isaac.orbit.markers import VisualizationMarkers
from omni.isaac.orbit.markers.config import CUBOID_MARKER_CFG
from omni.isaac.orbit.robots.robot_base import RobotBase
from omni.isaac.orbit.terrains import TerrainImporter
from omni.isaac.orbit.utils.math import quat_rotate_inverse, wrap_to_pi, yaw_quat
from .command_generator_base import CommandGeneratorBase
from .command_generator_cfg import TerrainBasedPositionCommandGeneratorCfg
class TerrainBasedPositionCommandGenerator(CommandGeneratorBase):
"""Command generator that generates position commands based on the terrain.
The position commands are sampled from the terrain mesh and the heading commands are either set
to point towards the target or are sampled uniformly.
"""
cfg: TerrainBasedPositionCommandGeneratorCfg
"""Configuration for the command generator."""
def __init__(self, cfg: TerrainBasedPositionCommandGeneratorCfg, env: object):
"""Initialize the command generator class.
Args:
cfg (TerrainBasedPositionCommandGeneratorCfg): The configuration parameters for the command generator.
env (object): The environment object.
"""
super().__init__(cfg, env)
# -- robot
# TODO: Should we make this configurable like this?
self.robot: RobotBase = getattr(env, cfg.robot_attr)
# -- terrain
self.terrain: TerrainImporter = env.terrain
# -- commands: (x, y, z, heading)
self.pos_command_w = torch.zeros(self.num_envs, 3, device=self.device)
self.heading_command_w = torch.zeros(self.num_envs, device=self.device)
self.pos_command_b = torch.zeros_like(self.pos_command_w)
self.heading_command_b = torch.zeros_like(self.heading_command_w)
# -- metrics
self.metrics["error_pos"] = torch.zeros(self.num_envs, device=self.device)
self.metrics["error_heading"] = torch.zeros(self.num_envs, device=self.device)
# -- debug vis
self._box_goal_marker = None
def __str__(self) -> str:
msg = "TerrainBasedPositionCommandGenerator:\n"
msg += f"\tCommand dimension: {tuple(self.command.shape[1:])}\n"
msg += f"\tResampling time range: {self.cfg.resampling_time_range}\n"
msg += f"\tStanding probability: {self.cfg.rel_standing_envs}"
return msg
"""
Properties
"""
@property
def command(self) -> torch.Tensor:
"""The desired base position in base frame. Shape is (num_envs, 3)."""
return self.pos_command_b
"""
Operations.
"""
def debug_vis(self):
if self.cfg.debug_vis:
# create the box marker if necessary
if self._box_goal_marker is None:
marker_cfg = CUBOID_MARKER_CFG
marker_cfg.markers["cuboid"].color = (1.0, 0.0, 0.0)
marker_cfg.markers["cuboid"].scale = (0.1, 0.1, 0.1)
self._box_goal_marker = VisualizationMarkers("/Visuals/base_position_goal", marker_cfg)
# update the box marker
self._box_goal_marker.visualize(self.pos_command_w)
"""
Implementation specific functions.
"""
def _resample_command(self, env_ids: Sequence[int]):
# sample new position targets from the terrain
# TODO: need to add that here directly
self.pos_command_w[env_ids] = self.terrain.sample_new_targets(env_ids)
# offset the position command by the current root position
self.pos_command_w[env_ids, 2] += self.robot.get_default_root_states(clone=False)[env_ids, 2]
if self.cfg.simple_heading:
# set heading command to point towards target
target_vec = self.pos_command_w[env_ids] - self.robot.data.root_pos_w[env_ids]
target_direction = torch.atan2(target_vec[:, 1], target_vec[:, 0])
flipped_heading = wrap_to_pi(target_direction + torch.pi)
self.heading_command_w[env_ids] = torch.where(
wrap_to_pi(target_direction - self.robot.data.heading_w[env_ids]).abs()
< wrap_to_pi(flipped_heading - self.robot.data.heading_w[env_ids]).abs(),
target_direction,
flipped_heading,
)
else:
# random heading command
r = torch.empty(len(env_ids), device=self.device)
self.heading_command_w[env_ids] = r.uniform_(*self.cfg.ranges.heading)
def _update_command(self):
"""Retargets the position command to the current root position and heading."""
target_vec = self.pos_command_w - self.robot.root_pos_w[:, :3]
self.pos_command_b[:] = quat_rotate_inverse(yaw_quat(self.robot.root_quat_w), target_vec)
self.heading_command_b[:] = wrap_to_pi(self.heading_command_w - self.robot.heading_w)
def _update_metrics(self):
# logs data
self.metrics["error_pos"] = torch.norm(self.pos_command_w - self.robot.root_pos_w[:, :3], dim=1)
self.metrics["error_heading"] = torch.abs(wrap_to_pi(self.heading_command_w - self.robot.heading_w))
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Sub-module containing command generators for the velocity-based locomotion task."""
import torch
from typing import Sequence, Tuple
from omni.isaac.orbit.markers import VisualizationMarkers
from omni.isaac.orbit.markers.config import ARROW_X_MARKER_CFG
from omni.isaac.orbit.robots.robot_base import RobotBase
from omni.isaac.orbit.utils.math import quat_apply, quat_from_euler_xyz, wrap_to_pi
from .command_generator_base import CommandGeneratorBase
from .command_generator_cfg import NormalVelocityCommandGeneratorCfg, UniformVelocityCommandGeneratorCfg
class UniformVelocityCommandGenerator(CommandGeneratorBase):
r"""Command generator that generates a velocity command in SE(2) from uniform distribution.
The command comprises of a linear velocity in x and y direction and an angular velocity around
the z-axis. It is given in the robot's base frame.
If the :attr:`cfg.heading_command` flag is set to True, the angular velocity is computed from the heading
error similar to doing a proportional control on the heading error. The target heading is sampled uniformly
from the provided range. Otherwise, the angular velocity is sampled uniformly from the provided range.
Mathematically, the angular velocity is computed as follows from the heading command:
.. math::
\omega_z = \frac{1}{2} \text{wrap_to_pi}(\theta_{\text{target}} - \theta_{\text{current}})
"""
cfg: UniformVelocityCommandGeneratorCfg
"""The configuration of the command generator."""
def __init__(self, cfg: UniformVelocityCommandGeneratorCfg, env: object):
"""Initialize the command generator.
Args:
cfg (UniformVelocityCommandGeneratorCfg): The configuration of the command generator.
env (object): The environment.
"""
super().__init__(cfg, env)
# -- robot
# TODO: Should we make this configurable like this?
self.robot: RobotBase = getattr(env, cfg.robot_attr)
# -- command: x vel, y vel, yaw vel, heading
self.vel_command_b = torch.zeros(self.num_envs, 3, device=self.device)
self.heading_target = torch.zeros(self.num_envs, device=self.device)
self.is_heading_env = torch.zeros(self.num_envs, dtype=torch.bool, device=self.device)
self.is_standing_env = torch.zeros_like(self.is_heading_env)
# -- metrics
self.metrics["error_vel_xy"] = torch.zeros(self.num_envs, device=self.device)
self.metrics["error_vel_yaw"] = torch.zeros(self.num_envs, device=self.device)
# -- debug vis
self._base_vel_goal_markers = None
self._base_vel_markers = None
def __str__(self) -> str:
"""Return a string representation of the command generator."""
msg = "UniformVelocityCommandGenerator:\n"
msg += f"\tCommand dimension: {tuple(self.command.shape[1:])}\n"
msg += f"\tResampling time range: {self.cfg.resampling_time_range}\n"
msg += f"\tHeading command: {self.cfg.heading_command}\n"
if self.cfg.heading_command:
msg += f"\tHeading probability: {self.cfg.rel_heading_envs}\n"
msg += f"\tStanding probability: {self.cfg.rel_standing_envs}"
return msg
"""
Properties
"""
@property
def command(self) -> torch.Tensor:
"""The desired base velocity command in the base frame. Shape is (num_envs, 3)."""
return self.vel_command_b
"""
Operations.
"""
def debug_vis(self):
if self.cfg.debug_vis:
# create markers if necessary
if self._base_vel_goal_markers is None:
marker_cfg = ARROW_X_MARKER_CFG
marker_cfg.markers["arrow"].color = (1.0, 0.0, 0.0)
self._base_vel_goal_markers = VisualizationMarkers("/Visuals/base_velocity_goal", marker_cfg)
if self._base_vel_markers is None:
marker_cfg = ARROW_X_MARKER_CFG
marker_cfg.markers["arrow"].color = (0.0, 0.0, 1.0)
self._base_vel_markers = VisualizationMarkers("/Visuals/base_velocity_current", marker_cfg)
# get marker location
# -- base state
base_pos_w = self.robot.data.root_pos_w.clone()
base_pos_w[:, 2] += 0.5
# -- command
vel_des_arrow_scale, vel_des_arrow_quat = self._resolve_xy_velocity_to_arrow(self.command[:, :2])
vel_arrow_scale, vel_arrow_quat = self._resolve_xy_velocity_to_arrow(self.robot.data.root_lin_vel_w[:, :2])
# -- goal
self._base_vel_goal_markers.visualize(base_pos_w, vel_des_arrow_quat, vel_des_arrow_scale)
# -- base velocity
self._base_vel_markers.visualize(base_pos_w, vel_arrow_quat, vel_arrow_scale)
"""
Implementation specific functions.
"""
def _resample_command(self, env_ids: Sequence[int]):
# sample velocity commands
r = torch.empty(len(env_ids), device=self.device)
# -- linear velocity - x direction
self.vel_command_b[env_ids, 0] = r.uniform_(*self.cfg.ranges.lin_vel_x)
# -- linear velocity - y direction
self.vel_command_b[env_ids, 1] = r.uniform_(*self.cfg.ranges.lin_vel_y)
# -- ang vel yaw - rotation around z
self.vel_command_b[env_ids, 2] = r.uniform_(*self.cfg.ranges.ang_vel_z)
# heading target
if self.cfg.heading_command:
self.heading_target[env_ids] = r.uniform_(*self.cfg.ranges.heading)
# update heading envs
self.is_heading_env[env_ids] = r.uniform_(0.0, 1.0) <= self.cfg.rel_heading_envs
# update standing envs
self.is_standing_env[env_ids] = r.uniform_(0.0, 1.0) <= self.cfg.rel_standing_envs
def _update_command(self):
"""Post-processes the velocity command.
This function sets velocity command to zero for standing environments and computes angular
velocity from heading direction if the heading_command flag is set.
"""
# Compute angular velocity from heading direction
if self.cfg.heading_command:
# resolve indices of heading envs
heading_env_ids = self.is_heading_env.nonzero(as_tuple=False).flatten()
# obtain heading direction
forward = quat_apply(
self.robot.data.root_quat_w[heading_env_ids, :], self.robot._FORWARD_VEC_B[heading_env_ids]
)
heading = torch.atan2(forward[:, 1], forward[:, 0])
# compute angular velocity
self.vel_command_b[heading_env_ids, 2] = torch.clip(
0.5 * wrap_to_pi(self.heading_target[heading_env_ids] - heading),
min=self.cfg.ranges.ang_vel_z[0],
max=self.cfg.ranges.ang_vel_z[1],
)
# Enforce standing (i.e., zero velocity command) for standing envs
# TODO: check if conversion is needed
standing_env_ids = self.is_standing_env.nonzero(as_tuple=False).flatten()
self.vel_command_b[standing_env_ids, :] = 0.0
def _update_metrics(self):
# time for which the command was executed
max_command_time = self.cfg.resampling_time_range[1]
# logs data
self.metrics["error_vel_xy"] += (
torch.norm(self.vel_command_b[:, :2] - self.robot.data.root_lin_vel_b[:, :2]) / max_command_time
)
self.metrics["error_vel_yaw"] += (
torch.abs(self.vel_command_b[:, 2] - self.robot.data.root_ang_vel_b[:, 2]) / max_command_time
)
"""
Internal helpers.
"""
def _resolve_xy_velocity_to_arrow(self, xy_velocity: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
"""Converts the XY base velocity command to arrow direction rotation."""
# obtain default scale of the marker
default_scale = self._base_vel_goal_markers.cfg.markers["arrow"].scale
# arrow-scale
arrow_scale = torch.tensor(default_scale, device=self.device).repeat(xy_velocity.shape[0], 1)
arrow_scale[:, 0] *= torch.linalg.norm(xy_velocity, dim=1)
# arrow-direction
heading_angle = torch.atan2(xy_velocity[:, 1], xy_velocity[:, 0])
zeros = torch.zeros_like(heading_angle)
arrow_quat = quat_from_euler_xyz(zeros, zeros, heading_angle)
return arrow_scale, arrow_quat
class NormalVelocityCommandGenerator(UniformVelocityCommandGenerator):
"""Command generator that generates a velocity command in SE(2) from a normal distribution.
The command comprises of a linear velocity in x and y direction and an angular velocity around
the z-axis. It is given in the robot's base frame.
The command is sampled from a normal distribution with mean and standard deviation specified in
the configuration. With equal probability, the sign of the individual components is flipped.
"""
cfg: NormalVelocityCommandGeneratorCfg
"""The command generator configuration."""
def __init__(self, cfg: NormalVelocityCommandGeneratorCfg, env: object):
"""Initializes the command generator.
Args:
cfg (NormalVelocityCommandGeneratorCfg): The command generator configuration.
env: The environment.
"""
super().__init__(self, cfg, env)
# create buffers for zero commands envs
self.is_zero_vel_x_env = torch.zeros(self.num_envs, dtype=torch.bool, device=self.device)
self.is_zero_vel_y_env = torch.zeros_like(self.is_zero_vel_x_env)
self.is_zero_vel_yaw_env = torch.zeros_like(self.is_zero_vel_x_env)
def __str__(self) -> str:
"""Return a string representation of the command generator."""
msg = "NormalVelocityCommandGenerator:\n"
msg += f"\tCommand dimension: {tuple(self.command.shape[1:])}\n"
msg += f"\tResampling time range: {self.cfg.resampling_time_range}\n"
msg += f"\tStanding probability: {self.cfg.rel_standing_envs}"
return msg
def _resample_command(self, env_ids):
# sample velocity commands
r = torch.empty(len(env_ids), device=self.device)
# -- linear velocity - x direction
self.vel_command_b[env_ids, 0] = r.normal_(mean=self.cfg.ranges.mean_vel[0], std=self.cfg.ranges.std_vel[0])
self.vel_command_b[env_ids, 0] *= torch.where(r.uniform_(0.0, 1.0) <= 0.5, 1.0, -1.0)
# -- linear velocity - y direction
self.vel_command_b[env_ids, 1] = r.normal_(mean=self.cfg.ranges.mean_vel[1], std=self.cfg.ranges.std_vel[1])
self.vel_command_b[env_ids, 1] *= torch.where(r.uniform_(0.0, 1.0) <= 0.5, 1.0, -1.0)
# -- angular velocity - yaw direction
self.vel_command_b[env_ids, 2] = r.normal_(mean=self.cfg.ranges.mean_vel[2], std=self.cfg.ranges.std_vel[2])
self.vel_command_b[env_ids, 2] *= torch.where(r.uniform_(0.0, 1.0) <= 0.5, 1.0, -1.0)
# update element wise zero velocity command
# TODO what is zero prob ?
self.is_zero_vel_x_env[env_ids] = r.uniform_(0.0, 1.0) <= self.cfg.ranges.zero_prob[0]
self.is_zero_vel_y_env[env_ids] = r.uniform_(0.0, 1.0) <= self.cfg.ranges.zero_prob[1]
self.is_zero_vel_yaw_env[env_ids] = r.uniform_(0.0, 1.0) <= self.cfg.ranges.zero_prob[2]
# update standing envs
self.is_standing_env[env_ids] = r.uniform_(0.0, 1.0) <= self.cfg.rel_standing_envs
def _update_command(self):
"""Sets velocity command to zero for standing envs."""
# Enforce standing (i.e., zero velocity command) for standing envs
standing_env_ids = self.is_standing_env.nonzero(as_tuple=False).flatten() # TODO check if conversion is needed
self.vel_command_b[standing_env_ids, :] = 0.0
# Enforce zero velocity for individual elements
# TODO: check if conversion is needed
zero_vel_x_env_ids = self.is_zero_vel_x_env.nonzero(as_tuple=False).flatten()
zero_vel_y_env_ids = self.is_zero_vel_y_env.nonzero(as_tuple=False).flatten()
zero_vel_yaw_env_ids = self.is_zero_vel_yaw_env.nonzero(as_tuple=False).flatten()
self.vel_command_b[zero_vel_x_env_ids, 0] = 0.0
self.vel_command_b[zero_vel_y_env_ids, 1] = 0.0
self.vel_command_b[zero_vel_yaw_env_ids, 2] = 0.0
......@@ -62,3 +62,23 @@ CONTACT_SENSOR_MARKER_CFG = VisualizationMarkersCfg(
},
)
"""Configuration for the contact sensor marker."""
ARROW_X_MARKER_CFG = VisualizationMarkersCfg(
markers={
"arrow": VisualizationMarkersCfg.FileMarkerCfg(
usd_path=f"{ISAAC_NUCLEUS_DIR}/Props/UIElements/arrow_x.usd",
scale=[1.0, 0.1, 0.1],
)
}
)
"""Configuration for the arrow marker (along x-direction)."""
CUBOID_MARKER_CFG = VisualizationMarkersCfg(
markers={
"cuboid": VisualizationMarkersCfg.MarkerCfg(
prim_type="Cube",
)
}
)
"""Configuration for the cuboid marker."""
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment