Unverified Commit 91f8a03e authored by Mayank Mittal's avatar Mayank Mittal Committed by GitHub

Adds new event terms to randomize joint properties and scaling mass (#495)

# Description

This MR does the following:

* Adds the function `omni.isaac.orbit.utils.math.quat_unique` to
standardize quaternion representations, i.e.
  always have a non-negative real part.
* Adds events terms for randomizing mass by scale, simulation joint
properties (stiffness, damping, armature,
  and friction)
* Adds clamping of joint positions and velocities in event terms for
resetting joints. The simulation does not
throw an error if the set values are out of their range. Hence, users
are expected to clamp them before setting.
* Fixes
`omni.isaac.orbit.envs.mdp.ExponentialMovingAverageJointPositionAction`
to smoothen the actions
  at environment frequency instead of simulation frequency.

## Type of change

- Bug fix (non-breaking change which fixes an issue)
- New feature (non-breaking change which adds functionality)
- Breaking change (fix or feature that would cause existing
functionality to not work as expected)

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./orbit.sh --format`
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have run all the tests with `./orbit.sh --test` and they pass
- [x] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there
parent 87618028
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.15.12"
version = "0.16.0"
# Description
title = "ORBIT framework for Robot Learning"
......
Changelog
---------
0.16.0 (2024-04-16)
~~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added the function :meth:`omni.isaac.orbit.utils.math.quat_unique` to standardize quaternion representations,
i.e. always have a non-negative real part.
* Added events terms for randomizing mass by scale, simulation joint properties (stiffness, damping, armature,
and friction)
Fixed
^^^^^
* Added clamping of joint positions and velocities in event terms for resetting joints. The simulation does not
throw an error if the set values are out of their range. Hence, users are expected to clamp them before setting.
* Fixed :class:`omni.isaac.orbit.envs.mdp.EMAJointPositionToLimitsActionCfg` to smoothen the actions
at environment frequency instead of simulation frequency.
* Renamed the following functions in :meth:`omni.isaac.orbit.envs.mdp` to avoid confusions:
* Observation: :meth:`joint_pos_norm` -> :meth:`joint_pos_limit_normalized`
* Action: :class:`ExponentialMovingAverageJointPositionAction` -> :class:`EMAJointPositionToLimitsAction`
* Termination: :meth:`base_height` -> :meth:`root_height_below_minimum`
* Termination: :meth:`joint_pos_limit` -> :meth:`joint_pos_out_of_limit`
* Termination: :meth:`joint_pos_manual_limit` -> :meth:`joint_pos_out_of_manual_limit`
* Termination: :meth:`joint_vel_limit` -> :meth:`joint_vel_out_of_limit`
* Termination: :meth:`joint_vel_manual_limit` -> :meth:`joint_vel_out_of_manual_limit`
* Termination: :meth:`joint_torque_limit` -> :meth:`joint_effort_out_of_limit`
Deprecated
^^^^^^^^^^
* Deprecated the function :meth:`omni.isaac.orbit.envs.mdp.add_body_mass` in favor of
:meth:`omni.isaac.orbit.envs.mdp.randomize_rigid_body_mass`. This supports randomizing the mass based on different
operations (add, scale, or set) and sampling distributions.
0.15.12 (2024-04-16)
~~~~~~~~~~~~~~~~~~~~
......@@ -216,7 +254,7 @@ Added
^^^^^
* Added support for the following data types inside the :class:`omni.isaac.orbit.sensors.Camera` class:
``instance_segmentation_fast`` and ``instance_id_segmentation_fast``. These are are GPU-supported annotations
``instance_segmentation_fast`` and ``instance_id_segmentation_fast``. These are GPU-supported annotations
and are faster than the regular annotations.
Fixed
......
......@@ -161,8 +161,8 @@ class AppLauncher:
If provided as an empty string, the experience file is determined based on the headless flag:
* If headless is True, the experience file is set to ``orbit.python.headless.kit``.
* If headless is False, the experience file is set to ``orbit.python.kit``.
* If headless is True, the experience file is set to ``orbit.python.headless.kit``.
* If headless is False, the experience file is set to ``orbit.python.kit``.
Args:
parser: An argument parser instance to be extended with the AppLauncher specific options.
......
......@@ -8,4 +8,5 @@
from .actions_cfg import *
from .binary_joint_actions import *
from .joint_actions import *
from .joint_actions_to_limits import *
from .non_holonomic_actions import *
......@@ -9,7 +9,7 @@ from omni.isaac.orbit.controllers import DifferentialIKControllerCfg
from omni.isaac.orbit.managers.action_manager import ActionTerm, ActionTermCfg
from omni.isaac.orbit.utils import configclass
from . import binary_joint_actions, joint_actions, non_holonomic_actions, task_space_actions
from . import binary_joint_actions, joint_actions, joint_actions_to_limits, non_holonomic_actions, task_space_actions
##
# Joint actions.
......@@ -65,22 +65,6 @@ class RelativeJointPositionActionCfg(JointActionCfg):
"""
@configclass
class ExponentialMovingAverageJointPositionActionCfg(JointPositionActionCfg):
"""Configuration for the exponential moving average joint position action term.
See :class:`ExponentialMovingAverageJointPositionAction` for more details.
"""
class_type: type[ActionTerm] = joint_actions.ExponentialMovingAverageJointPositionAction
weight: float | dict[str, float] = 1.0
"""The weight for the moving average (float or dict of regex expressions). Defaults to 1.0.
If set to 1.0, the processed action is applied directly without any moving average window.
"""
@configclass
class JointVelocityActionCfg(JointActionCfg):
"""Configuration for the joint velocity action term.
......@@ -108,6 +92,53 @@ class JointEffortActionCfg(JointActionCfg):
class_type: type[ActionTerm] = joint_actions.JointEffortAction
##
# Joint actions rescaled to limits.
##
@configclass
class JointPositionToLimitsActionCfg(ActionTermCfg):
"""Configuration for the bounded joint position action term.
See :class:`JointPositionWithinLimitsAction` for more details.
"""
class_type: type[ActionTerm] = joint_actions_to_limits.JointPositionToLimitsAction
joint_names: list[str] = MISSING
"""List of joint names or regex expressions that the action will be mapped to."""
scale: float | dict[str, float] = 1.0
"""Scale factor for the action (float or dict of regex expressions). Defaults to 1.0."""
rescale_to_limits: bool = True
"""Whether to rescale the action to the joint limits. Defaults to True.
If True, the input actions are rescaled to the joint limits, i.e., the action value in
the range [-1, 1] corresponds to the joint lower and upper limits respectively.
Note:
This operation is performed after applying the scale factor.
"""
@configclass
class EMAJointPositionToLimitsActionCfg(JointPositionToLimitsActionCfg):
"""Configuration for the exponential moving average (EMA) joint position action term.
See :class:`EMAJointPositionToLimitsAction` for more details.
"""
class_type: type[ActionTerm] = joint_actions_to_limits.EMAJointPositionToLimitsAction
alpha: float | dict[str, float] = 1.0
"""The weight for the moving average (float or dict of regex expressions). Defaults to 1.0.
If set to 1.0, the processed action is applied directly without any moving average window.
"""
##
# Gripper actions.
##
......
......@@ -6,7 +6,6 @@
from __future__ import annotations
import torch
from collections.abc import Sequence
from typing import TYPE_CHECKING
import carb
......@@ -171,78 +170,6 @@ class RelativeJointPositionAction(JointAction):
self._asset.set_joint_position_target(current_actions, joint_ids=self._joint_ids)
class ExponentialMovingAverageJointPositionAction(JointPositionAction):
r"""Joint action term that applies the processed actions to the articulation's joints as exponential
moving average position commands.
Exponential moving average is a type of moving average that gives more weight to the most recent data points.
This action term applies the processed actions as moving average position action commands.
The moving average is computed as:
.. math::
\text{applied action} = \text{weight} \times \text{processed actions} + (1 - \text{weight}) \times \text{previous applied action}
where :math:`\text{weight}` is the weight for the moving average, :math:`\text{processed actions}` are the
processed actions, and :math:`\text{previous action}` is the previous action that was applied to the articulation's
joints.
In the trivial case where the weight is 1.0, the action term behaves exactly like :class:`JointPositionAction`.
On reset, the previous action is initialized to the current joint positions of the articulation's joints.
"""
cfg: actions_cfg.ExponentialMovingAverageJointPositionActionCfg
"""The configuration of the action term."""
def __init__(self, cfg: actions_cfg.ExponentialMovingAverageJointPositionActionCfg, env: BaseEnv):
# initialize the action term
super().__init__(cfg, env)
# parse and save the moving average weight
if isinstance(cfg.weight, float):
# check that the weight is in the valid range
if not 0.0 <= cfg.weight <= 1.0:
raise ValueError(f"Moving average weight must be in the range [0, 1]. Got {cfg.weight}.")
self._weight = cfg.weight
elif isinstance(cfg.weight, dict):
self._weight = torch.ones((env.num_envs, self.action_dim), device=self.device)
# resolve the dictionary config
index_list, names_list, value_list = string_utils.resolve_matching_names_values(
cfg.weight, self._joint_names
)
# check that the weights are in the valid range
for name, value in zip(names_list, value_list):
if not 0.0 <= value <= 1.0:
raise ValueError(
f"Moving average weight must be in the range [0, 1]. Got {value} for joint {name}."
)
self._weight[:, index_list] = torch.tensor(value_list, device=self.device)
else:
raise ValueError(
f"Unsupported moving average weight type: {type(cfg.weight)}. Supported types are float and dict."
)
# initialize the previous targets
self._prev_applied_actions = torch.zeros_like(self.processed_actions)
def reset(self, env_ids: Sequence[int] | None = None) -> None:
# check if specific environment ids are provided
if env_ids is None:
env_ids = slice(None)
# reset history to current joint positions
self._prev_applied_actions[env_ids, :] = self._asset.data.joint_pos[env_ids, self._joint_ids]
def apply_actions(self):
# set position targets as moving average
current_actions = self._weight * self.processed_actions
current_actions += (1.0 - self._weight) * self._prev_applied_actions
# set position targets
self._asset.set_joint_position_target(current_actions, joint_ids=self._joint_ids)
# update previous targets
self._prev_applied_actions[:] = current_actions[:]
class JointVelocityAction(JointAction):
"""Joint action term that applies the processed actions to the articulation's joints as velocity commands."""
......
# Copyright (c) 2022-2024, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import torch
from collections.abc import Sequence
from typing import TYPE_CHECKING
import carb
import omni.isaac.orbit.utils.math as math_utils
import omni.isaac.orbit.utils.string as string_utils
from omni.isaac.orbit.assets.articulation import Articulation
from omni.isaac.orbit.managers.action_manager import ActionTerm
if TYPE_CHECKING:
from omni.isaac.orbit.envs import BaseEnv
from . import actions_cfg
class JointPositionToLimitsAction(ActionTerm):
"""Joint position action term that scales the input actions to the joint limits and applies them to the
articulation's joints.
This class is similar to the :class:`JointPositionAction` class. However, it performs additional
re-scaling of input actions to the actuator joint position limits.
While processing the actions, it performs the following operations:
1. Apply scaling to the raw actions based on :attr:`actions_cfg.JointPositionToLimitsActionCfg.scale`.
2. Clip the scaled actions to the range [-1, 1] and re-scale them to the joint limits if
:attr:`actions_cfg.JointPositionToLimitsActionCfg.rescale_to_limits` is set to True.
The processed actions are then sent as position commands to the articulation's joints.
"""
cfg: actions_cfg.JointPositionToLimitsActionCfg
"""The configuration of the action term."""
_asset: Articulation
"""The articulation asset on which the action term is applied."""
_scale: torch.Tensor | float
"""The scaling factor applied to the input action."""
def __init__(self, cfg: actions_cfg.JointPositionToLimitsActionCfg, env: BaseEnv):
# initialize the action term
super().__init__(cfg, env)
# resolve the joints over which the action term is applied
self._joint_ids, self._joint_names = self._asset.find_joints(self.cfg.joint_names)
self._num_joints = len(self._joint_ids)
# log the resolved joint names for debugging
carb.log_info(
f"Resolved joint names for the action term {self.__class__.__name__}:"
f" {self._joint_names} [{self._joint_ids}]"
)
# Avoid indexing across all joints for efficiency
if self._num_joints == self._asset.num_joints:
self._joint_ids = slice(None)
# create tensors for raw and processed actions
self._raw_actions = torch.zeros(self.num_envs, self.action_dim, device=self.device)
self._processed_actions = torch.zeros_like(self.raw_actions)
# parse scale
if isinstance(cfg.scale, (float, int)):
self._scale = float(cfg.scale)
elif isinstance(cfg.scale, dict):
self._scale = torch.ones(self.num_envs, self.action_dim, device=self.device)
# resolve the dictionary config
index_list, _, value_list = string_utils.resolve_matching_names_values(self.cfg.scale, self._joint_names)
self._scale[:, index_list] = torch.tensor(value_list, device=self.device)
else:
raise ValueError(f"Unsupported scale type: {type(cfg.scale)}. Supported types are float and dict.")
"""
Properties.
"""
@property
def action_dim(self) -> int:
return self._num_joints
@property
def raw_actions(self) -> torch.Tensor:
return self._raw_actions
@property
def processed_actions(self) -> torch.Tensor:
return self._processed_actions
"""
Operations.
"""
def process_actions(self, actions: torch.Tensor):
# store the raw actions
self._raw_actions[:] = actions
# apply affine transformations
self._processed_actions = self._raw_actions * self._scale
# rescale the position targets if configured
# this is useful when the input actions are in the range [-1, 1]
if self.cfg.rescale_to_limits:
# clip to [-1, 1]
actions = self._processed_actions.clamp(-1.0, 1.0)
# rescale within the joint limits
actions = math_utils.unscale_transform(
actions,
self._asset.data.soft_joint_pos_limits[:, self._joint_ids, 0],
self._asset.data.soft_joint_pos_limits[:, self._joint_ids, 1],
)
self._processed_actions[:] = actions[:]
def apply_actions(self):
# set position targets
self._asset.set_joint_position_target(self.processed_actions, joint_ids=self._joint_ids)
class EMAJointPositionToLimitsAction(JointPositionToLimitsAction):
r"""Joint action term that applies exponential moving average (EMA) over the processed actions as the
articulation's joints position commands.
Exponential moving average (EMA) is a type of moving average that gives more weight to the most recent data points.
This action term applies the processed actions as moving average position action commands.
The moving average is computed as:
.. math::
\text{applied action} = \alpha \times \text{processed actions} + (1 - \alpha) \times \text{previous applied action}
where :math:`\alpha` is the weight for the moving average, :math:`\text{processed actions}` are the
processed actions, and :math:`\text{previous action}` is the previous action that was applied to the articulation's
joints.
In the trivial case where the weight is 1.0, the action term behaves exactly like
the :class:`JointPositionToLimitsAction` class.
On reset, the previous action is initialized to the current joint positions of the articulation's joints.
"""
cfg: actions_cfg.EMAJointPositionToLimitsActionCfg
"""The configuration of the action term."""
def __init__(self, cfg: actions_cfg.EMAJointPositionToLimitsActionCfg, env: BaseEnv):
# initialize the action term
super().__init__(cfg, env)
# parse and save the moving average weight
if isinstance(cfg.alpha, float):
# check that the weight is in the valid range
if not 0.0 <= cfg.alpha <= 1.0:
raise ValueError(f"Moving average weight must be in the range [0, 1]. Got {cfg.alpha}.")
self._alpha = cfg.alpha
elif isinstance(cfg.alpha, dict):
self._alpha = torch.ones((env.num_envs, self.action_dim), device=self.device)
# resolve the dictionary config
index_list, names_list, value_list = string_utils.resolve_matching_names_values(
cfg.alpha, self._joint_names
)
# check that the weights are in the valid range
for name, value in zip(names_list, value_list):
if not 0.0 <= value <= 1.0:
raise ValueError(
f"Moving average weight must be in the range [0, 1]. Got {value} for joint {name}."
)
self._alpha[:, index_list] = torch.tensor(value_list, device=self.device)
else:
raise ValueError(
f"Unsupported moving average weight type: {type(cfg.alpha)}. Supported types are float and dict."
)
# initialize the previous targets
self._prev_applied_actions = torch.zeros_like(self.processed_actions)
def reset(self, env_ids: Sequence[int] | None = None) -> None:
# check if specific environment ids are provided
if env_ids is None:
env_ids = slice(None)
# reset history to current joint positions
self._prev_applied_actions[env_ids, :] = self._asset.data.joint_pos[env_ids, self._joint_ids]
def process_actions(self, actions: torch.Tensor):
# apply affine transformations
super().process_actions(actions)
# set position targets as moving average
ema_actions = self._alpha * self._processed_actions
ema_actions += (1.0 - self._alpha) * self._prev_applied_actions
# clamp the targets
self._processed_actions[:] = torch.clamp(
ema_actions,
self._asset.data.soft_joint_pos_limits[:, self._joint_ids, 0],
self._asset.data.soft_joint_pos_limits[:, self._joint_ids, 1],
)
# update previous targets
self._prev_applied_actions[:] = self._processed_actions[:]
......@@ -105,6 +105,12 @@ class UniformPoseCommandCfg(CommandTermCfg):
body_name: str = MISSING
"""Name of the body in the asset for which the commands are generated."""
make_quat_unique: bool = False
"""Whether to make the quaternion unique or not. Defaults to False.
If True, the quaternion is made unique by ensuring the real part is positive.
"""
@configclass
class Ranges:
"""Uniform distribution ranges for the pose commands."""
......
......@@ -15,7 +15,7 @@ from omni.isaac.orbit.assets import Articulation
from omni.isaac.orbit.managers import CommandTerm
from omni.isaac.orbit.markers import VisualizationMarkers
from omni.isaac.orbit.markers.config import FRAME_MARKER_CFG
from omni.isaac.orbit.utils.math import combine_frame_transforms, compute_pose_error, quat_from_euler_xyz
from omni.isaac.orbit.utils.math import combine_frame_transforms, compute_pose_error, quat_from_euler_xyz, quat_unique
if TYPE_CHECKING:
from omni.isaac.orbit.envs import BaseEnv
......@@ -120,9 +120,9 @@ class UniformPoseCommand(CommandTerm):
euler_angles[:, 0].uniform_(*self.cfg.ranges.roll)
euler_angles[:, 1].uniform_(*self.cfg.ranges.pitch)
euler_angles[:, 2].uniform_(*self.cfg.ranges.yaw)
self.pose_command_b[env_ids, 3:] = quat_from_euler_xyz(
euler_angles[:, 0], euler_angles[:, 1], euler_angles[:, 2]
)
quat = quat_from_euler_xyz(euler_angles[:, 0], euler_angles[:, 1], euler_angles[:, 2])
# make sure the quaternion has real part as positive
self.pose_command_b[env_ids, 3:] = quat_unique(quat) if self.cfg.make_quat_unique else quat
def _update_command(self):
pass
......
......@@ -62,11 +62,21 @@ def root_pos_w(env: BaseEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")
return asset.data.root_pos_w - env.scene.env_origins
def root_quat_w(env: BaseEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Asset root orientation in the environment frame."""
def root_quat_w(
env: BaseEnv, make_quat_unique: bool = False, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")
) -> torch.Tensor:
"""Asset root orientation (w, x, y, z) in the environment frame.
If :attr:`make_quat_unique` is True, then returned quaternion is made unique by ensuring
the quaternion has non-negative real component. This is because both ``q`` and ``-q`` represent
the same orientation.
"""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
return asset.data.root_quat_w
quat = asset.data.root_quat_w
# make the quaternion real-part positive if configured
return math_utils.quat_unique(quat) if make_quat_unique else quat
def root_lin_vel_w(env: BaseEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
......@@ -88,20 +98,30 @@ Joint state.
"""
def joint_pos(env: BaseEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""The joint positions of the asset.
Note: Only the joints configured in :attr:`asset_cfg.joint_ids` will have their positions returned.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return asset.data.joint_pos[:, asset_cfg.joint_ids]
def joint_pos_rel(env: BaseEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""The joint positions of the asset w.r.t. the default joint positions.
NOTE: Only the joints configured in :attr:`asset_cfg.joint_ids` will have their positions returned.
Note: Only the joints configured in :attr:`asset_cfg.joint_ids` will have their positions returned.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return asset.data.joint_pos[:, asset_cfg.joint_ids] - asset.data.default_joint_pos[:, asset_cfg.joint_ids]
def joint_pos_norm(env: BaseEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
def joint_pos_limit_normalized(env: BaseEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""The joint positions of the asset normalized with the asset's joint limits.
NOTE: Only the joints configured in :attr:`asset_cfg.joint_ids` will have their normalized positions returned.
Note: Only the joints configured in :attr:`asset_cfg.joint_ids` will have their normalized positions returned.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
......@@ -112,10 +132,20 @@ def joint_pos_norm(env: BaseEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("rob
)
def joint_vel(env: BaseEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")):
"""The joint velocities of the asset.
Note: Only the joints configured in :attr:`asset_cfg.joint_ids` will have their velocities returned.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return asset.data.joint_vel[:, asset_cfg.joint_ids]
def joint_vel_rel(env: BaseEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")):
"""The joint velocities of the asset w.r.t. the default joint velocities.
NOTE: Only the joints configured in :attr:`asset_cfg.joint_ids` will have their velocities returned.
Note: Only the joints configured in :attr:`asset_cfg.joint_ids` will have their velocities returned.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
......
......@@ -59,10 +59,10 @@ def bad_orientation(
return torch.acos(-asset.data.projected_gravity_b[:, 2]).abs() > limit_angle
def base_height(
def root_height_below_minimum(
env: RLTaskEnv, minimum_height: float, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")
) -> torch.Tensor:
"""Terminate when the asset's height is below the minimum height.
"""Terminate when the asset's root height is below the minimum height.
Note:
This is currently only supported for flat terrains, i.e. the minimum height is in the world frame.
......@@ -77,23 +77,23 @@ Joint terminations.
"""
def joint_pos_limit(env: RLTaskEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
def joint_pos_out_of_limit(env: RLTaskEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Terminate when the asset's joint positions are outside of the soft joint limits."""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# compute any violations
out_of_upper_limits = torch.any(asset.data.joint_pos > asset.data.soft_joint_pos_limits[..., 1], dim=1)
out_of_lower_limits = torch.any(asset.data.joint_pos < asset.data.soft_joint_pos_limits[..., 0], dim=1)
return torch.logical_or(out_of_upper_limits, out_of_lower_limits)
return torch.logical_or(out_of_upper_limits[:, asset_cfg.joint_ids], out_of_lower_limits[:, asset_cfg.joint_ids])
def joint_pos_manual_limit(
def joint_pos_out_of_manual_limit(
env: RLTaskEnv, bounds: tuple[float, float], asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")
) -> torch.Tensor:
"""Terminate when the asset's joint positions are outside of the configured bounds.
Note:
This function is similar to :func:`joint_pos_limit` but allows the user to specify the bounds manually.
This function is similar to :func:`joint_pos_out_of_limit` but allows the user to specify the bounds manually.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
......@@ -105,22 +105,39 @@ def joint_pos_manual_limit(
return torch.logical_or(out_of_upper_limits, out_of_lower_limits)
def joint_vel_limit(env: RLTaskEnv, max_velocity, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
def joint_vel_out_of_limit(env: RLTaskEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Terminate when the asset's joint velocities are outside of the soft joint limits."""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# TODO read max velocities per joint from robot
return torch.any(torch.abs(asset.data.joint_vel) > max_velocity, dim=1)
# compute any violations
limits = asset.data.soft_joint_vel_limits
return torch.any(torch.abs(asset.data.joint_vel[:, asset_cfg.joint_ids]) > limits[:, asset_cfg.joint_ids], dim=1)
def joint_torque_limit(env: RLTaskEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Terminate when torque applied on the asset's joints are are outside of the soft joint limits."""
def joint_vel_out_of_manual_limit(
env: RLTaskEnv, max_velocity: float, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")
) -> torch.Tensor:
"""Terminate when the asset's joint velocities are outside the provided limits."""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return torch.any(
torch.isclose(asset.data.computed_torques, asset.data.applied_torque),
dim=1,
# compute any violations
return torch.any(torch.abs(asset.data.joint_vel[:, asset_cfg.joint_ids]) > max_velocity, dim=1)
def joint_effort_out_of_limit(env: RLTaskEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Terminate when effort applied on the asset's joints are outside of the soft joint limits.
In the actuators, the applied torque are the efforts applied on the joints. These are computed by clipping
the computed torques to the joint limits. Hence, we check if the computed torques are equal to the applied
torques.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# check if any joint effort is out of limit
out_of_limits = torch.isclose(
asset.data.computed_torque[:, asset_cfg.joint_ids], asset.data.applied_torque[:, asset_cfg.joint_ids]
)
return torch.any(out_of_limits, dim=1)
"""
......
......@@ -63,7 +63,7 @@ class RigidBodyPropertiesCfg:
max_linear_velocity: float | None = None
"""Maximum linear velocity for rigid bodies (in m/s)."""
max_angular_velocity: float | None = None
"""Maximum angular velocity for rigid bodies (in rad/s)."""
"""Maximum angular velocity for rigid bodies (in deg/s)."""
max_depenetration_velocity: float | None = None
"""Maximum depenetration velocity permitted to be introduced by the solver (in m/s)."""
max_contact_impulse: float | None = None
......
......@@ -431,6 +431,22 @@ def euler_xyz_from_quat(quat: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor,
return roll % (2 * torch.pi), pitch % (2 * torch.pi), yaw % (2 * torch.pi) # TODO: why not wrap_to_pi here ?
@torch.jit.script
def quat_unique(q: torch.Tensor) -> torch.Tensor:
"""Convert a unit quaternion to a standard form where the real part is non-negative.
Quaternion representations have a singularity since ``q`` and ``-q`` represent the same
rotation. This function ensures the real part of the quaternion is non-negative.
Args:
q: The quaternion orientation in (w, x, y, z). Shape is (..., 4).
Returns:
Standardized quaternions. Shape is (..., 4).
"""
return torch.where(q[..., 0:1] < 0, -q, q)
@torch.jit.script
def quat_mul(q1: torch.Tensor, q2: torch.Tensor) -> torch.Tensor:
"""Multiply two quaternions together.
......@@ -488,7 +504,7 @@ def quat_box_minus(q1: torch.Tensor, q2: torch.Tensor) -> torch.Tensor:
re = quat_diff[:, 0] # real part, q = [w, x, y, z] = [re, im]
im = quat_diff[:, 1:] # imaginary part
norm_im = torch.norm(im, dim=1)
scale = 2.0 * torch.where(norm_im > 1.0e-7, torch.atan(norm_im / re) / norm_im, torch.sign(re))
scale = 2.0 * torch.where(norm_im > 1.0e-7, torch.atan2(norm_im, re) / norm_im, torch.sign(re))
return scale.unsqueeze(-1) * im
......@@ -637,7 +653,7 @@ def axis_angle_from_quat(quat: torch.Tensor, eps: float = 1.0e-6) -> torch.Tenso
angle = 2.0 * half_angle
# check whether to apply Taylor approximation
sin_half_angles_over_angles = torch.where(
torch.abs(angle.abs()) > eps, torch.sin(half_angle) / angle, 0.5 - angle * angle / 48
angle.abs() > eps, torch.sin(half_angle) / angle, 0.5 - angle * angle / 48
)
return quat[..., 1:4] / sin_half_angles_over_angles.unsqueeze(-1)
......@@ -1189,6 +1205,37 @@ def sample_uniform(
return torch.rand(*size, device=device) * (upper - lower) + lower
def sample_log_uniform(
lower: torch.Tensor | float, upper: torch.Tensor | float, size: int | tuple[int, ...], device: str
) -> torch.Tensor:
r"""Sample using log-uniform distribution within a range.
The log-uniform distribution is defined as a uniform distribution in the log-space. It
is useful for sampling values that span several orders of magnitude. The sampled values
are uniformly distributed in the log-space and then exponentiated to get the final values.
.. math::
x = \exp(\text{uniform}(\log(\text{lower}), \log(\text{upper})))
Args:
lower: Lower bound of uniform range.
upper: Upper bound of uniform range.
size: The shape of the tensor.
device: Device to create tensor on.
Returns:
Sampled tensor. Shape is based on :attr:`size`.
"""
# cast to tensor if not already
if not isinstance(lower, torch.Tensor):
lower = torch.tensor(lower, dtype=torch.float, device=device)
if not isinstance(upper, torch.Tensor):
upper = torch.tensor(upper, dtype=torch.float, device=device)
# sample in log-space and exponentiate
return torch.exp(sample_uniform(torch.log(lower), torch.log(upper), size, device))
def sample_cylinder(
radius: float, h_range: tuple[float, float], size: int | tuple[int, ...], device: str
) -> torch.Tensor:
......
......@@ -66,11 +66,13 @@ class TestMathUtilities(unittest.TestCase):
for quat, angle in zip(quats, angles):
with self.subTest(quat=quat, angle=angle):
self.assertTrue(torch.allclose(math_utils.axis_angle_from_quat(quat), angle, atol=1e-7))
torch.testing.assert_close(math_utils.axis_angle_from_quat(quat), angle)
def test_axis_angle_from_quat_approximation(self):
"""Test Taylor approximation from axis_angle_from_quat method
for unstable conversions where theta is very small."""
"""Test the Taylor approximation from axis_angle_from_quat method.
This test checks for unstable conversions where theta is very small.
"""
# Generate a small rotation quaternion
# Small angle
theta = torch.Tensor([0.0000001])
......@@ -88,12 +90,13 @@ class TestMathUtilities(unittest.TestCase):
axis_angle_expected = torch.tensor([theta * d for d in axis], dtype=torch.float32)
# Assert that the computed values are close to the expected values
self.assertTrue(torch.allclose(axis_angle_computed, axis_angle_expected, atol=1e-7))
torch.testing.assert_close(axis_angle_computed, axis_angle_expected)
def test_quat_error_magnitude(self):
"""Test quat_error_magnitude method."""
# Define test cases
# Each tuple contains: q1, q2, expected error
test_cases = [
# q1, q2, expected error
# No rotation
(torch.Tensor([1, 0, 0, 0]), torch.Tensor([1, 0, 0, 0]), torch.Tensor([0.0])),
# PI/2 rotation
......@@ -101,12 +104,91 @@ class TestMathUtilities(unittest.TestCase):
# PI rotation
(torch.Tensor([1.0, 0, 0.0, 0]), torch.Tensor([0.0, 0.0, 1.0, 0]), torch.Tensor([PI])),
]
# Test higher dimension
test_cases += tuple([(torch.stack(tensors) for tensors in zip(*test_cases))])
# Test higher dimension (batched) inputs
q1_list = torch.stack([t[0] for t in test_cases], dim=0)
q2_list = torch.stack([t[1] for t in test_cases], dim=0)
expected_diff_list = torch.stack([t[2] for t in test_cases], dim=0).flatten()
test_cases += [(q1_list, q2_list, expected_diff_list)]
# Iterate over test cases
for q1, q2, expected_diff in test_cases:
with self.subTest(q1=q1, q2=q2):
# Compute the error
q12_diff = math_utils.quat_error_magnitude(q1, q2)
self.assertTrue(torch.allclose(q12_diff, torch.flatten(expected_diff), atol=1e-7))
# Check that the error is close to the expected value
if len(q1.shape) > 1:
torch.testing.assert_close(q12_diff, expected_diff)
else:
self.assertAlmostEqual(q12_diff.item(), expected_diff.item(), places=5)
def test_quat_unique(self):
"""Test quat_unique method."""
# Define test cases
quats = math_utils.random_orientation(num=1024, device="cpu")
# Test positive real quaternion
pos_real_quats = math_utils.quat_unique(quats)
# Test that the real part is positive
self.assertTrue(torch.all(pos_real_quats[:, 0] > 0).item())
non_pos_indices = quats[:, 0] < 0
# Check imaginary part have sign flipped if real part is negative
torch.testing.assert_close(pos_real_quats[non_pos_indices], -quats[non_pos_indices])
torch.testing.assert_close(pos_real_quats[~non_pos_indices], quats[~non_pos_indices])
def test_quat_mul_with_quat_unique(self):
"""Test quat_mul method with different quaternions.
This test checks that the quaternion multiplication is consistent when using positive real quaternions
and regular quaternions. It makes sure that the result is the same regardless of the input quaternion sign
(i.e. q and -q are same quaternion in the context of rotations).
"""
quats_1 = math_utils.random_orientation(num=1024, device="cpu")
quats_2 = math_utils.random_orientation(num=1024, device="cpu")
# Make quats positive real
quats_1_pos_real = math_utils.quat_unique(quats_1)
quats_2_pos_real = math_utils.quat_unique(quats_2)
# Option 1: Direct computation on quaternions
quat_result_1 = math_utils.quat_mul(quats_1, math_utils.quat_conjugate(quats_2))
quat_result_1 = math_utils.quat_unique(quat_result_1)
# Option 2: Computation on positive real quaternions
quat_result_2 = math_utils.quat_mul(quats_1_pos_real, math_utils.quat_conjugate(quats_2_pos_real))
quat_result_2 = math_utils.quat_unique(quat_result_2)
# Option 3: Mixed computation
quat_result_3 = math_utils.quat_mul(quats_1, math_utils.quat_conjugate(quats_2_pos_real))
quat_result_3 = math_utils.quat_unique(quat_result_3)
# Check that the result is close to the expected value
torch.testing.assert_close(quat_result_1, quat_result_2)
torch.testing.assert_close(quat_result_2, quat_result_3)
torch.testing.assert_close(quat_result_3, quat_result_1)
def test_quat_error_mag_with_quat_unique(self):
"""Test quat_error_magnitude method with positive real quaternions."""
quats_1 = math_utils.random_orientation(num=1024, device="cpu")
quats_2 = math_utils.random_orientation(num=1024, device="cpu")
# Make quats positive real
quats_1_pos_real = math_utils.quat_unique(quats_1)
quats_2_pos_real = math_utils.quat_unique(quats_2)
# Compute the error
error_1 = math_utils.quat_error_magnitude(quats_1, quats_2)
error_2 = math_utils.quat_error_magnitude(quats_1_pos_real, quats_2_pos_real)
error_3 = math_utils.quat_error_magnitude(quats_1, quats_2_pos_real)
error_4 = math_utils.quat_error_magnitude(quats_1_pos_real, quats_2)
# Check that the error is close to the expected value
torch.testing.assert_close(error_1, error_2)
torch.testing.assert_close(error_2, error_3)
torch.testing.assert_close(error_3, error_4)
torch.testing.assert_close(error_4, error_1)
if __name__ == "__main__":
......
......@@ -118,7 +118,7 @@ class ObservationsCfg:
base_angle_to_target = ObsTerm(func=mdp.base_angle_to_target, params={"target_pos": (1000.0, 0.0, 0.0)})
base_up_proj = ObsTerm(func=mdp.base_up_proj)
base_heading_proj = ObsTerm(func=mdp.base_heading_proj, params={"target_pos": (1000.0, 0.0, 0.0)})
joint_pos_norm = ObsTerm(func=mdp.joint_pos_norm)
joint_pos_norm = ObsTerm(func=mdp.joint_pos_limit_normalized)
joint_vel_rel = ObsTerm(func=mdp.joint_vel_rel, scale=0.2)
feet_body_forces = ObsTerm(
func=mdp.body_incoming_wrench,
......@@ -190,7 +190,7 @@ class TerminationsCfg:
# (1) Terminate if the episode length is exceeded
time_out = DoneTerm(func=mdp.time_out, time_out=True)
# (2) Terminate if the robot falls
torso_height = DoneTerm(func=mdp.base_height, params={"minimum_height": 0.31})
torso_height = DoneTerm(func=mdp.root_height_below_minimum, params={"minimum_height": 0.31})
@configclass
......
......@@ -157,7 +157,7 @@ class TerminationsCfg:
time_out = DoneTerm(func=mdp.time_out, time_out=True)
# (2) Cart out of bounds
cart_out_of_bounds = DoneTerm(
func=mdp.joint_pos_manual_limit,
func=mdp.joint_pos_out_of_manual_limit,
params={"asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"]), "bounds": (-3.0, 3.0)},
)
......
......@@ -146,7 +146,7 @@ class ObservationsCfg:
base_angle_to_target = ObsTerm(func=mdp.base_angle_to_target, params={"target_pos": (1000.0, 0.0, 0.0)})
base_up_proj = ObsTerm(func=mdp.base_up_proj)
base_heading_proj = ObsTerm(func=mdp.base_heading_proj, params={"target_pos": (1000.0, 0.0, 0.0)})
joint_pos_norm = ObsTerm(func=mdp.joint_pos_norm)
joint_pos_norm = ObsTerm(func=mdp.joint_pos_limit_normalized)
joint_vel_rel = ObsTerm(func=mdp.joint_vel_rel, scale=0.1)
feet_body_forces = ObsTerm(
func=mdp.body_incoming_wrench,
......@@ -245,7 +245,7 @@ class TerminationsCfg:
# (1) Terminate if the episode length is exceeded
time_out = DoneTerm(func=mdp.time_out, time_out=True)
# (2) Terminate if the robot falls
torso_height = DoneTerm(func=mdp.base_height, params={"minimum_height": 0.8})
torso_height = DoneTerm(func=mdp.root_height_below_minimum, params={"minimum_height": 0.8})
@configclass
......
......@@ -163,9 +163,9 @@ class EventCfg:
)
add_base_mass = EventTerm(
func=mdp.add_body_mass,
func=mdp.randomize_rigid_body_mass,
mode="startup",
params={"asset_cfg": SceneEntityCfg("robot", body_names="base"), "mass_range": (-5.0, 5.0)},
params={"asset_cfg": SceneEntityCfg("robot", body_names="base"), "mass_range": (-5.0, 5.0), "operation": "add"},
)
# reset
......
......@@ -8,6 +8,7 @@ from __future__ import annotations
import torch
from typing import TYPE_CHECKING
import omni.isaac.orbit.utils.math as math_utils
from omni.isaac.orbit.assets import ArticulationData
from omni.isaac.orbit.sensors import FrameTransformerData
......@@ -47,11 +48,12 @@ def ee_pos(env: RLTaskEnv) -> torch.Tensor:
return ee_pos
def ee_quat(env: RLTaskEnv) -> torch.Tensor:
"""The orientation of the end-effector in the environment frame."""
def ee_quat(env: RLTaskEnv, make_quat_unique: bool = True) -> torch.Tensor:
"""The orientation of the end-effector in the environment frame.
If :attr:`make_quat_unique` is True, the quaternion is made unique by ensuring the real part is positive.
"""
ee_tf_data: FrameTransformerData = env.scene["ee_frame"].data
ee_quat = ee_tf_data.target_quat_w[..., 0, :]
# make first element of quaternion positive
ee_quat[ee_quat[:, 0] < 0] *= -1
return ee_quat
return math_utils.quat_unique(ee_quat) if make_quat_unique else ee_quat
......@@ -168,7 +168,7 @@ class TerminationsCfg:
time_out = DoneTerm(func=mdp.time_out, time_out=True)
object_dropping = DoneTerm(
func=mdp.base_height, params={"minimum_height": -0.05, "asset_cfg": SceneEntityCfg("object")}
func=mdp.root_height_below_minimum, params={"minimum_height": -0.05, "asset_cfg": SceneEntityCfg("object")}
)
......
......@@ -77,11 +77,12 @@ class EventCfg:
# on startup
add_pole_mass = EventTerm(
func=mdp.add_body_mass,
func=mdp.randomize_rigid_body_mass,
mode="startup",
params={
"asset_cfg": SceneEntityCfg("robot", body_names=["pole"]),
"mass_range": (0.1, 0.5),
"operation": "add",
},
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment