Commit 4686c3da authored by Mayank Mittal's avatar Mayank Mittal

Add scene interface and concept of Base and RL environments (#124)

# Description

This MR adds the `InteractiveScene` interface, as the central unit that
contains all simulation elements (or as we call "scene entitites") under
the same umbrella. By parsing the configuration object, it helps spawn
different prims and create relevant handles for them to read/write their
data. Based on the above, it modifies how the managers work. The terms
expect to extract the entities from the scene for different computations
(randomizations, rewards, actions, observations, etc.).

The scene and managers together define the `BaseEnv` and `RLEnv` where
the difference between the two is that the latter includes additional
managers that are RL specific (rewards, terminations).

Besides these, the MR includes the following improvements:
* Makes debug visualization of commands consistent by adding them into
callbacks
* Handles the simulation pause/play in the `SimulationContext` itself to
simplify standalone scripts
* Cleanups and documentation fixes to all the MDP terms

## Type of change

- New feature (non-breaking change which adds functionality)
- Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- This change requires a documentation update

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./orbit.sh --format`
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [x] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
parent 3f2beaf5
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.9.3"
version = "0.9.4"
# Description
title = "ORBIT framework for Robot Learning"
......
Changelog
---------
0.9.4 (2023-08-29)
~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added :class:`omni.isaac.orbit.scene.InteractiveScene`, as the central scene unit that contains all entities
that are part of the simulation. These include the terrain, sensors, articulations, rigid objects etc.
The scene groups the common operations of these entities and allows to access them via their unique names.
* Added :mod:`omni.isaac.orbit.envs` module that contains environment definitions that encapsulate the different
general (scene, action manager, observation manager) and RL-specific (reward and termination manager) managers.
* Added :class:`omni.isaac.orbit.managers.SceneEntityCfg` to handle which scene elements are required by the
manager's terms. This allows the manager to parse useful information from the scene elements, such as the
joint and body indices, and pass them to the term.
* Added :class:`omni.isaac.orbit.sim.SimulationContext.RenderMode` to handle different rendering modes based on
what the user wants to update (viewport, cameras, or UI elements).
Fixed
^^^^^
* Fixed the :class:`omni.isaac.orbit.command_generators.CommandGeneratorBase` to register a debug visualization
callback similar to how sensors and robots handle visualization.
0.9.3 (2023-08-23)
~~~~~~~~~~~~~~~~~~
......@@ -29,7 +53,7 @@ Changed
0.9.2 (2023-08-22)
~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~
Added
^^^^^
......
......@@ -102,8 +102,8 @@ class RigidObject(AssetBase):
if env_ids is None:
env_ids = ...
# reset external wrench
self._external_force_b[env_ids].zero_()
self._external_torque_b[env_ids].zero_()
self._external_force_b[env_ids] = 0.0
self._external_torque_b[env_ids] = 0.0
def write_data_to_sim(self):
"""Write external wrench to the simulation.
......@@ -238,7 +238,7 @@ class RigidObject(AssetBase):
# -- env_ids
if env_ids is None:
env_ids = self._ALL_INDICES
elif not isinstance(body_ids, torch.Tensor):
elif not isinstance(env_ids, torch.Tensor):
env_ids = torch.tensor(env_ids, dtype=torch.long, device=self.device)
# -- body_ids
if body_ids is None:
......
......@@ -10,11 +10,18 @@ tasks. Each command generator class should inherit from this class and implement
methods.
"""
from __future__ import annotations
import torch
from abc import ABC, abstractmethod
from typing import Dict, Optional, Sequence
from typing import TYPE_CHECKING, Sequence
import omni.kit.app
from .command_generator_cfg import CommandGeneratorBaseCfg
if TYPE_CHECKING:
from omni.isaac.orbit.envs import BaseEnv
from .command_generator_cfg import CommandGeneratorBaseCfg
class CommandGeneratorBase(ABC):
......@@ -30,19 +37,16 @@ class CommandGeneratorBase(ABC):
to the command generator that can be used to visualize the command in the simulator.
"""
def __init__(self, cfg: CommandGeneratorBaseCfg, env: object):
def __init__(self, cfg: CommandGeneratorBaseCfg, env: BaseEnv):
"""Initialize the command generator class.
Args:
cfg (CommandGeneratorBaseCfg): The configuration parameters for the command generator.
env (object): The environment object.
env (BaseEnv): The environment object.
"""
# store the inputs
self.cfg = cfg
# extract the environment parameters
self.dt = env.dt
self.num_envs = env.num_envs
self.device = env.device
self._env = env
# create buffers to store the command
# -- metrics that can be used for logging
self.metrics = dict()
......@@ -51,10 +55,34 @@ class CommandGeneratorBase(ABC):
# -- counter for the number of times the command has been resampled within the current episode
self.command_counter = torch.zeros(self.num_envs, device=self.device, dtype=torch.long)
# add callback for debug visualization
if self.cfg.debug_vis:
app_interface = omni.kit.app.get_app_interface()
self._debug_visualization_handle = app_interface.get_post_update_event_stream().create_subscription_to_pop(
self._debug_vis_callback
)
else:
self._debug_visualization_handle = None
def __del__(self):
"""Unsubscribe from the callbacks."""
if self._debug_visualization_handle is not None:
self._debug_visualization_handle.unsubscribe()
"""
Properties
"""
@property
def num_envs(self) -> int:
"""Number of environments."""
return self._env.num_envs
@property
def device(self) -> str:
"""Device on which to perform computations."""
return self._env.device
@property
@abstractmethod
def command(self) -> torch.Tensor:
......@@ -65,14 +93,29 @@ class CommandGeneratorBase(ABC):
Operations.
"""
def reset(self, env_ids: Optional[Sequence[int]] = None):
"""Reset the command generator.
def set_debug_vis(self, debug_vis: bool):
"""Sets whether to visualize the command data.
Args:
debug_vis (bool): Whether to visualize the command data.
Raises:
RuntimeError: If the command debug visualization is not enabled.
"""
if not self.cfg.debug_vis:
raise RuntimeError("Debug visualization is not enabled for this sensor.")
def reset(self, env_ids: Sequence[int] | None = None) -> dict[str, float]:
"""Reset the command generator and log metrics.
This function resets the command counter and resamples the command. It should be called
at the beginning of each episode.
Args:
env_ids (Optional[Sequence[int]], optional): The list of environment IDs to reset. Defaults to None.
Returns:
Dict[str, float]: A dictionary containing the information to log under the "Metrics/{name}" key.
"""
# resolve the environment IDs
if env_ids is None:
......@@ -81,29 +124,7 @@ class CommandGeneratorBase(ABC):
self.command_counter[env_ids] = 0
# resample the command
self._resample(env_ids)
def compute(self):
"""Compute the command."""
# update the metrics based on current state
self._update_metrics()
# reduce the time left before resampling
self.time_left -= self.dt
# resample the command if necessary
resample_env_ids = (self.time_left <= 0.0).nonzero().flatten()
if len(resample_env_ids) > 0:
self._resample(resample_env_ids)
# update the command
self._update_command()
def log_info(self, env_ids: Sequence[int]) -> Dict[str, float]:
"""Log information such as metrics.
Args:
env_ids (Sequence[int]): The list of environment IDs to log the information for.
Returns:
Dict[str, float]: A dictionary containing the information to log under the "Metrics/{name}" key.
"""
# add logging metrics
extras = {}
for metric_name, metric_value in self.metrics.items():
# compute the mean metric value
......@@ -112,13 +133,22 @@ class CommandGeneratorBase(ABC):
metric_value[env_ids] = 0.0
return extras
def debug_vis(self):
"""Visualize the command in the simulator.
def compute(self, dt: float):
"""Compute the command.
This is an optional function that can be used to visualize the command in the simulator.
Args:
dt (float): The time step passed since the last call to compute.
"""
if self.cfg.debug_vis:
pass
# update the metrics based on current state
self._update_metrics()
# reduce the time left before resampling
self.time_left -= dt
# resample the command if necessary
resample_env_ids = (self.time_left <= 0.0).nonzero().flatten()
if len(resample_env_ids) > 0:
self._resample(resample_env_ids)
# update the command
self._update_command()
"""
Helper functions.
......@@ -140,6 +170,14 @@ class CommandGeneratorBase(ABC):
# resample the command
self._resample_command(env_ids)
"""
Simulation callbacks.
"""
def _debug_vis_callback(self, event):
"""Visualizes the sensor data."""
self._debug_vis_impl()
"""
Implementation specific functions.
"""
......@@ -158,3 +196,10 @@ class CommandGeneratorBase(ABC):
def _update_metrics(self):
"""Update the metrics based on the current state."""
raise NotImplementedError
def _debug_vis_impl(self):
"""Visualize the command in the simulator.
This is an optional function that can be used to visualize the command in the simulator.
"""
pass
......@@ -3,11 +3,16 @@
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
from dataclasses import MISSING
from typing import ClassVar, Tuple
from omni.isaac.orbit.utils import configclass
from .command_generator_base import CommandGeneratorBase
from .position_command_generator import TerrainBasedPositionCommandGenerator
from .velocity_command_generator import NormalVelocityCommandGenerator, UniformVelocityCommandGenerator
"""
Base command generator.
"""
......@@ -17,9 +22,9 @@ Base command generator.
class CommandGeneratorBaseCfg:
"""Configuration for the base command generator."""
class_name: ClassVar[str] = MISSING
"""Name of the command generator class."""
resampling_time_range: Tuple[float, float] = MISSING
class_name: type[CommandGeneratorBase] = MISSING
"""The command generator class to use."""
resampling_time_range: tuple[float, float] = MISSING
"""Time before commands are changed [s]."""
debug_vis: bool = False
"""Whether to visualize debug information. Defaults to False."""
......@@ -34,10 +39,10 @@ Locomotion-specific command generators.
class UniformVelocityCommandGeneratorCfg(CommandGeneratorBaseCfg):
"""Configuration for the uniform velocity command generator."""
class_name = "UniformVelocityCommandGenerator"
class_name = UniformVelocityCommandGenerator
robot_attr: str = MISSING
"""Name of the robot attribute from the environment."""
asset_name: str = MISSING
"""Name of the asset in the environment for which the commands are generated."""
heading_command: bool = MISSING
"""Whether to use heading command or angular velocity command.
......@@ -55,10 +60,10 @@ class UniformVelocityCommandGeneratorCfg(CommandGeneratorBaseCfg):
class Ranges:
"""Uniform distribution ranges for the velocity commands."""
lin_vel_x: Tuple[float, float] = MISSING # min max [m/s]
lin_vel_y: Tuple[float, float] = MISSING # min max [m/s]
ang_vel_z: Tuple[float, float] = MISSING # min max [rad/s]
heading: Tuple[float, float] = MISSING # [rad]
lin_vel_x: tuple[float, float] = MISSING # min max [m/s]
lin_vel_y: tuple[float, float] = MISSING # min max [m/s]
ang_vel_z: tuple[float, float] = MISSING # min max [rad/s]
heading: tuple[float, float] = MISSING # min max [rad]
ranges: Ranges = MISSING
"""Distribution ranges for the velocity commands."""
......@@ -68,24 +73,24 @@ class UniformVelocityCommandGeneratorCfg(CommandGeneratorBaseCfg):
class NormalVelocityCommandGeneratorCfg(UniformVelocityCommandGeneratorCfg):
"""Configuration for the normal velocity command generator."""
class_name = "NormalVelocityCommandGenerator"
class_name = NormalVelocityCommandGenerator
heading_command: bool = False # --> we don't use heading command for normal velocity command.
@configclass
class Ranges:
"""Normal distribution ranges for the velocity commands."""
mean_vel: Tuple[float, float, float] = MISSING
mean_vel: tuple[float, float, float] = MISSING
"""Mean velocity for the normal distribution.
The tuple contains the mean linear-x, linear-y, and angular-z velocity.
"""
std_vel: Tuple[float, float, float] = MISSING
std_vel: tuple[float, float, float] = MISSING
"""Standard deviation for the normal distribution.
The tuple contains the standard deviation linear-x, linear-y, and angular-z velocity.
"""
zero_prob: Tuple[float, float, float] = MISSING
zero_prob: tuple[float, float, float] = MISSING
"""Probability of zero velocity for the normal distribution.
The tuple contains the probability of zero linear-x, linear-y, and angular-z velocity.
......@@ -99,10 +104,10 @@ class NormalVelocityCommandGeneratorCfg(UniformVelocityCommandGeneratorCfg):
class TerrainBasedPositionCommandGeneratorCfg(CommandGeneratorBaseCfg):
"""Configuration for the terrain-based position command generator."""
class_name = "TerrainBasedPositionCommandGenerator"
class_name = TerrainBasedPositionCommandGenerator
robot_attr: str = MISSING
"""Name of the robot attribute from the environment."""
asset_name: str = MISSING
"""Name of the asset in the environment for which the commands are generated."""
rel_standing_envs: float = MISSING
"""Probability threshold for environments where the robots that are standing still."""
simple_heading: bool = MISSING
......@@ -115,7 +120,7 @@ class TerrainBasedPositionCommandGeneratorCfg(CommandGeneratorBaseCfg):
class Ranges:
"""Uniform distribution ranges for the velocity commands."""
heading: Tuple[float, float] = MISSING
heading: tuple[float, float] = MISSING
"""Heading range for the position commands (in rad).
Used only if :attr:`simple_heading` is False.
......
......@@ -5,17 +5,23 @@
"""Sub-module containing command generators for the position-based locomotion task."""
from __future__ import annotations
import torch
from typing import Sequence
from typing import TYPE_CHECKING, Sequence
from omni.isaac.orbit.assets import Articulation
from omni.isaac.orbit.markers import VisualizationMarkers
from omni.isaac.orbit.markers.config import CUBOID_MARKER_CFG
from omni.isaac.orbit.robots.robot_base import RobotBase
from omni.isaac.orbit.terrains import TerrainImporter
from omni.isaac.orbit.utils.math import quat_rotate_inverse, wrap_to_pi, yaw_quat
from .command_generator_base import CommandGeneratorBase
from .command_generator_cfg import TerrainBasedPositionCommandGeneratorCfg
if TYPE_CHECKING:
from omni.isaac.orbit.envs import BaseEnv
from .command_generator_cfg import TerrainBasedPositionCommandGeneratorCfg
class TerrainBasedPositionCommandGenerator(CommandGeneratorBase):
......@@ -28,19 +34,18 @@ class TerrainBasedPositionCommandGenerator(CommandGeneratorBase):
cfg: TerrainBasedPositionCommandGeneratorCfg
"""Configuration for the command generator."""
def __init__(self, cfg: TerrainBasedPositionCommandGeneratorCfg, env: object):
def __init__(self, cfg: TerrainBasedPositionCommandGeneratorCfg, env: BaseEnv):
"""Initialize the command generator class.
Args:
cfg (TerrainBasedPositionCommandGeneratorCfg): The configuration parameters for the command generator.
env (object): The environment object.
env (BaseEnv): The environment object.
"""
super().__init__(cfg, env)
# -- robot
# TODO: Should we make this configurable like this?
self.robot: RobotBase = getattr(env, cfg.robot_attr)
self.robot: Articulation = env.scene[cfg.asset_name]
# -- terrain
self.terrain: TerrainImporter = env.terrain
self.terrain: TerrainImporter = env.scene.terrain
# -- commands: (x, y, z, heading)
self.pos_command_w = torch.zeros(self.num_envs, 3, device=self.device)
self.heading_command_w = torch.zeros(self.num_envs, device=self.device)
......@@ -50,7 +55,7 @@ class TerrainBasedPositionCommandGenerator(CommandGeneratorBase):
self.metrics["error_pos"] = torch.zeros(self.num_envs, device=self.device)
self.metrics["error_heading"] = torch.zeros(self.num_envs, device=self.device)
# -- debug vis
self._box_goal_marker = None
self.box_goal_visualizer = None
def __str__(self) -> str:
msg = "TerrainBasedPositionCommandGenerator:\n"
......@@ -72,16 +77,10 @@ class TerrainBasedPositionCommandGenerator(CommandGeneratorBase):
Operations.
"""
def debug_vis(self):
if self.cfg.debug_vis:
# create the box marker if necessary
if self._box_goal_marker is None:
marker_cfg = CUBOID_MARKER_CFG
marker_cfg.markers["cuboid"].color = (1.0, 0.0, 0.0)
marker_cfg.markers["cuboid"].scale = (0.1, 0.1, 0.1)
self._box_goal_marker = VisualizationMarkers("/Visuals/Command/position_goal", marker_cfg)
# update the box marker
self._box_goal_marker.visualize(self.pos_command_w)
def set_debug_vis(self, debug_vis: bool):
super().set_debug_vis(debug_vis)
if self.box_goal_visualizer is not None:
self.box_goal_visualizer.set_visibility(debug_vis)
"""
Implementation specific functions.
......@@ -92,7 +91,7 @@ class TerrainBasedPositionCommandGenerator(CommandGeneratorBase):
# TODO: need to add that here directly
self.pos_command_w[env_ids] = self.terrain.sample_new_targets(env_ids)
# offset the position command by the current root position
self.pos_command_w[env_ids, 2] += self.robot.get_default_root_states(clone=False)[env_ids, 2]
self.pos_command_w[env_ids, 2] += self.robot.data.default_root_state_w[env_ids, 2]
if self.cfg.simple_heading:
# set heading command to point towards target
......@@ -111,12 +110,22 @@ class TerrainBasedPositionCommandGenerator(CommandGeneratorBase):
self.heading_command_w[env_ids] = r.uniform_(*self.cfg.ranges.heading)
def _update_command(self):
"""Retargets the position command to the current root position and heading."""
target_vec = self.pos_command_w - self.robot.root_pos_w[:, :3]
self.pos_command_b[:] = quat_rotate_inverse(yaw_quat(self.robot.root_quat_w), target_vec)
"""Re-target the position command to the current root position and heading."""
target_vec = self.pos_command_w - self.robot.data.root_pos_w[:, :3]
self.pos_command_b[:] = quat_rotate_inverse(yaw_quat(self.robot.data.root_quat_w), target_vec)
self.heading_command_b[:] = wrap_to_pi(self.heading_command_w - self.robot.heading_w)
def _update_metrics(self):
# logs data
self.metrics["error_pos"] = torch.norm(self.pos_command_w - self.robot.root_pos_w[:, :3], dim=1)
self.metrics["error_pos"] = torch.norm(self.pos_command_w - self.robot.data.root_pos_w[:, :3], dim=1)
self.metrics["error_heading"] = torch.abs(wrap_to_pi(self.heading_command_w - self.robot.heading_w))
def _debug_vis_impl(self):
# create the box marker if necessary
if self.box_goal_visualizer is None:
marker_cfg = CUBOID_MARKER_CFG
marker_cfg.markers["cuboid"].color = (1.0, 0.0, 0.0)
marker_cfg.markers["cuboid"].scale = (0.1, 0.1, 0.1)
self.box_goal_visualizer = VisualizationMarkers("/Visuals/Command/position_goal", marker_cfg)
# update the box marker
self.box_goal_visualizer.visualize(self.pos_command_w)
......@@ -5,16 +5,22 @@
"""Sub-module containing command generators for the velocity-based locomotion task."""
from __future__ import annotations
import torch
from typing import Sequence, Tuple
from typing import TYPE_CHECKING, Sequence
import omni.isaac.orbit.utils.math as math_utils
from omni.isaac.orbit.assets import Articulation
from omni.isaac.orbit.markers import VisualizationMarkers
from omni.isaac.orbit.markers.config import ARROW_X_MARKER_CFG
from omni.isaac.orbit.robots.robot_base import RobotBase
from omni.isaac.orbit.utils.math import quat_apply, quat_from_euler_xyz, wrap_to_pi
from .command_generator_base import CommandGeneratorBase
from .command_generator_cfg import NormalVelocityCommandGeneratorCfg, UniformVelocityCommandGeneratorCfg
if TYPE_CHECKING:
from omni.isaac.orbit.envs import BaseEnv
from .command_generator_cfg import NormalVelocityCommandGeneratorCfg, UniformVelocityCommandGeneratorCfg
class UniformVelocityCommandGenerator(CommandGeneratorBase):
......@@ -38,17 +44,18 @@ class UniformVelocityCommandGenerator(CommandGeneratorBase):
cfg: UniformVelocityCommandGeneratorCfg
"""The configuration of the command generator."""
def __init__(self, cfg: UniformVelocityCommandGeneratorCfg, env: object):
def __init__(self, cfg: UniformVelocityCommandGeneratorCfg, env: BaseEnv):
"""Initialize the command generator.
Args:
cfg (UniformVelocityCommandGeneratorCfg): The configuration of the command generator.
env (object): The environment.
env (BaseEnv): The environment.
"""
super().__init__(cfg, env)
# -- robot
# TODO: Should we make this configurable like this?
self.robot: RobotBase = getattr(env, cfg.robot_attr)
self.robot: Articulation = env.scene[cfg.asset_name]
# -- constants
self._FORWARD_VEC_B = torch.tensor((1.0, 0.0, 0.0), device=self.device).repeat(self.num_envs, 1)
# -- command: x vel, y vel, yaw vel, heading
self.vel_command_b = torch.zeros(self.num_envs, 3, device=self.device)
self.heading_target = torch.zeros(self.num_envs, device=self.device)
......@@ -58,8 +65,8 @@ class UniformVelocityCommandGenerator(CommandGeneratorBase):
self.metrics["error_vel_xy"] = torch.zeros(self.num_envs, device=self.device)
self.metrics["error_vel_yaw"] = torch.zeros(self.num_envs, device=self.device)
# -- debug vis
self._base_vel_goal_markers = None
self._base_vel_markers = None
self.base_vel_goal_visualizer = None
self.base_vel_visualizer = None
def __str__(self) -> str:
"""Return a string representation of the command generator."""
......@@ -85,28 +92,14 @@ class UniformVelocityCommandGenerator(CommandGeneratorBase):
Operations.
"""
def debug_vis(self):
if self.cfg.debug_vis:
# create markers if necessary
if self._base_vel_goal_markers is None:
marker_cfg = ARROW_X_MARKER_CFG
marker_cfg.markers["arrow"].color = (1.0, 0.0, 0.0)
self._base_vel_goal_markers = VisualizationMarkers("/Visuals/Command/velocity_goal", marker_cfg)
if self._base_vel_markers is None:
marker_cfg = ARROW_X_MARKER_CFG
marker_cfg.markers["arrow"].color = (0.0, 0.0, 1.0)
self._base_vel_markers = VisualizationMarkers("/Visuals/Command/velocity_current", marker_cfg)
# get marker location
# -- base state
base_pos_w = self.robot.data.root_pos_w.clone()
base_pos_w[:, 2] += 0.5
# -- command
vel_des_arrow_scale, vel_des_arrow_quat = self._resolve_xy_velocity_to_arrow(self.command[:, :2])
vel_arrow_scale, vel_arrow_quat = self._resolve_xy_velocity_to_arrow(self.robot.data.root_lin_vel_w[:, :2])
# -- goal
self._base_vel_goal_markers.visualize(base_pos_w, vel_des_arrow_quat, vel_des_arrow_scale)
# -- base velocity
self._base_vel_markers.visualize(base_pos_w, vel_arrow_quat, vel_arrow_scale)
def set_debug_vis(self, debug_vis: bool):
super().set_debug_vis(debug_vis)
# -- current
if self.base_vel_visualizer is not None:
self.base_vel_visualizer.set_visibility(debug_vis)
# -- goal
if self.base_vel_goal_visualizer is not None:
self.base_vel_goal_visualizer.set_visibility(debug_vis)
"""
Implementation specific functions.
......@@ -140,13 +133,13 @@ class UniformVelocityCommandGenerator(CommandGeneratorBase):
# resolve indices of heading envs
heading_env_ids = self.is_heading_env.nonzero(as_tuple=False).flatten()
# obtain heading direction
forward = quat_apply(
self.robot.data.root_quat_w[heading_env_ids, :], self.robot._FORWARD_VEC_B[heading_env_ids]
forward = math_utils.quat_apply(
self.robot.data.root_quat_w[heading_env_ids, :], self._FORWARD_VEC_B[heading_env_ids]
)
heading = torch.atan2(forward[:, 1], forward[:, 0])
# compute angular velocity
self.vel_command_b[heading_env_ids, 2] = torch.clip(
0.5 * wrap_to_pi(self.heading_target[heading_env_ids] - heading),
0.5 * math_utils.wrap_to_pi(self.heading_target[heading_env_ids] - heading),
min=self.cfg.ranges.ang_vel_z[0],
max=self.cfg.ranges.ang_vel_z[1],
)
......@@ -166,21 +159,50 @@ class UniformVelocityCommandGenerator(CommandGeneratorBase):
torch.abs(self.vel_command_b[:, 2] - self.robot.data.root_ang_vel_b[:, 2]) / max_command_time
)
def _debug_vis_impl(self):
# create markers if necessary
# -- goal
if self.base_vel_goal_visualizer is None:
marker_cfg = ARROW_X_MARKER_CFG
marker_cfg.markers["arrow"].color = (0.0, 1.0, 0.0)
marker_cfg.markers["arrow"].scale = (2.5, 0.1, 0.1)
self.base_vel_goal_visualizer = VisualizationMarkers("/Visuals/Command/velocity_goal", marker_cfg)
# -- current
if self.base_vel_visualizer is None:
marker_cfg = ARROW_X_MARKER_CFG
marker_cfg.markers["arrow"].color = (0.0, 0.0, 1.0)
marker_cfg.markers["arrow"].scale = (2.5, 0.1, 0.1)
self.base_vel_visualizer = VisualizationMarkers("/Visuals/Command/velocity_current", marker_cfg)
# get marker location
# -- base state
base_pos_w = self.robot.data.root_pos_w.clone()
base_pos_w[:, 2] += 0.5
# -- resolve the scales and quaternions
vel_des_arrow_scale, vel_des_arrow_quat = self._resolve_xy_velocity_to_arrow(self.command[:, :2])
vel_arrow_scale, vel_arrow_quat = self._resolve_xy_velocity_to_arrow(self.robot.data.root_lin_vel_b[:, :2])
# -- goal
self.base_vel_goal_visualizer.visualize(base_pos_w, vel_des_arrow_quat, vel_des_arrow_scale)
# -- base velocity
self.base_vel_visualizer.visualize(base_pos_w, vel_arrow_quat, vel_arrow_scale)
"""
Internal helpers.
"""
def _resolve_xy_velocity_to_arrow(self, xy_velocity: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
def _resolve_xy_velocity_to_arrow(self, xy_velocity: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
"""Converts the XY base velocity command to arrow direction rotation."""
# obtain default scale of the marker
default_scale = self._base_vel_goal_markers.cfg.markers["arrow"].scale
default_scale = self.base_vel_goal_visualizer.cfg.markers["arrow"].scale
# arrow-scale
arrow_scale = torch.tensor(default_scale, device=self.device).repeat(xy_velocity.shape[0], 1)
arrow_scale[:, 0] *= torch.linalg.norm(xy_velocity, dim=1)
arrow_scale[:, 0] *= torch.linalg.norm(xy_velocity, dim=1) * 2.5
# arrow-direction
heading_angle = torch.atan2(xy_velocity[:, 1], xy_velocity[:, 0])
zeros = torch.zeros_like(heading_angle)
arrow_quat = quat_from_euler_xyz(zeros, zeros, heading_angle)
arrow_quat = math_utils.quat_from_euler_xyz(zeros, zeros, heading_angle)
# convert everything back from base to world frame
base_quat_w = self.robot.data.root_quat_w
arrow_quat = math_utils.quat_mul(base_quat_w, arrow_quat)
return arrow_scale, arrow_quat
......
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from .base_env import BaseEnv
from .base_env_cfg import BaseEnvCfg, ViewerCfg
from .rl_env import RLEnv, VecEnvIndices, VecEnvObs, VecEnvStepReturn
from .rl_env_cfg import RLEnvCfg
__all__ = [
"BaseEnv",
"BaseEnvCfg",
"ViewerCfg",
"RLEnv",
"RLEnvCfg",
"VecEnvIndices",
"VecEnvObs",
"VecEnvStepReturn",
]
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import builtins
from omni.isaac.orbit.command_generators import CommandGeneratorBase
from omni.isaac.orbit.managers import ActionManager, ObservationManager
from omni.isaac.orbit.scene import InteractiveScene
from omni.isaac.orbit.sim import SimulationContext
from omni.isaac.orbit.utils.timer import Timer
from .base_env_cfg import BaseEnvCfg
class BaseEnv:
"""The base environment encapsulates the simulation scene and the environment managers.
While a simulation scene or world comprises of different components such as the robots, objects,
and sensors (cameras, lidars, etc.), the environment is a higher level abstraction
that provides an interface for interacting with the simulation. The environment is comprised of
the following components:
* **Scene**: The scene manager that creates and manages the virtual world in which the robot operates.
This includes defining the robot, static and dynamic objects, sensors, etc.
* **Observation Manager**: The observation manager that generates observations from the current simulation
state and the data gathered from the sensors. These observations may include privileged information
that is not available to the robot in the real world. Additionally, user-defined terms can be added
to process the observations and generate custom observations. For example, using a network to embed
high-dimensional observations into a lower-dimensional space.
* **Action Manager**: The action manager that processes the raw actions sent to the environment and
converts them to low-level commands that are sent to the simulation. It can be configured to accept
raw actions at different levels of abstraction. For example, in case of a robotic arm, the raw actions
can be joint torques, joint positions, or end-effector poses. Similarly for a mobile base, it can be
the joint torques, or the desired velocity of the floating base.
* **Command Generator**: The command generator that generates the goal commands for the robot. These
commands are used by the observation manager to generate the observations. For example, in case of a
robotic arm, the goal commands can be the object to be grasped, or the desired end-effector pose. For
a mobile base, it can be the goal position and orientation of the base.
The environment provides a unified interface for interacting with the simulation. However, it does not
include task-specific quantities such as the reward function, or the termination conditions. These
quantities are often specific to defining Markov Decision Processes (MDPs) while the base environment
is agnostic to the MDP definition.
The environment steps forward in time at a fixed time-step. The physics simulation is decimated at a
lower time-step. This is to ensure that the simulation is stable. These two time-steps can be configured
independently using the :attr:`BaseEnvCfg.decimation` (number of simulation steps per environment step)
and the :attr:`BaseEnvCfg.sim.dt` (physics time-step) parameters. Based on these parameters, the
environment time-step is computed as the product of the two. The two time-steps can be obtained by
querying the :attr:`physics_dt` and the :attr:`step_dt` properties respectively.
"""
def __init__(self, cfg: BaseEnvCfg):
"""Initialize the environment.
Args:
cfg (BaseEnvCfg): The configuration object for the environment.
Raises:
RuntimeError: If a simulation context already exists. The environment must always create one
since it configures the simulation context and controls the simulation.
"""
# store inputs to class
self.cfg = cfg
# initialize internal variables
self._is_closed = False
# create a simulation context to control the simulator
if SimulationContext.instance() is None:
self.sim = SimulationContext(self.cfg.sim)
else:
raise RuntimeError("Simulation context already exists. Cannot create a new one.")
# set camera view for "/OmniverseKit_Persp" camera
self.sim.set_camera_view(eye=self.cfg.viewer.eye, target=self.cfg.viewer.lookat)
# print useful information
print("[INFO]: Base environment:")
print(f"\tEnvironment device : {self.device}")
print(f"\tPhysics step-size : {self.physics_dt}")
print(f"\tRendering step-size : {self.physics_dt * self.cfg.sim.substeps}")
print(f"\tEnvironment step-size : {self.step_dt}")
print(f"\tPhysics GPU pipeline : {self.cfg.sim.use_gpu_pipeline}")
print(f"\tPhysics GPU simulation: {self.cfg.sim.physx.use_gpu}")
# generate scene
with Timer("[INFO]: Time taken for scene creation"):
self.scene = InteractiveScene(self.cfg.scene)
print("[INFO]: Scene manager: ", self.scene)
# play the simulator to activate physics handles
# note: this activates the physics simulation view that exposes TensorAPIs
# note: when started in extension mode, first call sim.reset_async() and then initialize the managers
if builtins.ISAAC_LAUNCHED_FROM_TERMINAL is False:
with Timer("[INFO]: Time taken for simulation reset"):
self.sim.reset()
# add timeline event to load managers
self.load_managers()
def __del__(self):
"""Cleanup for the environment."""
self.close()
"""
Properties.
"""
@property
def num_envs(self) -> int:
"""The number of instances of the environment that are running."""
return self.scene.num_envs
@property
def physics_dt(self) -> float:
"""The physics time-step (in s).
This is the lowest time-decimation at which the simulation is happening.
"""
return self.cfg.sim.dt
@property
def step_dt(self) -> float:
"""The environment stepping time-step (in s).
This is the time-step at which the environment steps forward.
"""
return self.cfg.sim.dt * self.cfg.decimation
@property
def device(self):
"""The device on which the environment is running."""
return self.sim.device
"""
Operations.
"""
def load_managers(self):
"""Load the managers for the environment.
Note:
This must happen after the simulator is reset, i.e. after the first call to :meth:`self.sim.reset`.
"""
# prepare the managers
# note: this order is important since observation manager needs to know the command and action managers
# -- command manager
self.command_manager: CommandGeneratorBase = self.cfg.commands.class_name(self.cfg.commands, self)
print("[INFO] Command Manager: ", self.command_manager)
# -- action manager
self.action_manager = ActionManager(self.cfg.actions, self)
print("[INFO] Action Manager: ", self.action_manager)
# -- observation manager
self.observation_manager = ObservationManager(self.cfg.observations, self)
print("[INFO] Observation Manager:", self.observation_manager)
def close(self):
"""Cleanup for the environment."""
if not self._is_closed:
# update closing status
self._is_closed = True
......@@ -12,44 +12,18 @@ configuring the environment instances, viewer settings, and simulation parameter
from dataclasses import MISSING
from typing import Tuple
from omni.isaac.orbit.command_generators import CommandGeneratorBaseCfg
from omni.isaac.orbit.scene import InteractiveSceneCfg
from omni.isaac.orbit.sim import SimulationCfg
from omni.isaac.orbit.utils import configclass
__all__ = ["IsaacEnvCfg", "EnvCfg", "ViewerCfg"]
##
# General environment configuration
##
@configclass
class EnvCfg:
"""Configuration of the common environment information."""
num_envs: int = MISSING
"""Number of environment instances to create."""
env_spacing: float = MISSING
"""Spacing between cloned environments."""
episode_length_s: float = None
"""Duration of an episode (in seconds). Default is None (no limit)."""
send_time_outs: bool = True
"""Whether to send time-outs to the algorithm. Default is True."""
replicate_physics: bool = True
"""Enable/disable replication of physics schemas when using the Cloner APIs. Default is False.
Note:
In Isaac Sim 2022.2.0, domain randomization of material properties is not supported when
``replicate_physics`` is set to True.
"""
__all__ = ["BaseEnvCfg", "ViewerCfg"]
@configclass
class ViewerCfg:
"""Configuration of the scene viewport camera."""
debug_vis: bool = False
"""Whether to enable/disable debug visualization in the scene."""
eye: Tuple[float, float, float] = (7.5, 7.5, 7.5)
"""Initial camera position (in m). Default is (7.5, 7.5, 7.5)."""
lookat: Tuple[float, float, float] = (0.0, 0.0, 0.0)
......@@ -62,18 +36,26 @@ class ViewerCfg:
"""
##
# Environment configuration
##
@configclass
class IsaacEnvCfg:
class BaseEnvCfg:
"""Base configuration of the environment."""
env: EnvCfg = MISSING
"""General environment configuration."""
# simulation settings
viewer: ViewerCfg = ViewerCfg()
"""Viewer configuration. Default is ViewerCfg()."""
sim: SimulationCfg = SimulationCfg()
"""Physics simulation configuration. Default is SimulationCfg()."""
# general settings
decimation: int = MISSING
"""Number of control action updates @ sim dt per policy dt."""
# environment settings
scene: InteractiveSceneCfg = MISSING
"""Scene settings"""
observations: object = MISSING
"""Observation space settings."""
actions: object = MISSING
"""Action space settings."""
commands: CommandGeneratorBaseCfg = MISSING
"""Command generator settings."""
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
This sub-package contains implementations of various functions that can be used to create a Markov Decision Process (MDP).
The functions can be provided to different managers that are responsible for the different aspects of the MDP. These include
the observation, reward, termination, actions, randomization and curriculum managers.
"""
from .actions import * # noqa: F401, F403
from .curriculums import * # noqa: F401, F403
from .observations import * # noqa: F401, F403
from .randomizations import * # noqa: F401, F403
from .rewards import * # noqa: F401, F403
from .terminations import * # noqa: F401, F403
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
This sub-module contains implementations of various action terms that can be used in the environment.
The action terms are responsible for processing the raw actions sent to the environment and applying them to the
asset managed by the term.
"""
from .actions_cfg import *
from .binary_joint_actions import *
from .joint_actions import *
from .non_holonomic_actions import *
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
from dataclasses import MISSING
from omni.isaac.orbit.managers.action_manager import ActionTerm, ActionTermCfg
from omni.isaac.orbit.utils import configclass
from . import binary_joint_actions, joint_actions, non_holonomic_actions
##
# Joint actions.
##
@configclass
class JointActionCfg(ActionTermCfg):
"""Configuration for the base joint action term.
See :class:`JointAction` for more details.
"""
joint_names: list[str] = MISSING
"""List of joint names or regex expressions that the action will be mapped to."""
scale: float | dict[str, float] = 1.0
"""Scale factor for the action (float or dict of regex expressions). Defaults to 1.0."""
offset: float | dict[str, float] = 0.0
"""Offset factor for the action (float or dict of regex expressions). Defaults to 0.0."""
@configclass
class JointPositionActionCfg(JointActionCfg):
"""Configuration for the joint position action term.
See :class:`JointPositionAction` for more details.
"""
cls: type[ActionTerm] = joint_actions.JointPositionAction
use_default_offset: bool = True
"""Whether to use default joint positions configured in the articulation asset as offset.
Defaults to True.
This overrides the settings from :attr:`offset` if set to True.
"""
@configclass
class JointVelocityActionCfg(JointActionCfg):
"""Configuration for the joint velocity action term.
See :class:`JointVelocityAction` for more details.
"""
cls: type[ActionTerm] = joint_actions.JointVelocityAction
use_default_offset: bool = True
"""Whether to use default joint velocities configured in the articulation asset as offset.
Defaults to True.
This overrides the settings from :attr:`offset` if set to True.
"""
@configclass
class JointEffortActionCfg(JointActionCfg):
"""Configuration for the joint effort action term.
See :class:`JointEffortAction` for more details.
"""
cls: type[ActionTerm] = joint_actions.JointEffortAction
##
# Gripper actions.
##
@configclass
class BinaryJointActionCfg(ActionTermCfg):
"""Configuration for the base binary joint action term.
See :class:`BinaryJointAction` for more details.
"""
joint_names: list[str] = MISSING
"""List of joint names or regex expressions that the action will be mapped to."""
open_command: dict[str, float] = MISSING
"""The joint command to move to *open* configuration."""
close_command: dict[str, float] = MISSING
"""The joint command to move to *close* configuration."""
@configclass
class BinaryJointPositionActionCfg(BinaryJointActionCfg):
"""Configuration for the binary joint position action term.
See :class:`BinaryJointPositionAction` for more details.
"""
cls: type[ActionTerm] = binary_joint_actions.BinaryJointPositionAction
@configclass
class BinaryJointVelocityActionCfg(BinaryJointActionCfg):
"""Configuration for the binary joint velocity action term.
See :class:`BinaryJointVelocityAction` for more details.
"""
cls: type[ActionTerm] = binary_joint_actions.BinaryJointVelocityAction
##
# Non-holonomic actions.
##
@configclass
class NonHolonomicActionCfg(ActionTermCfg):
"""Configuration for the non-holonomic action term with dummy joints at the base.
See :class:`NonHolonomicAction` for more details.
"""
cls: type[ActionTerm] = non_holonomic_actions.NonHolonomicAction
body_name: str = MISSING
"""Name of the body which has the dummy mechanism connected to."""
x_joint_name: str = MISSING
"""The dummy joint name in the x direction."""
y_joint_name: str = MISSING
"""The dummy joint name in the y direction."""
yaw_joint_name: str = MISSING
"""The dummy joint name in the yaw direction."""
scale: tuple[float, float] = (1.0, 1.0)
"""Scale factor for the action. Defaults to (1.0, 1.0)."""
offset: tuple[float, float] = (0.0, 0.0)
"""Offset factor for the action. Defaults to (0.0, 0.0)."""
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import torch
from typing import TYPE_CHECKING
import carb
import omni.isaac.orbit.utils.string as string_utils
from omni.isaac.orbit.assets.articulation import Articulation
from omni.isaac.orbit.managers.action_manager import ActionTerm
if TYPE_CHECKING:
from omni.isaac.orbit.envs.rl_env import BaseEnv
from . import actions_cfg
class BinaryJointAction(ActionTerm):
"""Base class for binary joint actions.
This action term maps a binary action to the *open* or *close* joint configurations. These configurations are
specified through the :class:`BinaryJointActionCfg` object. If the input action is a float vector, the action
is considered binary based on the sign of the action values.
Based on above, we follow the following convention for the binary action:
1. Open action: 1 (bool) or positive values (float).
2. Close action: 0 (bool) or negative values (float).
The action term can mostly be used for gripper actions, where the gripper is either open or closed. This
helps in devising a mimicking mechanism for the gripper, since in simulation it is often not possible to
add such constraints to the gripper.
"""
cfg: actions_cfg.BinaryJointActionCfg
"""The configuration of the action term."""
_asset: Articulation
"""The articulation asset on which the action term is applied."""
def __init__(self, cfg: actions_cfg.BinaryJointActionCfg, env: BaseEnv) -> None:
# initialize the action term
super().__init__(cfg, env)
# resolve the joints over which the action term is applied
self._joint_ids, self._joint_names = self._asset.find_joints(self.cfg.joint_names)
self._num_joints = len(self._joint_ids)
# log the resolved joint names for debugging
carb.log_info(
f"Resolved joint names for the action term {self.__class__.__name__}: {self._joint_names} [{self._joint_ids}]"
)
# create tensors for raw and processed actions
self._raw_actions = torch.zeros(self.num_envs, 1, device=self.device)
self._processed_actions = torch.zeros(self.num_envs, self._num_joints, device=self.device)
# parse open command
self._open_command = torch.zeros(self._num_joints, device=self.device)
index_list, name_list, value_list = string_utils.resolve_matching_names_values(
self.cfg.open_command_expr, self._joint_names
)
if len(index_list) != self._num_joints:
raise ValueError(
f"Could not resolve all joints for the action term. Missing: {set(self._joint_names) - set(name_list)}"
)
self._open_command[:, index_list] = torch.tensor(value_list, device=self.device)
# parse close command
self._close_command = torch.zeros_like(self._open_command)
index_list, name_list, value_list = string_utils.resolve_matching_names_values(
self.cfg.close_command_expr, self._joint_names
)
if len(index_list) != self._num_joints:
raise ValueError(
f"Could not resolve all joints for the action term. Missing: {set(self._joint_names) - set(name_list)}"
)
self._close_command[:, index_list] = torch.tensor(value_list, device=self.device)
"""
Properties.
"""
@property
def action_dim(self) -> int:
return 1
@property
def raw_actions(self) -> torch.Tensor:
return self._raw_actions
@property
def processed_actions(self) -> torch.Tensor:
return self._processed_actions
"""
Operations.
"""
def process_actions(self, actions: torch.Tensor):
# store the raw actions
self._raw_actions[:] = actions
# compute the binary mask
if actions.dtype == torch.bool:
# true: close, false: open
binary_mask = (actions == 0).unsqueeze(1)
else:
# true: close, false: open
binary_mask = (actions < 0).unsqueeze(1)
# compute the command
self._processed_actions = torch.where(binary_mask, self._close_command, self._open_command)
class BinaryJointPositionAction(BinaryJointAction):
"""Binary joint action that sets the binary action into joint position targets."""
cfg: actions_cfg.BinaryJointPositionActionCfg
"""The configuration of the action term."""
def apply_actions(self):
self._asset.set_joint_position_target(self._processed_actions, joint_ids=self._joint_ids)
class BinaryJointVelocityAction(BinaryJointAction):
"""Binary joint action that sets the binary action into joint velocity targets."""
cfg: actions_cfg.BinaryJointVelocityActionCfg
"""The configuration of the action term."""
def apply_actions(self):
self._asset.set_joint_velocity_target(self._processed_actions, joint_ids=self._joint_ids)
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import torch
from typing import TYPE_CHECKING
import carb
import omni.isaac.orbit.utils.string as string_utils
from omni.isaac.orbit.assets.articulation import Articulation
from omni.isaac.orbit.managers.action_manager import ActionTerm
if TYPE_CHECKING:
from omni.isaac.orbit.envs.rl_env import BaseEnv
from . import actions_cfg
class JointAction(ActionTerm):
r"""Base class for joint actions.
This action term performs pre-processing of the raw actions using affine transformations (scale and offset).
These transformations can be configured to be applied to a subset of the articulation's joints.
Mathematically, the action term is defined as:
.. math::
\text{action} = \text{offset} + \text{scaling} \times \text{input action}
where :math:`\text{action}` is the action that is sent to the articulation's actuated joints, :math:`\text{offset}`
is the offset applied to the input action, :math:`\text{scaling}` is the scaling applied to the input
action, and :math:`\text{input action}` is the input action from the user.
Based on above, this kind of action transformation ensures that the input and output actions are in the same
units and dimensions. The child classes of this action term can then map the output action to a specific
desired command of the articulation's joints (e.g. position, velocity, etc.).
"""
cfg: actions_cfg.JointActionCfg
"""The configuration of the action term."""
_asset: Articulation
"""The articulation asset on which the action term is applied."""
_scale: torch.Tensor | float
"""The scaling factor applied to the input action."""
_offset: torch.Tensor | float
"""The offset applied to the input action."""
def __init__(self, cfg: actions_cfg.JointActionCfg, env: BaseEnv) -> None:
# initialize the action term
super().__init__(cfg, env)
# resolve the joints over which the action term is applied
self._joint_ids, self._joint_names = self._asset.find_joints(self.cfg.joint_names)
self._num_joints = len(self._joint_ids)
# log the resolved joint names for debugging
carb.log_info(
f"Resolved joint names for the action term {self.__class__.__name__}: {self._joint_names} [{self._joint_ids}]"
)
# Avoid indexing across all joints for efficiency
if self._num_joints == self._asset.num_joints:
self._joint_ids = ...
# create tensors for raw and processed actions
self._raw_actions = torch.zeros(self.num_envs, self.action_dim, device=self.device)
self._processed_actions = torch.zeros_like(self.raw_actions)
# parse scale
if isinstance(cfg.scale, float):
self._scale = cfg.scale
elif isinstance(cfg.scale, dict):
self._scale = torch.ones(1, self.action_dim, device=self.device)
# resolve the dictionary config
index_list, _, value_list = string_utils.resolve_matching_names_values(self.cfg.scale, self._joint_names)
self._scale[:, index_list] = torch.tensor(value_list, device=self.device)
else:
raise ValueError(f"Unsupported scale type: {type(cfg.scale)}")
# parse offset
if isinstance(cfg.offset, float):
self._offset = cfg.offset
elif isinstance(cfg.offset, dict):
self._offset = torch.zeros_like(self._raw_actions)
# resolve the dictionary config
index_list, _, value_list = string_utils.resolve_matching_names_values(self.cfg.offset, self._joint_names)
self._offset[:, index_list] = torch.tensor(value_list, device=self.device)
else:
raise ValueError(f"Unsupported offset type: {type(cfg.offset)}")
"""
Properties.
"""
@property
def action_dim(self) -> int:
return self._num_joints
@property
def raw_actions(self) -> torch.Tensor:
return self._raw_actions
@property
def processed_actions(self) -> torch.Tensor:
return self._processed_actions
"""
Operations.
"""
def process_actions(self, actions: torch.Tensor):
# store the raw actions
self._raw_actions[:] = actions
# apply the affine transformations
self._processed_actions = self._raw_actions * self._scale + self._offset
class JointPositionAction(JointAction):
"""Joint action term that applies the processed actions to the articulation's joints as position commands."""
cfg: actions_cfg.JointPositionActionCfg
"""The configuration of the action term."""
def __init__(self, cfg: actions_cfg.JointPositionActionCfg, env: BaseEnv):
# initialize the action term
super().__init__(cfg, env)
# use default joint positions as offset
if cfg.use_default_offset:
self._offset = self._asset.data.default_joint_pos[:, self._joint_ids].clone()
def apply_actions(self):
# set position targets
self._asset.set_joint_position_target(self.processed_actions, joint_ids=self._joint_ids)
class JointVelocityAction(JointAction):
"""Joint action term that applies the processed actions to the articulation's joints as velocity commands."""
cfg: actions_cfg.JointVelocityActionCfg
"""The configuration of the action term."""
def __init__(self, cfg: actions_cfg.JointVelocityActionCfg, env: BaseEnv):
# initialize the action term
super().__init__(cfg, env)
# use default joint velocity as offset
if cfg.offset_with_default:
self._offset = self._asset.data.default_joint_vel[:, self._joint_ids].clone()
def apply_actions(self):
# set joint velocity targets
self._asset.set_joint_velocity_target(self.processed_actions, joint_ids=self._joint_ids)
class JointEffortAction(JointAction):
"""Joint action term that applies the processed actions to the articulation's joints as effort commands."""
cfg: actions_cfg.JointEffortActionCfg
"""The configuration of the action term."""
def __init__(self, cfg: actions_cfg.JointEffortActionCfg, env: BaseEnv):
super().__init__(cfg, env)
def apply_actions(self):
# set joint effort targets
self._asset.set_joint_effort_target(self.processed_actions, joint_ids=self._joint_ids)
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import torch
from typing import TYPE_CHECKING
import carb
from omni.isaac.orbit.assets.articulation import Articulation
from omni.isaac.orbit.managers.action_manager import ActionTerm
from omni.isaac.orbit.utils.math import euler_xyz_from_quat
if TYPE_CHECKING:
from omni.isaac.orbit.envs import BaseEnv
from . import actions_cfg
class NonHolonomicAction(ActionTerm):
r"""Non-holonomic action that maps a two dimensional action to the velocity of the robot in
the x, y and yaw directions.
This action term helps model a skid-steer robot base. The action is a 2D vector which comprises of the
forward velocity :math:`v_{B,x}` and the turning rate :\omega_{B,z}: in the base frame. Using the current
base orientation, the commands are transformed into dummy joint velocity targets as:
.. math::
\dot{q}_{0, des} &= v_{B,x} \cos(\theta) \\
\dot{q}_{1, des} &= v_{B,x} \sin(\theta) \\
\dot{q}_{2, des} &= \omega_{B,z}
where :math:`\theta` is the yaw of the 2-D base. Since the base is simulated as a dummy joint, the yaw is directly
the value of the revolute joint along z, i.e., :math:`q_2 = \theta`.
.. note::
The current implementation assumes that the base is simulated with three dummy joints (prismatic joints along x
and y, and revolute joint along z). This is because it is easier to consider the mobile base as a floating link
controlled by three dummy joints, in comparison to simulating wheels which is at times is tricky because of
friction settings.
However, the action term can be extended to support other base configurations as well.
.. tip::
For velocity control of the base with dummy mechanism, we recommend setting high damping gains to the joints.
This ensures that the base remains unperturbed from external disturbances, such as an arm mounted on the base.
"""
cfg: actions_cfg.NonHolonomicActionCfg
"""The configuration of the action term."""
_asset: Articulation
"""The articulation asset on which the action term is applied."""
_scale: torch.Tensor
"""The scaling factor applied to the input action. Shape is (1, 2)."""
_offset: torch.Tensor
"""The offset applied to the input action. Shape is (1, 2)."""
def __init__(self, cfg: actions_cfg.NonHolonomicActionCfg, env: BaseEnv):
# initialize the action term
super().__init__(cfg, env)
# parse the joint information
# -- x joint
x_joint_id, x_joint_name = self._asset.find_joints(self.cfg.x_joint_name)
if len(x_joint_id) != 1:
raise ValueError(
f"Expected a single joint match for the x joint name: {self.cfg.x_joint_name}, got {len(x_joint_id)}"
)
# -- y joint
y_joint_id, y_joint_name = self._asset.find_joints(self.cfg.y_joint_name)
if len(y_joint_id) != 1:
raise ValueError(f"Found more than one joint match for the y joint name: {self.cfg.y_joint_name}")
# -- yaw joint
yaw_joint_id, yaw_joint_name = self._asset.find_joints(self.cfg.yaw_joint_name)
if len(yaw_joint_id) != 1:
raise ValueError(f"Found more than one joint match for the yaw joint name: {self.cfg.yaw_joint_name}")
# parse the body index
self._body_idx, self._body_name = self._asset.find_bodies(self.cfg.body_name)
if len(self._body_idx) != 1:
raise ValueError(f"Found more than one body match for the body name: {self.cfg.body_name}")
# process into a list of joint ids
self._joint_ids = [x_joint_id[0], y_joint_id[0], yaw_joint_id[0]]
self._joint_names = [x_joint_name[0], y_joint_name[0], yaw_joint_name[0]]
# log info for debugging
carb.log_info(
f"Resolved joint names for the action term {self.__class__.__name__}: {self._joint_names} [{self._joint_ids}]"
)
carb.log_info(
f"Resolved body name for the action term {self.__class__.__name__}: {self._body_name} [{self._body_idx}]"
)
# create tensors for raw and processed actions
self._raw_actions = torch.zeros(self.num_envs, 2, device=self.device)
self._processed_actions = torch.zeros_like(self.raw_actions)
self._joint_vel_command = torch.zeros(self.num_envs, 3, device=self.device)
# save the scale and offset as tensors
self._scale = torch.tensor(self.cfg.scale, device=self.device).unsqueeze(0)
self._offset = torch.tensor(self.cfg.offset, device=self.device).unsqueeze(0)
"""
Properties.
"""
@property
def action_dim(self) -> int:
return 2
@property
def raw_actions(self) -> torch.Tensor:
return self._raw_actions
@property
def processed_actions(self) -> torch.Tensor:
return self._processed_actions
"""
Operations.
"""
def process_actions(self, actions):
# store the raw actions
self._raw_actions[:] = actions
self._processed_actions = self.raw_actions * self._scale + self._offset
def apply_actions(self):
# obtain current heading
quat_w = self._asset.data.body_quat_w[:, self._body_idx]
yaw_w = euler_xyz_from_quat(quat_w)[2]
# compute joint velocities targets
self.joint_vel[:, 0] = torch.cos(yaw_w) * self.processed_actions[:, 0] # x
self.joint_vel[:, 1] = torch.sin(yaw_w) * self.processed_actions[:, 0] # y
self.joint_vel[:, 2] = self.processed_actions[:, 1] # yaw
# set the joint velocity targets
self._asset.set_joint_velocity_target(self.joint_vel, joint_ids=self._joint_ids)
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""This sub-module contains the common functions that can be used to create curriculum for the learning environment.
The functions can be passed to the :class:`omni.isaac.orbit.managers.CurriculumTermCfg` object to enable
the curriculum introduced by the function.
"""
from __future__ import annotations
import torch
from typing import TYPE_CHECKING, Sequence
from omni.isaac.orbit.assets import RigidObject
from omni.isaac.orbit.managers import SceneEntityCfg
from omni.isaac.orbit.terrains import TerrainImporter
if TYPE_CHECKING:
from omni.isaac.orbit.envs.rl_env import RLEnv
def terrain_levels_vel(env: RLEnv, env_ids: Sequence[int], asset_cfg: SceneEntityCfg) -> torch.Tensor:
"""Curriculum based on the distance the robot walked when commanded to move at a desired velocity.
This term is used to increase the difficulty of the terrain when the robot walks far enough and decrease the
difficulty when the robot walks less than half of the distance required by the commanded velocity.
.. note::
It is only possible to use this term with the terrain type ``generator``. For further information
on different terrain types, check the :class:`omni.isaac.orbit.terrains.TerrainImporter` class.
Returns:
torch.Tensor: The mean terrain level for the given environment ids.
"""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
terrain: TerrainImporter = env.scene.terrain
# compute the distance the robot walked
distance = torch.norm(asset.data.root_pos_w[env_ids, :2] - env.scene.env_origins[env_ids, :2], dim=1)
# robots that walked far enough progress to harder terrains
move_up = distance > terrain.cfg.terrain_generator.size[0] / 2
# robots that walked less than half of their required distance go to simpler terrains
move_down = distance < torch.norm(env.command_manager.command[env_ids, :2], dim=1) * env.max_episode_length_s * 0.5
move_down *= ~move_up
# update terrain levels
terrain.update_env_origins(env_ids, move_up, move_down)
# return the mean terrain level
return torch.mean(terrain.terrain_levels.float())
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""This sub-module contains the common functions that can be used to create observation terms.
The functions can be passed to the :class:`omni.isaac.orbit.managers.ObservationTermCfg` object to enable
the observation introduced by the function.
"""
from __future__ import annotations
import torch
from typing import TYPE_CHECKING
from omni.isaac.orbit.assets import Articulation, RigidObject
from omni.isaac.orbit.managers import SceneEntityCfg
from omni.isaac.orbit.sensors import RayCaster
if TYPE_CHECKING:
from omni.isaac.orbit.envs.base_env import BaseEnv
"""
Root state.
"""
def base_lin_vel(env: BaseEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
"""Root linear velocity in the asset's root frame."""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
return asset.data.root_lin_vel_b
def base_ang_vel(env: BaseEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
"""Root angular velocity in the asset's root frame."""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
return asset.data.root_ang_vel_b
def projected_gravity(env: BaseEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
"""Gravity projection on the asset's root frame."""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
return asset.data.projected_gravity_b
"""
Joint state.
"""
def joint_pos_rel(env: BaseEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
"""The joint positions of the asset w.r.t. the default joint positions."""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return asset.data.joint_pos - asset.data.default_joint_pos
def joint_vel_rel(env: BaseEnv, asset_cfg: SceneEntityCfg):
"""The joint velocities of the asset w.r.t. the default joint velocities."""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return asset.data.joint_vel - asset.data.default_joint_vel
"""
Sensors.
"""
def height_scan(env: BaseEnv, asset_cfg: SceneEntityCfg, sensor_cfg: SceneEntityCfg) -> torch.Tensor:
"""Height scan from the given sensor w.r.t. the asset's root frame."""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
sensor: RayCaster = env.scene.sensors[sensor_cfg.name]
# TODO (@dhoeller): is this sensor specific or we can generalize it?
hit_points_z = torch.nan_to_num(sensor.data.ray_hits_w[..., 2], posinf=-1.0)
# compute the height scan: robot_z - ground_z - offset
heights = asset.data.root_state_w[:, 2].unsqueeze(1) - hit_points_z - 0.5
# return the height scan
return heights
"""
Actions.
"""
def action(env: BaseEnv) -> torch.Tensor:
"""The last input action to the environment."""
return env.action_manager.action
"""
Commands.
"""
def generated_commands(env: BaseEnv) -> torch.Tensor:
"""The generated command from the command generator."""
return env.command_manager.command
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""This sub-module contains the common functions that can be used to enable reward functions.
The functions can be passed to the :class:`omni.isaac.orbit.managers.RewardTermCfg` object to include
the reward introduced by the function.
"""
from __future__ import annotations
import torch
from typing import TYPE_CHECKING
from omni.isaac.orbit.assets import Articulation, RigidObject
from omni.isaac.orbit.managers import SceneEntityCfg
from omni.isaac.orbit.sensors import ContactSensor
if TYPE_CHECKING:
from omni.isaac.orbit.envs.rl_env import RLEnv
"""
Root penalties.
"""
def lin_vel_z_l2(env: RLEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
"""Penalize z-axis base linear velocity using L2-kernel."""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
return torch.square(asset.data.root_lin_vel_b[:, 2])
def ang_vel_xy_l2(env: RLEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
"""Penalize xy-axis base angular velocity using L2-kernel."""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
return torch.sum(torch.square(asset.data.root_ang_vel_b[:, :2]), dim=1)
def flat_orientation_l2(env: RLEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
"""Penalize non-flat base orientation using L2-kernel.
This is computed by penalizing the xy-components of the projected gravity vector.
"""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
return torch.sum(torch.square(asset.data.projected_gravity_b[:, :2]), dim=1)
def base_height_l2(env: RLEnv, asset_cfg: SceneEntityCfg, target_height: float) -> torch.Tensor:
"""Penalize asset height from its target using L2-kernel.
Note:
Currently, it assumes a flat terrain, i.e. the target height is in the world frame.
"""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
# TODO: Fix this for rough-terrain.
return torch.square(asset.data.root_pos_w[:, 2] - target_height)
"""
Joint penalties.
"""
def joint_torques_l2(env: RLEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
"""Penalize torques applied on the articulation using L2-kernel."""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return torch.sum(torch.square(asset.data.applied_torque), dim=1)
def joint_vel_l2(env: RLEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
"""Penalize joint velocities on the articulation."""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return torch.sum(torch.square(asset.data.joint_vel), dim=1)
def joint_acc_l2(env: RLEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
"""Penalize joint accelerations on the articulation using L2-kernel."""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return torch.sum(torch.square(asset.data.joint_acc), dim=1)
def joint_pos_limits(env: RLEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
"""Penalize joint positions if they cross the soft limits.
This is computed as a sum of the absolute value of the difference between the joint position and the soft limits.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# compute out of limits constraints
out_of_limits = -(asset.data.joint_pos - asset.data.soft_joint_pos_limits[..., 0]).clip(max=0.0)
out_of_limits += (asset.data.joint_pos - asset.data.soft_joint_pos_limits[..., 1]).clip(min=0.0)
return torch.sum(out_of_limits, dim=1)
def joint_vel_limits(env: RLEnv, asset_cfg: SceneEntityCfg, soft_ratio: float) -> torch.Tensor:
"""Penalize joint velocities if they cross the soft limits.
This is computed as a sum of the absolute value of the difference between the joint velocity and the soft limits.
Args:
soft_ratio (float) -> torch.Tensor: The ratio of the soft limits to be used.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# compute out of limits constraints
out_of_limits = torch.abs(asset.data.joint_vel) - asset.data.soft_joint_vel_limits * soft_ratio
# clip to max error = 1 rad/s per joint to avoid huge penalties
out_of_limits = out_of_limits.clip_(min=0.0, max=1.0)
return torch.sum(out_of_limits, dim=1)
"""
Action penalties.
"""
def applied_torque_limits(env: RLEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
"""Penalize applied torques if they cross the limits.
This is computed as a sum of the absolute value of the difference between the applied torques and the limits.
.. caution::
Currently, this only works for explicit actuators since we manually compute the applied torques.
For implicit actuators, we currently cannot retrieve the applied torques from the physics engine.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# compute out of limits constraints
# TODO: We need to fix this to support implicit joints.
out_of_limits = torch.abs(asset.data.applied_torque - asset.data.computed_torque)
return torch.sum(out_of_limits, dim=1)
def action_rate_l2(env: RLEnv) -> torch.Tensor:
"""Penalize the rate of change of the actions using L2-kernel."""
return torch.sum(torch.square(env.action_manager.action - env.action_manager.prev_action), dim=1)
"""
Contact sensor.
"""
def undesired_contacts(env: RLEnv, sensor_cfg: SceneEntityCfg, threshold: float) -> torch.Tensor:
"""Penalize undesired contacts as the number of violations that are above a threshold."""
# extract the used quantities (to enable type-hinting)
contact_sensor: ContactSensor = env.scene.sensors[sensor_cfg.name]
# check if contact force is above threshold
net_contact_forces = contact_sensor.data.net_forces_w_history
is_contact = torch.max(torch.norm(net_contact_forces[:, :, sensor_cfg.body_ids], dim=-1), dim=1)[0] > threshold
# sum over contacts for each environment
return torch.sum(is_contact, dim=1)
def contact_forces(env: RLEnv, sensor_cfg: SceneEntityCfg, threshold: float) -> torch.Tensor:
"""Penalize contact forces as the amount of violations of the net contact force."""
# extract the used quantities (to enable type-hinting)
contact_sensor: ContactSensor = env.scene.sensors[sensor_cfg.name]
net_contact_forces = contact_sensor.data.net_forces_w_history
# compute the violation
violation = torch.max(torch.norm(net_contact_forces[:, :, sensor_cfg.body_ids], dim=-1), dim=1)[0] - threshold
# compute the penalty
return torch.sum(violation.clip(min=0.0), dim=1)
"""
Velocity-tracking rewards.
"""
def track_lin_vel_xy_exp(env: RLEnv, asset_cfg: SceneEntityCfg, std: float) -> torch.Tensor:
"""Reward tracking of linear velocity commands (xy axes) using exponential kernel."""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
# compute the error
lin_vel_error = torch.sum(
torch.square(env.command_manager.command[:, :2] - asset.data.root_lin_vel_b[:, :2]), dim=1
)
return torch.exp(-lin_vel_error / std**2)
def track_ang_vel_z_exp(env: RLEnv, asset_cfg: SceneEntityCfg, std: float) -> torch.Tensor:
"""Reward tracking of angular velocity commands (yaw) using exponential kernel."""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
# compute the error
ang_vel_error = torch.square(env.command_manager.command[:, 2] - asset.data.root_ang_vel_b[:, 2])
return torch.exp(-ang_vel_error / std**2)
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""This sub-module contains the common functions that can be used to activate certain terminations.
The functions can be passed to the :class:`omni.isaac.orbit.managers.TerminationTermCfg` object to enable
the termination introduced by the function.
"""
from __future__ import annotations
import torch
from typing import TYPE_CHECKING
from omni.isaac.orbit.assets import Articulation, RigidObject
from omni.isaac.orbit.managers import SceneEntityCfg
from omni.isaac.orbit.sensors import ContactSensor
if TYPE_CHECKING:
from omni.isaac.orbit.envs.rl_env import RLEnv
"""
MDP terminations.
"""
def time_out(env: RLEnv) -> torch.Tensor:
"""Terminate the episode when the episode length exceeds the maximum episode length."""
return env.episode_length_buf >= env.max_episode_length
def command_resample(env: RLEnv, num_commands: torch.Tensor) -> torch.Tensor:
return torch.logical_and((env.command_manager.time_left <= 0.0), (env.command_manager.num_commands == num_commands))
"""
Root terminations.
"""
def bad_orientation(env: RLEnv, asset_cfg: SceneEntityCfg, limit_angle: float) -> torch.Tensor:
"""Terminate when the asset's orientation is too far from the desired orientation limits.
This is computed by checking the angle between the projected gravity vector and the z-axis.
"""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
return torch.acos(-asset.data.projected_gravity_b[:, 2]).abs() > limit_angle
def base_height(env: RLEnv, asset_cfg: SceneEntityCfg, minimum_height: float) -> torch.Tensor:
"""Terminate when the asset's height is below the minimum height.
Note:
This is currently only supported for flat terrains, i.e. the minimum height is in the world frame.
"""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
return asset.data.root_pos_w[:, 2] < minimum_height
"""
Joint terminations.
"""
def joint_pos_limit(env: RLEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
"""Terminate when the asset's joint positions are outside of the soft joint limits."""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# compute any violations
out_of_upper_limits = torch.any(asset.data.joint_pos > asset.data.soft_joint_pos_limits[:, 0], dim=1)
out_of_lower_limits = torch.any(asset.data.joint_pos < asset.data.soft_joint_pos_limits[:, 1], dim=1)
return torch.logical_or(out_of_upper_limits, out_of_lower_limits)
def joint_velocity_limit(env: RLEnv, asset_cfg: SceneEntityCfg, max_velocity) -> torch.Tensor:
"""Terminate when the asset's joint velocities are outside of the soft joint limits."""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# TODO read max velocities per joint from robot
return torch.any(torch.abs(asset.data.joint_vel) > max_velocity, dim=1)
def joint_torque_limit(env: RLEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
"""Terminate when torque applied on the asset's joints are are outside of the soft joint limits."""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return torch.any(
torch.isclose(asset.data.computed_torques, asset.data.applied_torque),
dim=1,
)
"""
Contact sensor.
"""
def illegal_contact(env: RLEnv, sensor_cfg: SceneEntityCfg, threshold: float) -> torch.Tensor:
"""Terminate when the contact force on the sensor exceeds the force threshold."""
# extract the used quantities (to enable type-hinting)
contact_sensor: ContactSensor = env.scene.sensors[sensor_cfg.name]
net_contact_forces = contact_sensor.data.net_forces_w_history
# check if any contact force exceeds the threshold
return torch.any(
torch.max(torch.norm(net_contact_forces[:, :, sensor_cfg.body_ids], dim=-1), dim=1)[0] > threshold, dim=1
)
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from dataclasses import MISSING
from omni.isaac.orbit.utils import configclass
from .base_env_cfg import BaseEnvCfg
@configclass
class RLEnvCfg(BaseEnvCfg):
"""Configuration for a reinforcement learning environment."""
# general settings
episode_length_s: float = MISSING
"""Duration of an episode (in seconds)."""
# environment settings
rewards: object = MISSING
"""Reward settings."""
terminations: object = MISSING
"""Termination settings."""
randomization: object = MISSING
"""Randomization settings."""
curriculum: object = MISSING
"""Curriculum settings."""
......@@ -35,13 +35,16 @@ Example pseudo-code for a manager:
from .action_manager import ActionManager, ActionTerm
from .curriculum_manager import CurriculumManager
from .manager_base import ManagerBase
from .manager_cfg import (
ActionTermCfg,
CurriculumTermCfg,
ManagerBaseTermCfg,
ObservationGroupCfg,
ObservationTermCfg,
RandomizationTermCfg,
RewardTermCfg,
SceneEntityCfg,
TerminationTermCfg,
)
from .observation_manager import ObservationManager
......@@ -50,6 +53,10 @@ from .reward_manager import RewardManager
from .termination_manager import TerminationManager
__all__ = [
# base
"SceneEntityCfg",
"ManagerBaseTermCfg",
"ManagerBase",
# action
"ActionTermCfg",
"ActionTerm",
......
......@@ -11,31 +11,42 @@ from __future__ import annotations
import torch
from abc import ABC, abstractmethod
from prettytable import PrettyTable
from typing import TYPE_CHECKING, Sequence
from omni.isaac.orbit.assets import AssetBase
from .manager_base import ManagerBase
from .manager_cfg import ActionTermCfg
if TYPE_CHECKING:
from omni.isaac.orbit.envs import BaseEnv
class ActionTerm(ABC):
"""Base class for action terms."""
"""Base class for action terms.
The action term is responsible for processing the raw actions sent to the environment
and applying them to the asset managed by the term. The action term is comprised of two
operations:
# TODO: Should this be here or a property?
# Are they even exposed to the user?
raw_actions: torch.Tensor
processed_actions: torch.Tensor
* Processing of actions: This operation is performed once per **environment step** and
is responsible for pre-processing the raw actions sent to the environment.
* Applying actions: This operation is performed once per **simulation step** and is
responsible for applying the processed actions to the asset managed by the term.
"""
def __init__(self, cfg: ActionTermCfg, env: object):
def __init__(self, cfg: ActionTermCfg, env: BaseEnv):
"""Initialize the action term.
Args:
cfg (ActionTermCfg): The configuration object.
env (object): The environment instance.
env (BaseEnv): The environment instance.
"""
# store the inputs
self._cfg = cfg
self.cfg = cfg
self._env = env
# parse config to obtain asset to which the term is applied
self._asset = getattr(env, cfg.asset_name)
self._asset: AssetBase = self._env.scene[self.cfg.asset_name]
"""
Properties.
......@@ -49,7 +60,7 @@ class ActionTerm(ABC):
@property
def device(self) -> str:
"""Device on which to perform computations."""
return self._env.device
return self._asset.device
@property
@abstractmethod
......@@ -57,10 +68,23 @@ class ActionTerm(ABC):
"""Dimension of the action term."""
raise NotImplementedError
@property
@abstractmethod
def raw_actions(self) -> torch.Tensor:
"""The input/raw actions sent to the term."""
raise NotImplementedError
@property
@abstractmethod
def processed_actions(self) -> torch.Tensor:
"""The actions computed by the term after applying any processing."""
raise NotImplementedError
"""
Operations.
"""
@abstractmethod
def process_actions(self, actions: torch.Tensor):
"""Processes the actions sent to the environment.
......@@ -70,9 +94,7 @@ class ActionTerm(ABC):
Args:
actions (torch.Tensor): The actions to process.
"""
# TODO: Why not make this an abstract method?
# This one line can be implemented in the child class.
self.raw_actions[:] = actions
raise NotImplementedError
@abstractmethod
def apply_actions(self):
......@@ -99,14 +121,17 @@ class ActionManager(ManagerBase):
scene (such as robots). It should be called before every simulation step.
"""
def __init__(self, cfg: object, env: object):
def __init__(self, cfg: object, env: BaseEnv):
"""Initialize the action manager.
Args:
cfg (object): The configuration object or dictionary (``dict[str, ActionTermCfg]``).
env (object): The environment instance.
env (BaseEnv): The environment instance.
"""
super().__init__(cfg, env)
# create buffers to store actions
self._action = torch.zeros((self.num_envs, self.total_action_dim), device=self.device)
self._prev_action = torch.zeros_like(self._action)
def __str__(self) -> str:
"""Returns: A string representation for action manager."""
......@@ -146,30 +171,62 @@ class ActionManager(ManagerBase):
"""Shape of each action term."""
return [term.action_dim for term in self._terms]
@property
def action(self) -> torch.Tensor:
"""The actions sent to the environment. Shape: ``(num_envs, total_action_dim)``."""
return self._action
@property
def prev_action(self) -> torch.Tensor:
"""The previous actions sent to the environment. Shape: ``(num_envs, total_action_dim)``."""
return self._prev_action
"""
Operations.
"""
def process_actions(self, actions: torch.Tensor):
def reset(self, env_ids: Sequence[int] | None = None) -> dict[str, torch.Tensor]:
"""Resets the action history.
Args:
env_ids (Optional[Sequence[int]], optional): The environment ids. Defaults to None, in which case
all environments are considered.
Returns:
Dict[str, torch.Tensor]: An empty dictionary.
"""
# resolve environment ids
if env_ids is None:
env_ids = ...
# reset the action history
self._prev_action[env_ids] = 0.0
self._action[env_ids] = 0.0
# reset the terms
return {}
def process_action(self, action: torch.Tensor):
"""Processes the actions sent to the environment.
Note:
This function should be called once per environment step.
Args:
actions (torch.Tensor): The actions to process.
action (torch.Tensor): The actions to process.
"""
# check if action dimension is valid
if self.total_action_dim != actions.shape[1]:
raise ValueError(f"Invalid action shape, expected: {self.total_action_dim}, received: {actions.shape[1]}")
if self.total_action_dim != action.shape[1]:
raise ValueError(f"Invalid action shape, expected: {self.total_action_dim}, received: {action.shape[1]}")
# store the input actions
self._prev_action[:] = self._action
self._action[:] = action.to(self.device)
# split the actions and apply to each tensor
idx = 0
for term in self._terms:
term_actions = actions[:, idx : idx + term.action_dim]
term_actions = action[:, idx : idx + term.action_dim]
term.process_actions(term_actions)
def apply_actions(self) -> None:
def apply_action(self) -> None:
"""Applies the actions to the environment/simulation.
Note:
......
......@@ -6,13 +6,18 @@
"""Curriculum manager for updating environment quantities subject to a training curriculum."""
from __future__ import annotations
import torch
from prettytable import PrettyTable
from typing import Dict, List, Optional, Sequence
from typing import TYPE_CHECKING, Sequence
from .manager_base import ManagerBase
from .manager_cfg import CurriculumTermCfg
if TYPE_CHECKING:
from omni.isaac.orbit.envs import RLEnv
class CurriculumManager(ManagerBase):
"""Manager to implement and execute specific curricula.
......@@ -25,12 +30,15 @@ class CurriculumManager(ManagerBase):
parameters. Each curriculum term should instantiate the :class:`CurriculumTermCfg` class.
"""
def __init__(self, cfg: object, env: object):
_env: RLEnv
"""The environment instance."""
def __init__(self, cfg: object, env: RLEnv):
"""Initialize the manager.
Args:
cfg (object): The configuration object or dictionary (``dict[str, CurriculumTermCfg]``)
env (object): An environment object.
env (RLEnv): An environment object.
Raises:
TypeError: If curriculum term is not of type :class:`CurriculumTermCfg`.
......@@ -65,7 +73,7 @@ class CurriculumManager(ManagerBase):
"""
@property
def active_terms(self) -> List[str]:
def active_terms(self) -> list[str]:
"""Name of active curriculum terms."""
return self._term_names
......@@ -73,7 +81,7 @@ class CurriculumManager(ManagerBase):
Operations.
"""
def log_info(self, env_ids: Optional[Sequence[int]] = None) -> Dict[str, float]:
def reset(self, env_ids: Sequence[int] | None = None) -> dict[str, float]:
"""Returns the current state of individual curriculum terms.
Note:
......@@ -82,7 +90,7 @@ class CurriculumManager(ManagerBase):
to maintain consistency with other classes.
Returns:
Dict[str, float]: Dictionary of curriculum terms and their states.
dict[str, float]: Dictionary of curriculum terms and their states.
"""
extras = {}
for term_name, term_state in self._curriculum_state.items():
......@@ -101,7 +109,7 @@ class CurriculumManager(ManagerBase):
extras[f"Curriculum/{term_name}"] = term_state
return extras
def compute(self, env_ids: Optional[Sequence[int]] = None):
def compute(self, env_ids: Sequence[int] | None = None):
"""Update the curriculum terms.
This function calls each curriculum term managed by the class.
......@@ -124,8 +132,8 @@ class CurriculumManager(ManagerBase):
def _prepare_terms(self):
# parse remaining curriculum terms and decimate their information
self._term_names: List[str] = list()
self._term_cfgs: List[CurriculumTermCfg] = list()
self._term_names: list[str] = list()
self._term_cfgs: list[CurriculumTermCfg] = list()
# check if config is dict already
if isinstance(self.cfg, dict):
......
......@@ -3,26 +3,32 @@
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import copy
import inspect
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Sequence, Union
from typing import TYPE_CHECKING, Sequence
import carb
from omni.isaac.orbit.utils import string_to_callable
from .manager_cfg import ManagerBaseTermCfg
from .manager_cfg import ManagerBaseTermCfg, SceneEntityCfg
if TYPE_CHECKING:
from omni.isaac.orbit.envs import BaseEnv
class ManagerBase(ABC):
"""Base class for all managers."""
def __init__(self, cfg: object, env: object):
def __init__(self, cfg: object, env: BaseEnv):
"""Initialize the manager.
Args:
cfg (object): The configuration object.
env (object): The environment instance.
env (BaseEnv): The environment instance.
"""
# store the inputs
self.cfg = copy.deepcopy(cfg)
......@@ -46,7 +52,7 @@ class ManagerBase(ABC):
@property
@abstractmethod
def active_terms(self) -> Union[List[str], Dict[str, List[str]]]:
def active_terms(self) -> list[str] | dict[str, list[str]]:
"""Name of active terms."""
raise NotImplementedError
......@@ -54,15 +60,15 @@ class ManagerBase(ABC):
Operations.
"""
def log_info(self, env_ids: Optional[Sequence[int]] = None) -> Dict[str, float]:
"""Returns logging information for the current time-step.
def reset(self, env_ids: Sequence[int] | None = None) -> dict[str, float]:
"""Resets the manager and returns logging information for the current time-step.
Args:
env_ids (Sequence[int], optional): The environment ids for which to log data. Defaults
:obj:`None`, which logs data for all environments.
Returns:
Dict[str, float]: Dictionary containing the logging information.
dict[str, float]: Dictionary containing the logging information.
"""
return {}
......@@ -99,6 +105,7 @@ class ManagerBase(ABC):
Raises:
TypeError: If the term configuration is not of type :class:`ManagerBaseTermCfg`.
ValueError: If the scene entity defined in the term configuration does not exist.
AttributeError: If the term function is not callable.
ValueError: If the term function's arguments are not matched by the parameters.
"""
......@@ -107,29 +114,44 @@ class ManagerBase(ABC):
raise TypeError(
f"Configuration for the term '{term_name}' is not of type ManagerBaseTermCfg. Received '{type(term_cfg)}'."
)
# check if a sensor should be enabled
if term_cfg.sensor_name is not None:
# TODO: This is a hack. We should not be enabling sensors here.
# Instead, we should be enabling sensors in the sensor manager or somewhere else.
self._env.enable_sensor(term_cfg.sensor_name)
term_cfg.params["sensor_name"] = term_cfg.sensor_name
# convert joint names to indices based on regex
# TODO: What is user wants to penalize joints on one asset and bodies on another?
if term_cfg.dof_names is not None:
# check that the asset name is provided
if term_cfg.asset_name is None:
raise ValueError(f"The term '{term_name}' requires the asset name to be provided.")
# acquire the dof indices
dof_ids, _ = getattr(self._env, term_cfg.asset_name).find_dofs(term_cfg.dof_names)
term_cfg.params["dof_ids"] = dof_ids
# convert body names to indices based on regex
if term_cfg.body_names is not None:
# check that the asset name is provided
if term_cfg.asset_name is None:
raise ValueError(f"The term '{term_name}' requires the asset name to be provided.")
# acquire the body indices
body_ids, _ = getattr(self._env, term_cfg.asset_name).find_bodies(term_cfg.body_names)
term_cfg.params["body_ids"] = body_ids
# iterate over all the entities and parse the joint and body names
for key, value in term_cfg.params.items():
# deal with string
if isinstance(value, SceneEntityCfg):
# check if the entity is valid
if value.name not in self._env.scene.keys():
raise ValueError(f"For the term '{term_name}', the scene entity '{value.name}' does not exist.")
# convert joint names to indices based on regex
if value.joint_names is not None and value.joint_ids is not None:
raise ValueError(
f"For the term '{term_name}', both 'joint_names' and 'joint_ids' are specified in '{key}'."
)
if value.joint_names is not None:
if isinstance(value.joint_names, str):
value.joint_names = [value.joint_names]
joint_ids, _ = self._env.scene[value.name].find_joints(value.joint_names)
value.joint_ids = joint_ids
# convert body names to indices based on regex
if value.body_names is not None and value.body_ids is not None:
raise ValueError(
f"For the term '{term_name}', both 'body_names' and 'body_ids' are specified in '{key}'."
)
if value.body_names is not None:
if isinstance(value.body_names, str):
value.body_names = [value.body_names]
body_ids, _ = self._env.scene[value.name].find_bodies(value.body_names)
value.body_ids = body_ids
# log the entity for checking later
msg = f"[{term_cfg.__class__.__name__}:{term_name}] Found entity '{value.name}'."
if value.joint_ids is not None:
msg += f"\n\tJoint names: {value.joint_names} [{value.joint_ids}]"
if value.body_ids is not None:
msg += f"\n\tBody names: {value.body_names} [{value.body_ids}]"
# print the information
carb.log_info(msg)
# store the entity
term_cfg.params[key] = value
# get the corresponding function or functional class
if isinstance(term_cfg.func, str):
term_cfg.func = string_to_callable(term_cfg.func)
......@@ -141,7 +163,7 @@ class ManagerBase(ABC):
# check if function is callable
if not callable(term_cfg.func):
raise AttributeError(f"The term '{term_name}' is not callable. Received: {term_cfg.func}")
# check if curriculum term's arguments are matched by params
# check if term's arguments are matched by params
term_params = list(term_cfg.params.keys())
args = inspect.getfullargspec(term_cfg.func).args
# ignore first two arguments for env and env_ids
......
......@@ -10,7 +10,7 @@ from __future__ import annotations
import torch
from dataclasses import MISSING
from typing import TYPE_CHECKING, Any, Callable, Sequence
from typing import TYPE_CHECKING, Any, Callable
from omni.isaac.orbit.utils import configclass
from omni.isaac.orbit.utils.noise import NoiseCfg
......@@ -20,38 +20,37 @@ if TYPE_CHECKING:
@configclass
class ManagerBaseTermCfg:
"""Configuration for a curriculum term."""
func: Callable = MISSING
"""The function to be called for the term.
class SceneEntityCfg:
"""Configuration for a scene entity that is used by the manager's term.
The function must take the environment object as the first argument.
Note:
It also supports `callable classes`_, i.e. classes that implement the :meth:`__call__`
method.
This class is used to specify the name of the scene entity that is queried from the
:class:`InteractiveScene` and passed to the manager's term function.
"""
..`callable objects`: https://docs.python.org/3/reference/datamodel.html#object.__call__
name: str = MISSING
"""The name of the scene entity.
This is the name defined in the scene configuration file. See the :class:`InteractiveSceneCfg`
class for more details.
"""
sensor_name: str | None = None
"""The name of the sensor required by the term. Defaults to None.
If the sensor is not already enabled, it will be enabled in the environment on initialization
of the manager and passed to the term function as a string under :attr:`sensor_name`.
"""
asset_name: str | None = None
"""The name of the asset used to resolve the joints and bodies required by the term. Defaults to None."""
dof_names: Sequence[str] | None = None
"""The names of the joints from the asset required by the term. Defaults to None.
joint_names: str | list[str] | None = None
"""The names of the joints from the scene entity. Defaults to None.
The names can be either joint names or a regular expression matching the joint names.
These are converted to joint indices on initialization of the manager and passed to the term
function as a list of joint indices under :attr:`dof_ids`.
"""
body_names: Sequence[str] | None = None
joint_ids: list[int] | None = None
"""The indices of the joints from the asset required by the term. Defaults to None.
If ``joint_names`` is specified, this is filled in automatically on initialization of the
manager.
"""
body_names: str | list[str] | None = None
"""The names of the bodies from the asset required by the term. Defaults to None.
The names can be either body names or a regular expression matching the body names.
......@@ -59,11 +58,45 @@ class ManagerBaseTermCfg:
These are converted to body indices on initialization of the manager and passed to the term
function as a list of body indices under :attr:`body_ids`.
"""
params: dict[str, Any] = dict()
"""The parameters to be passed to the function as keyword arguments. Defaults to an empty dict."""
body_ids: list[int] | None = None
"""The indices of the bodies from the asset required by the term. Defaults to None.
If ``body_names`` is specified, this is filled in automatically on initialization of the
manager.
"""
@configclass
class ManagerBaseTermCfg:
"""Configuration for a manager term."""
func: Callable = MISSING
"""The function to be called for the term.
The function must take the environment object as the first argument.
Note:
It also supports `callable classes`_, i.e. classes that implement the :meth:`__call__`
method.
.. _`callable classes`: https://docs.python.org/3/reference/datamodel.html#object.__call__
"""
params: dict[str, Any | SceneEntityCfg] = dict()
"""The parameters to be passed to the function as keyword arguments. Defaults to an empty dict.
.. note::
If the value is a :class:`SceneEntityCfg` object, the manager will query the scene entity
from the :class:`InteractiveScene` and process the entity's joints and bodies as specified
in the :class:`SceneEntityCfg` object.
"""
"""Action manager."""
##
# Action manager.
##
@configclass
......@@ -77,7 +110,9 @@ class ActionTermCfg:
"""Name of the asset (object or robot) on which action is applied."""
"""Curriculum manager."""
##
# Curriculum manager.
##
@configclass
......@@ -93,7 +128,9 @@ class CurriculumTermCfg(ManagerBaseTermCfg):
"""
"""Observation manager."""
##
# Observation manager.
##
@configclass
......@@ -139,7 +176,9 @@ class ObservationGroupCfg:
"""
"""Randomization manager."""
##
# Randomization manager
##
@configclass
......@@ -173,7 +212,9 @@ class RandomizationTermCfg(ManagerBaseTermCfg):
"""
"""Reward manager."""
##
# Reward manager.
##
@configclass
......@@ -199,7 +240,9 @@ class RewardTermCfg(ManagerBaseTermCfg):
"""
"""Termination manager."""
##
# Termination manager.
##
@configclass
......
......@@ -6,13 +6,18 @@
"""Observation manager for computing observation signals for a given world."""
from __future__ import annotations
import torch
from prettytable import PrettyTable
from typing import Dict, List, Tuple
from typing import TYPE_CHECKING
from .manager_base import ManagerBase
from .manager_cfg import ObservationGroupCfg, ObservationTermCfg
if TYPE_CHECKING:
from omni.isaac.orbit.envs import BaseEnv
class ObservationManager(ManagerBase):
"""Manager for computing observation signals for a given world.
......@@ -26,16 +31,16 @@ class ObservationManager(ManagerBase):
observation term should instantiate the :class:`ObservationTermCfg` class.
"""
def __init__(self, cfg: object, env: object):
def __init__(self, cfg: object, env: BaseEnv):
"""Initialize observation manager.
Args:
cfg (object): The configuration object or dictionary (``dict[str, ObservationGroupCfg]``).
env (object): The environment instance.
env (BaseEnv): The environment instance.
"""
super().__init__(cfg, env)
# compute combined vector for obs group
self._group_obs_dim: Dict[str, Tuple[int, ...]] = dict()
self._group_obs_dim: dict[str, tuple[int, ...]] = dict()
for group_name, group_term_dims in self._group_obs_term_dim.items():
term_dims = [torch.tensor(dims, device="cpu") for dims in group_term_dims]
self._group_obs_dim[group_name] = tuple(torch.sum(torch.stack(term_dims, dim=0), dim=0).tolist())
......@@ -73,17 +78,17 @@ class ObservationManager(ManagerBase):
"""
@property
def active_terms(self) -> Dict[str, List[str]]:
def active_terms(self) -> dict[str, list[str]]:
"""Name of active observation terms in each group."""
return self._group_obs_term_names
@property
def group_obs_dim(self) -> Dict[str, Tuple[int, ...]]:
def group_obs_dim(self) -> dict[str, tuple[int, ...]]:
"""Shape of observation tensor in each group."""
return self._group_obs_dim
@property
def group_obs_term_dim(self) -> Dict[str, List[Tuple[int, ...]]]:
def group_obs_term_dim(self) -> dict[str, list[tuple[int, ...]]]:
"""Shape of observation tensor for each term in each group."""
return self._group_obs_term_dim
......@@ -91,7 +96,7 @@ class ObservationManager(ManagerBase):
Operations.
"""
def compute(self) -> Dict[str, torch.Tensor]:
def compute(self) -> dict[str, torch.Tensor]:
"""Compute the observations per group.
The method computes the observations for each group and returns a dictionary with keys as
......@@ -148,10 +153,10 @@ class ObservationManager(ManagerBase):
"""Prepares a list of observation terms functions."""
# create buffers to store information for each observation group
# TODO: Make this more convenient by using data structures.
self._group_obs_term_names: Dict[str, List[str]] = dict()
self._group_obs_term_dim: Dict[str, List[int]] = dict()
self._group_obs_term_cfgs: Dict[str, List[ObservationTermCfg]] = dict()
self._group_obs_concatenate: Dict[str, bool] = dict()
self._group_obs_term_names: dict[str, list[str]] = dict()
self._group_obs_term_dim: dict[str, list[int]] = dict()
self._group_obs_term_cfgs: dict[str, list[ObservationTermCfg]] = dict()
self._group_obs_concatenate: dict[str, bool] = dict()
# check if config is dict already
if isinstance(self.cfg, dict):
......
......@@ -5,15 +5,20 @@
"""Randomization manager for randomizing different elements in the scene."""
from __future__ import annotations
import logging
import torch
from prettytable import PrettyTable
from typing import Dict, List, Optional, Sequence
from typing import TYPE_CHECKING, Sequence
import carb
from .manager_base import ManagerBase
from .manager_cfg import RandomizationTermCfg
if TYPE_CHECKING:
from omni.isaac.orbit.envs import RLEnv
class RandomizationManager(ManagerBase):
"""Manager for randomizing different elements in the scene.
......@@ -44,12 +49,15 @@ class RandomizationManager(ManagerBase):
"""
def __init__(self, cfg: object, env: object):
_env: RLEnv
"""The environment instance."""
def __init__(self, cfg: object, env: RLEnv):
"""Initialize the randomization manager.
Args:
cfg (object): A configuration object or dictionary (``dict[str, RandomizationTermCfg]``).
env (object): An environment object.
env (RLEnv): An environment object.
"""
super().__init__(cfg, env)
......@@ -84,12 +92,7 @@ class RandomizationManager(ManagerBase):
"""
@property
def dt(self) -> float:
"""The environment time-step (in seconds)."""
return self._env.dt
@property
def active_terms(self) -> Dict[str, List[str]]:
def active_terms(self) -> dict[str, list[str]]:
"""Name of active randomization terms."""
return self._mode_term_names
......@@ -97,7 +100,7 @@ class RandomizationManager(ManagerBase):
Operations.
"""
def randomize(self, mode: str, env_ids: Optional[Sequence[int]] = None, dt: Optional[float] = None):
def randomize(self, mode: str, env_ids: Sequence[int] | None = None, dt: float | None = None):
"""Calls each randomization term in the specified mode.
Note:
......@@ -108,19 +111,24 @@ class RandomizationManager(ManagerBase):
mode (str): The mode of randomization.
env_ids (Optional[Sequence[int]]): The indices of the environments to apply randomization to.
Defaults to None, in which case the randomization is applied to all environments.
dt (Optional[float]): The time step of the environment. Defaults to None, in which case the time
step of the environment is used.
dt (Optional[float], optional): The time step of the environment. This is only used for the "interval" mode.
Defaults to None, in which case the randomization is not applied.
Raises:
ValueError: If the mode is ``"interval"`` and the time step is not provided.
"""
# check if mode is valid
if mode not in self._mode_term_names:
logging.warning(f"Randomization mode '{mode}' is not defined. Skipping randomization.")
carb.log_warn(f"Randomization mode '{mode}' is not defined. Skipping randomization.")
return
# iterate over all the randomization terms
for index, term_cfg in enumerate(self._mode_term_cfgs[mode]):
# resample interval if needed
if mode == "interval":
if dt is None:
dt = self.dt
raise ValueError(
f"Randomization mode '{mode}' requires the time step of the environment to be passed to the randomization manager."
)
# extract time left for this term
time_left = self._interval_mode_time_left[index]
# update the time left for each environment
......@@ -140,10 +148,10 @@ class RandomizationManager(ManagerBase):
def _prepare_terms(self):
"""Prepares a list of randomization functions."""
# parse remaining randomization terms and decimate their information
self._mode_term_names: Dict[str, List[str]] = dict()
self._mode_term_cfgs: Dict[str, List[RandomizationTermCfg]] = dict()
self._mode_term_names: dict[str, list[str]] = dict()
self._mode_term_cfgs: dict[str, list[RandomizationTermCfg]] = dict()
# buffer to store the time left for each environment for "interval" mode
self._interval_mode_time_left: List[torch.Tensor] = list()
self._interval_mode_time_left: list[torch.Tensor] = list()
# check if config is dict already
if isinstance(self.cfg, dict):
......
......@@ -6,13 +6,18 @@
"""Reward manager for computing reward signals for a given world."""
from __future__ import annotations
import torch
from prettytable import PrettyTable
from typing import Dict, List, Optional, Sequence
from typing import TYPE_CHECKING, Sequence
from .manager_base import ManagerBase
from .manager_cfg import RewardTermCfg
if TYPE_CHECKING:
from omni.isaac.orbit.envs import RLEnv
class RewardManager(ManagerBase):
"""Manager for computing reward signals for a given world.
......@@ -32,12 +37,15 @@ class RewardManager(ManagerBase):
"""
def __init__(self, cfg: object, env: object):
_env: RLEnv
"""The environment instance."""
def __init__(self, cfg: object, env: RLEnv):
"""Initialize the reward manager.
Args:
cfg (object): The configuration object or dictionary (``dict[str, RewardTermCfg]``).
env (object): The environment instance.
env (RLEnv): The environment instance.
"""
super().__init__(cfg, env)
# prepare extra info to store individual reward term information
......@@ -71,12 +79,7 @@ class RewardManager(ManagerBase):
"""
@property
def dt(self) -> float:
"""The environment time-step (in seconds)."""
return self._env.dt
@property
def active_terms(self) -> List[str]:
def active_terms(self) -> list[str]:
"""Name of active reward terms."""
return self._term_names
......@@ -84,7 +87,7 @@ class RewardManager(ManagerBase):
Operations.
"""
def log_info(self, env_ids: Optional[Sequence[int]] = None) -> Dict[str, torch.Tensor]:
def reset(self, env_ids: Sequence[int] | None = None) -> dict[str, torch.Tensor]:
"""Returns the episodic sum of individual reward terms.
Args:
......@@ -92,7 +95,7 @@ class RewardManager(ManagerBase):
individual reward terms is to be returned. Defaults to all the environment ids.
Returns:
Dict[str, torch.Tensor]: Dictionary of episodic sum of individual reward terms.
dict[str, torch.Tensor]: Dictionary of episodic sum of individual reward terms.
"""
# resolve environment ids
if env_ids is None:
......@@ -100,18 +103,23 @@ class RewardManager(ManagerBase):
# store information
extras = {}
for key in self._episode_sums.keys():
extras["Episode Reward/" + key] = torch.mean(self._episode_sums[key][env_ids]) / (
self._env.max_episode_length * self.dt
) # FIXME
# store information
# r_1 + r_2 + ... + r_n
episodic_sum_avg = torch.mean(self._episode_sums[key][env_ids])
extras["Episode Reward/" + key] = episodic_sum_avg / self._env.max_episode_length_s
# reset episodic sum
self._episode_sums[key][env_ids] = 0.0
return extras
def compute(self) -> torch.Tensor:
def compute(self, dt: float) -> torch.Tensor:
"""Computes the reward signal as a weighted sum of individual terms.
This function calls each reward term managed by the class and adds them to compute the net
reward signal. It also updates the episodic sums corresponding to individual reward terms.
Args:
dt (float): The time-step interval of the environment.
Returns:
torch.Tensor: The net reward signal of shape (num_envs,).
"""
......@@ -120,7 +128,7 @@ class RewardManager(ManagerBase):
# iterate over all the reward terms
for name, term_cfg in zip(self._term_names, self._term_cfgs):
# compute term's value
value = term_cfg.func(self._env, **term_cfg.params) * term_cfg.weight
value = term_cfg.func(self._env, **term_cfg.params) * term_cfg.weight * dt
# update total reward
self._reward_buf += value
# update episodic sum
......@@ -135,8 +143,8 @@ class RewardManager(ManagerBase):
def _prepare_terms(self):
"""Prepares a list of reward functions."""
# parse remaining reward terms and decimate their information
self._term_names: List[str] = list()
self._term_cfgs: List[RewardTermCfg] = list()
self._term_names: list[str] = list()
self._term_cfgs: list[RewardTermCfg] = list()
# check if config is dict already
if isinstance(self.cfg, dict):
......@@ -159,7 +167,6 @@ class RewardManager(ManagerBase):
# note: we multiply weights by dt to make them agnostic to control decimation
if term_cfg.weight == 0:
continue
term_cfg.weight *= self.dt
# add function to list
self._term_names.append(term_name)
self._term_cfgs.append(term_cfg)
......@@ -6,13 +6,18 @@
"""Termination manager for computing done signals for a given world."""
from __future__ import annotations
import torch
from prettytable import PrettyTable
from typing import Dict, List, Optional, Sequence
from typing import TYPE_CHECKING, Sequence
from .manager_base import ManagerBase
from .manager_cfg import TerminationTermCfg
if TYPE_CHECKING:
from omni.isaac.orbit.envs import RLEnv
class TerminationManager(ManagerBase):
"""Manager for computing done signals for a given world.
......@@ -26,12 +31,15 @@ class TerminationManager(ManagerBase):
parameters. Each termination term should instantiate the :class:`TerminationTermCfg` class.
"""
def __init__(self, cfg: object, env: object):
_env: RLEnv
"""The environment instance."""
def __init__(self, cfg: object, env: RLEnv):
"""Initializes the termination manager.
Args:
cfg (object): The configuration object or dictionary (``dict[str, TerminationTermCfg]``).
env (object): An environment object.
env (RLEnv): An environment object.
"""
super().__init__(cfg, env)
# prepare extra info to store individual termination term information
......@@ -65,7 +73,7 @@ class TerminationManager(ManagerBase):
"""
@property
def active_terms(self) -> List[str]:
def active_terms(self) -> list[str]:
"""Name of active termination terms."""
return self._term_names
......@@ -83,7 +91,7 @@ class TerminationManager(ManagerBase):
Operations.
"""
def log_info(self, env_ids: Optional[Sequence[int]] = None) -> Dict[str, torch.Tensor]:
def reset(self, env_ids: Sequence[int] | None = None) -> dict[str, torch.Tensor]:
"""Returns the episodic counts of individual termination terms.
Args:
......@@ -91,7 +99,7 @@ class TerminationManager(ManagerBase):
all environments are considered.
Returns:
Dict[str, torch.Tensor]: Dictionary of episodic sum of individual reward terms.
dict[str, torch.Tensor]: Dictionary of episodic sum of individual reward terms.
"""
# resolve environment ids
if env_ids is None:
......@@ -99,7 +107,9 @@ class TerminationManager(ManagerBase):
# add to episode dict
extras = {}
for key in self._episode_dones.keys():
extras["Episode Termination/" + key] = torch.count_nonzero(self._episode_dones[key][env_ids])
# store information
extras["Episode Termination/" + key] = torch.count_nonzero(self._episode_dones[key][env_ids]).item()
# reset episode dones
self._episode_dones[key][env_ids] = False
return extras
......@@ -135,8 +145,8 @@ class TerminationManager(ManagerBase):
def _prepare_terms(self):
"""Prepares a list of termination functions."""
# parse remaining termination terms and decimate their information
self._term_names: List[str] = list()
self._term_cfgs: List[TerminationTermCfg] = list()
self._term_names: list[str] = list()
self._term_cfgs: list[TerminationTermCfg] = list()
# check if config is dict already
if isinstance(self.cfg, dict):
......
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
This sub-package contains the classes for an interactive scene.
A scene is a collection of entities (e.g., terrain, articulations, sensors, lights, etc.) that can be added to the
simulation. However, only a subset of these entities are of direct interest for the user to interact with.
For example, the user may want to interact with a robot in the scene, but not with the terrain or the lights.
For this reason, we integrate the different entities into a single class called :class:`InteractiveScene`.
The interactive scene performs the following tasks:
1. It parses the configuration class :class:`InteractiveSceneCfg` to create the scene. This configuration class is
inherited by the user to add entities to the scene.
2. It clones the entities based on the number of environments specified by the user.
3. It clubs the entities into different groups based on their type (e.g., articulations, sensors, etc.).
4. It provides a set of methods to unify the common operations on the entities in the scene (e.g., resetting internal
buffers, writing buffers to simulation and updating buffers from simulation).
The interactive scene can be passed around to different modules in the framework to perform different tasks.
For instance, computing the observations based on the state of the scene, or randomizing the scene, or applying
actions to the scene. All these are handled by different "managers" in the framework. Please refer to the
:mod:`omni.isaac.orbit.managers` sub-package for more details.
"""
from .interactive_scene import InteractiveScene
from .interactive_scene_cfg import InteractiveSceneCfg
__all__ = ["InteractiveScene", "InteractiveSceneCfg"]
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
from dataclasses import MISSING
from omni.isaac.orbit.utils.configclass import configclass
@configclass
class InteractiveSceneCfg:
"""Configuration for the interactive scene.
The users can inherit from this class to add entities to their scene. This is then parsed by the
:class:`InteractiveScene` class to create the scene.
.. note::
The adding of entities to the scene is sensitive to the order of the attributes in the configuration.
Please make sure to add the entities in the order you want them to be added to the scene.
The recommended order of specification is terrain, physics-related assets (articulations and rigid bodies),
sensors and non-physics-related assets (lights).
For example, to add a robot to the scene, the user can create a configuration class as follows:
.. code-block:: python
import omni.isaac.orbit.sim as sim_utils
from omni.isaac.orbit.assets import AssetBaseCfg
from omni.isaac.orbit.assets.config.anymal import ANYMAL_C_CFG
from omni.isaac.orbit.scene import InteractiveSceneCfg
from omni.isaac.orbit.sensors.ray_caster import GridPatternCfg, RayCasterCfg
from omni.isaac.orbit.utils import configclass
@configclass
class MySceneCfg(InteractiveSceneCfg):
# terrain - flat terrain plane
terrain = TerrainImporterCfg(
prim_path="/World/ground",
terrain_type="plane",
)
# articulation - robot 1
robot_1 = ANYMAL_C_CFG.replace(prim_path="{ENV_REGEX_NS}/Robot_1")
# articulation - robot 2
robot_2 = ANYMAL_C_CFG.replace(prim_path="{ENV_REGEX_NS}/Robot_2")
robot_2.init_state.pos = (0.0, 1.0, 0.6)
# sensor - ray caster attached to the base of robot 1 that scans the ground
height_scanner = RayCasterCfg(
prim_path="{ENV_REGEX_NS}/Robot_1/base",
offset=RayCasterCfg.OffsetCfg(pos=(0.0, 0.0, 20.0)),
attach_yaw_only=True,
pattern_cfg=GridPatternCfg(resolution=0.1, size=[1.6, 1.0]),
debug_vis=True,
mesh_prim_paths=["/World/ground"],
)
# extras - light
light = AssetBaseCfg(
prim_path="/World/light",
spawn=sim_utils.DistantLightCfg(intensity=3000.0, color=(0.75, 0.75, 0.75)),
init_state=AssetBaseCfg.InitialStateCfg(pos=(0.0, 0.0, 500.0)),
)
"""
num_envs: int = MISSING
"""Number of environment instances handled by the scene."""
env_spacing: float = MISSING
"""Spacing between environments.
This is the default distance between environment origins in the scene. Used only when ``num_envs > 1``.
"""
lazy_sensor_update: bool = True
"""Whether to update sensors only when they are accessed. Default is True.
If true, the sensor data is only updated when their attribute ``data`` is accessed. Otherwise, the sensor
data is updated every time sensors are updated.
"""
replicate_physics: bool = True
"""Enable/disable replication of physics schemas when using the Cloner APIs. Default is True.
Note:
In Isaac Sim 2022.2.0, domain randomization of material properties is not supported when
``replicate_physics`` is set to True.
"""
......@@ -137,11 +137,11 @@ class ContactSensor(SensorBase):
if env_ids is None:
env_ids = ...
# reset accumulative data buffers
self._data.current_air_time[env_ids].zero_()
self._data.last_air_time[env_ids].zero_()
self._data.net_forces_w[env_ids].zero_()
self._data.current_air_time[env_ids] = 0.0
self._data.last_air_time[env_ids] = 0.0
self._data.net_forces_w[env_ids] = 0.0
# reset the data history
self._data.net_forces_w_history[env_ids].zero_()
self._data.net_forces_w_history[env_ids] = 0.0
# Set all reset sensors to not outdated since their value won't be updated till next sim step.
self._is_outdated[env_ids] = False
......
......@@ -255,3 +255,13 @@ class SimulationCfg:
The material is created at the path: ``{physics_prim_path}/defaultMaterial``.
"""
shutdown_app_on_stop: bool = True
"""Enable/disable shutting down the application when the simulation is stopped. Default is True.
This flag is only used when running the simulation as a standalone application.
.. note::
When the simulation is stopped, the physics handles become invalidated. Thus, in the simplest case,
it is better to shutdown the application.
"""
......@@ -79,8 +79,12 @@ class TerrainImporter:
self.env_origins = None
self.terrain_origins = None
# marker for visualization
self.origin_visualizer = None
if self.cfg.debug_vis:
self.origin_visualizer = VisualizationMarkers("/Visuals/TerrainOrigin", cfg=FRAME_MARKER_CFG)
else:
self.origin_visualizer = None
# auto-import the terrain based on the config
if self.cfg.terrain_type == "generator":
# check config is provided
if self.cfg.terrain_generator is None:
......@@ -106,6 +110,21 @@ class TerrainImporter:
else:
raise ValueError(f"Terrain type '{self.cfg.terrain_type}' not available.")
"""
Operations - Visibility.
"""
def set_debug_vis(self, debug_vis: bool):
"""Set the debug visualization of the terrain importer.
Args:
debug_vis (bool): Whether to visualize the terrain origins.
"""
if not self.cfg.debug_vis:
raise RuntimeError("Debug visualization is not enabled for this sensor.")
# set visibility
self.origin_visualizer.set_visibility(debug_vis)
"""
Operations - Import.
"""
......@@ -221,8 +240,6 @@ class TerrainImporter:
Args:
origins (Optional[np.ndarray]): The origins of the sub-terrains. Shape: (num_rows, num_cols, 3).
"""
# create markers for the origins
markers = VisualizationMarkers("/Visuals/TerrainOrigin", cfg=FRAME_MARKER_CFG)
# decide whether to compute origins in a grid or based on curriculum
if origins is not None:
# convert to numpy
......@@ -233,7 +250,8 @@ class TerrainImporter:
# compute environment origins
self.env_origins = self._compute_env_origins_curriculum(self.cfg.num_envs, self.terrain_origins)
# put markers on the sub-terrain origins
markers.visualize(self.terrain_origins.reshape(-1, 3))
if self.origin_visualizer is not None:
self.origin_visualizer.visualize(self.terrain_origins.reshape(-1, 3))
else:
self.terrain_origins = None
# check if env spacing is valid
......@@ -242,7 +260,8 @@ class TerrainImporter:
# compute environment origins
self.env_origins = self._compute_env_origins_grid(self.cfg.num_envs, self.cfg.env_spacing)
# put markers on the grid origins
markers.visualize(self.env_origins.reshape(-1, 3))
if self.origin_visualizer is not None:
self.origin_visualizer.visualize(self.env_origins.reshape(-1, 3))
def update_env_origins(self, env_ids: torch.Tensor, move_up: torch.Tensor, move_down: torch.Tensor):
"""Update the environment origins based on the terrain levels."""
......
......@@ -92,3 +92,6 @@ class TerrainImporterCfg:
Note:
This parameter is used only when sub-terrain origins are defined.
"""
debug_vis: bool = False
"""Whether to enable visualization of terrain origins for the terrain. Defaults to False."""
......@@ -87,6 +87,13 @@ class TestSimulationContext(unittest.TestCase):
sim.set_setting("/myExt/using_omniverse_version", sim.get_version())
self.assertSequenceEqual(sim.get_setting("/myExt/using_omniverse_version"), sim.get_version())
def test_render_modes(self):
"""Test that you can change render modes."""
sim = SimulationContext()
# check default render mode
self.assertEqual(sim.render_mode, sim.RenderMode.HEADLESS)
if __name__ == "__main__":
try:
......
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.4.1"
version = "0.4.2"
# Description
title = "ORBIT Environments"
......
Changelog
---------
0.4.2 (2023-08-29)
~~~~~~~~~~~~~~~~~~
Changed
^^^^^^^
* Moved the base environment definition to the :class:`omni.isaac.orbit.envs.RLEnv` class. The :class:`RLEnv`
contains RL-specific managers such as the reward, termination, randomization and curriculum managers. These
are all configured using the :class:`omni.isaac.orbit.envs.RLEnvConfig` class. The :class:`RLEnv` class
inherits from the :class:`omni.isaac.orbit.envs.BaseEnv` and ``gym.Env`` classes.
Fixed
^^^^^
* Adapted the wrappers to use the new :class:`omni.isaac.orbit.envs.RLEnv` class.
0.4.1 (2023-08-02)
~~~~~~~~~~~~~~~~~~
......
......@@ -10,3 +10,25 @@ These environments are based on the `legged_gym` environments provided by Rudin
Reference:
https://github.com/leggedrobotics/legged_gym
"""
import gym
from .locomotion_env_cfg import LocomotionEnvRoughCfg, LocomotionEnvRoughCfg_PLAY
__all__ = ["LocomotionEnvRoughCfg", "LocomotionEnvRoughCfg_PLAY"]
##
# Register Gym environments.
##
gym.register(
id="Isaac-Velocity-Rough-Anymal-C-v0",
entry_point="omni.isaac.orbit.envs.rl_env:RLEnv",
kwargs={"cfg_entry_point": "omni.isaac.orbit_envs.locomotion:LocomotionEnvRoughCfg"},
)
gym.register(
id="Isaac-Velocity-Rough-Anymal-C-Play-v0",
entry_point="omni.isaac.orbit.envs.rl_env:RLEnv",
kwargs={"cfg_entry_point": "omni.isaac.orbit_envs.locomotion:LocomotionEnvRoughCfg_PLAY"},
)
import torch
from typing import TYPE_CHECKING, Sequence
if TYPE_CHECKING:
from .velocity.locomotion_env import LocomotionEnv
def terrain_levels_vel(env: "LocomotionEnv", env_ids: Sequence[int]) -> torch.Tensor:
"""
If the robot walked more than half the terrain length if moves to a harder level.
Else if it walked less than half of the distance required by the commanded velocity, it goes to a simpler level
"""
distance = torch.norm(env.robot.data.root_pos_w[env_ids, :2] - env.terrain_importer.env_origins[env_ids, :2], dim=1)
# robots that walked far enough progress to harder terrains
move_up = distance > env.cfg.terrain.terrain_generator.size[0] / 2
# robots that walked less than half of their required distance go to simpler terrains
move_down = (
distance < torch.norm(env._command_manager.command[env_ids, :2], dim=1) * env.max_episode_length * env.dt * 0.5
)
move_down *= ~move_up
# update terrain levels
env.terrain_importer.update_env_origins(env_ids, move_up, move_down)
return torch.mean(env.terrain_importer.terrain_levels.float())
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment