Unverified Commit 48146c2b authored by Mayank Mittal's avatar Mayank Mittal Committed by GitHub

Fixes MuJoCo Humanoid and Ant environments (#252)

# Description

This MR fixes the MuJoCo-style Humanoid and Ant environments to use the new structure. The MR also includes some fixes in the core APIs.

## Type of change

- Bug fix (non-breaking change which fixes an issue)
- New feature (non-breaking change which adds functionality)

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with `./orbit.sh --format`
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [x] I have updated the changelog and the corresponding version in the extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already exists there
parent 4565902f
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.9.51"
version = "0.9.52"
# Description
title = "ORBIT framework for Robot Learning"
......
Changelog
---------
0.9.52 (2023-11-29)
~~~~~~~~~~~~~~~~~~~
Changed
^^^^^^^
* Changed the warning print in :meth:`omni.isaac.orbit.sim.utils.apply_nested` method
to be more descriptive. Earlier, it was printing a warning for every instanced prim.
Now, it only prints a warning if it could not apply the attribute to any of the prims.
Added
^^^^^
* Added the method :meth:`omni.isaac.orbit.utils.assets.retrieve_file_path` to
obtain the absolute path of a file on the Nucleus server or locally.
Fixed
^^^^^
* Fixed hiding of STOP button in the :class:`AppLauncher` class when running the
simulation in headless mode.
* Fixed a bug with :meth:`omni.isaac.orbit.sim.utils.clone` failing when the input prim path
had no parent (example: "/Table").
0.9.51 (2023-11-29)
~~~~~~~~~~~~~~~~~~~
......
......@@ -208,6 +208,8 @@ class AppLauncher:
self._create_app()
# Load IsaacSim extensions
self._load_extensions()
# Hide the stop button in the toolbar
self._hide_stop_button()
"""
Properties.
......@@ -561,8 +563,6 @@ class AppLauncher:
# add orbit modules back to sys.modules
for key, value in hacked_modules.items():
sys.modules[key] = value
# hide the stop button in the toolbar
self._hide_stop_button()
def _load_extensions(self):
"""Load correct extensions based on AppLauncher's resolved config member variables."""
......@@ -648,7 +648,14 @@ class AppLauncher:
)
def _hide_stop_button(self):
"""Hide the stop button in the toolbar."""
"""Hide the stop button in the toolbar.
For standalone executions, having a stop button is confusing since it invalidates the whole simulation.
Thus, we hide the button so that users don't accidentally click it.
"""
# when we are truly headless, then we can't import the widget toolbar
# thus, we only hide the stop button when we are not headless (i.e. GUI is enabled)
if self._livestream >= 1 or not self._headless:
import omni.kit.widget.toolbar
# grey out the stop button because we don't want to stop the simulation manually in standalone mode
......
......@@ -75,7 +75,7 @@ class JointAction(ActionTerm):
if isinstance(cfg.scale, (float, int)):
self._scale = float(cfg.scale)
elif isinstance(cfg.scale, dict):
self._scale = torch.ones(1.0, self.action_dim, device=self.device)
self._scale = torch.ones(self.num_envs, self.action_dim, device=self.device)
# resolve the dictionary config
index_list, _, value_list = string_utils.resolve_matching_names_values(self.cfg.scale, self._joint_names)
self._scale[:, index_list] = torch.tensor(value_list, device=self.device)
......
......@@ -14,6 +14,7 @@ from __future__ import annotations
import torch
from typing import TYPE_CHECKING
import omni.isaac.orbit.utils.math as math_utils
from omni.isaac.orbit.assets import Articulation, RigidObject
from omni.isaac.orbit.managers import SceneEntityCfg
from omni.isaac.orbit.sensors import RayCaster
......@@ -26,6 +27,13 @@ Root state.
"""
def base_pos_z(env: BaseEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Root height in the simulation world frame."""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return asset.data.root_pos_w[:, 2].unsqueeze(-1)
def base_lin_vel(env: BaseEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Root linear velocity in the asset's root frame."""
# extract the used quantities (to enable type-hinting)
......@@ -59,6 +67,15 @@ def joint_pos_rel(env: BaseEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robo
return asset.data.joint_pos - asset.data.default_joint_pos
def joint_pos_norm(env: BaseEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""The joint positions of the asset normalized with the asset's joint limits."""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return math_utils.scale_transform(
asset.data.joint_pos, asset.data.soft_joint_pos_limits[..., 0], asset.data.soft_joint_pos_limits[..., 1]
)
def joint_vel_rel(env: BaseEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")):
"""The joint velocities of the asset w.r.t. the default joint velocities."""
# extract the used quantities (to enable type-hinting)
......@@ -79,6 +96,18 @@ def height_scan(env: BaseEnv, sensor_cfg: SceneEntityCfg) -> torch.Tensor:
return sensor.data.pos_w[:, 2].unsqueeze(1) - sensor.data.ray_hits_w[..., 2] - 0.5
def body_incoming_wrench(env: BaseEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
"""Incoming spatial wrench on bodies of an articulation in the simulation world frame.
This is the 6-D wrench (force and torque) applied to the body link by the incoming joint force.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# obtain the link incoming forces in world frame
link_incoming_forces = asset.root_physx_view.get_link_incoming_joint_force()[:, asset_cfg.body_ids]
return link_incoming_forces.view(env.num_envs, -1)
"""
Actions.
"""
......
......@@ -240,6 +240,9 @@ def reset_joints_by_scale(
# scale these values randomly
joint_pos *= sample_uniform(*position_range, joint_pos.shape, joint_pos.device)
joint_vel *= sample_uniform(*velocity_range, joint_vel.shape, joint_vel.device)
# clamp joint pos to limits
joint_pos_limits = asset.data.soft_joint_pos_limits[env_ids]
joint_pos = joint_pos.clamp_(joint_pos_limits[..., 0], joint_pos_limits[..., 1])
# set into the physics simulation
asset.write_joint_state_to_sim(joint_pos, joint_vel, env_ids=env_ids)
......@@ -266,6 +269,9 @@ def reset_joints_by_offset(
# bias these values randomly
joint_pos += sample_uniform(*position_range, joint_pos.shape, joint_pos.device)
joint_vel += sample_uniform(*velocity_range, joint_vel.shape, joint_vel.device)
# clamp joint pos to limits
joint_pos_limits = asset.data.soft_joint_pos_limits[env_ids]
joint_pos = joint_pos.clamp_(joint_pos_limits[..., 0], joint_pos_limits[..., 1])
# set into the physics simulation
asset.write_joint_state_to_sim(joint_pos, joint_vel, env_ids=env_ids)
......@@ -273,15 +279,20 @@ def reset_joints_by_offset(
def reset_scene_to_default(env: BaseEnv, env_ids: torch.Tensor):
"""Reset the scene to the default state specified in the scene configuration."""
# root states
for rigid_object in env.scene.rigid_objects.values() + env.scene.articulations.values():
# rigid bodies
for rigid_object in env.scene.rigid_objects.values():
# obtain default and deal with the offset for env origins
default_root_state = rigid_object.data.default_root_state[env_ids].clone()
default_root_state[:, 0:3] += env.scene.env_origins[env_ids]
# set into the physics simulation
rigid_object.write_root_state_to_sim(default_root_state, env_ids=env_ids)
# joint states
# articulations
for articulation_asset in env.scene.articulations.values():
# obtain default and deal with the offset for env origins
default_root_state = articulation_asset.data.default_root_state[env_ids].clone()
default_root_state[:, 0:3] += env.scene.env_origins[env_ids]
# set into the physics simulation
articulation_asset.write_root_state_to_sim(default_root_state, env_ids=env_ids)
# obtain default joint positions
default_joint_pos = articulation_asset.data.default_joint_pos[env_ids].clone()
default_joint_vel = articulation_asset.data.default_joint_vel[env_ids].clone()
......
......@@ -26,14 +26,14 @@ General.
"""
def alive_bonus(env: RLTaskEnv) -> torch.Tensor:
def is_alive(env: RLTaskEnv) -> torch.Tensor:
"""Reward for being alive."""
return ~env.reset_buf * 1.0
return (~env.termination_manager.terminated).float()
def termination_penalty(env: RLTaskEnv) -> torch.Tensor:
def is_terminated(env: RLTaskEnv) -> torch.Tensor:
"""Penalize terminated episodes that don't correspond to episodic timeouts."""
return env.reset_buf * (~env.termination_manager.time_outs)
return env.termination_manager.terminated.float()
"""
......@@ -177,6 +177,11 @@ def action_rate_l2(env: RLTaskEnv) -> torch.Tensor:
return torch.sum(torch.square(env.action_manager.action - env.action_manager.prev_action), dim=1)
def action_l2(env: RLTaskEnv) -> torch.Tensor:
"""Penalize the actions using L2-kernel."""
return torch.sum(torch.square(env.action_manager.action), dim=1)
"""
Contact sensor.
"""
......
......@@ -54,8 +54,8 @@ class RigidBodyPropertiesCfg:
For more information on kinematic bodies, please refer to the `documentation <https://openusd.org/release/wp_rigid_body_physics.html#kinematic-bodies>`_.
"""
disable_gravity: bool | None = False
"""Disable gravity for the actor. Defaults to False."""
disable_gravity: bool | None = None
"""Disable gravity for the actor."""
linear_damping: float | None = None
"""Linear damping for the body."""
angular_damping: float | None = None
......
......@@ -123,12 +123,15 @@ def apply_nested(func: Callable) -> Callable:
The function iterates over the provided prim path and all its children to apply input function
to all prims under the specified prim path.
Note:
If the function succeeds to apply to a prim, it will not look at the children of that prim.
This is based on the physics behavior that nested schemas are not allowed. For example, a parent prim
and its child prim cannot both have a rigid-body schema applied on them, or it is not possible to
have nested articulations.
While traversing the prims under the specified prim path, the function will throw a warning if it
does not succeed to apply the function to any prim. This is because the user may have intended to
apply the function to a prim that does not have valid attributes, or the prim may be an instanced prim.
Args:
func: The function to apply to all prims under a specified prim-path. The function
must take the prim-path and other arguments. It should return a boolean indicating whether
......@@ -156,6 +159,9 @@ def apply_nested(func: Callable) -> Callable:
# check if prim is valid
if not prim.IsValid():
raise ValueError(f"Prim at path '{prim_path}' is not valid.")
# add iterable to check if property was applied on any of the prims
count_success = 0
instanced_prim_paths = []
# iterate over all prims under prim-path
all_prims = [prim]
while len(all_prims) > 0:
......@@ -163,10 +169,8 @@ def apply_nested(func: Callable) -> Callable:
child_prim = all_prims.pop(0)
child_prim_path = child_prim.GetPath().pathString # type: ignore
# check if prim is a prototype
# note: we prefer throwing a warning instead of ignoring the prim since the user may
# have intended to set properties on the prototype prim.
if child_prim.IsInstance():
carb.log_warn(f"Cannot perform '{func.__name__}' on instanced prim: '{child_prim_path}'")
instanced_prim_paths.append(child_prim_path)
continue
# set properties
success = func(child_prim_path, *args, **kwargs)
......@@ -174,6 +178,17 @@ def apply_nested(func: Callable) -> Callable:
# this is based on the physics behavior that nested schemas are not allowed
if not success:
all_prims += child_prim.GetChildren()
else:
count_success += 1
# check if we were successful in applying the function to any prim
if count_success == 0:
carb.log_warn(
f"Could not perform '{func.__name__}' on any prims under: '{prim_path}'."
" This might be because of the following reasons:"
"\n\t(1) The desired attribute does not exist on any of the prims."
"\n\t(2) The desired attribute exists on an instanced prim."
f"\n\t\tDiscovered list of instanced prim paths: {instanced_prim_paths}"
)
return wrapper
......@@ -208,7 +223,7 @@ def clone(func: Callable) -> Callable:
is_regex_expression = re.match(r"^[a-zA-Z0-9/_]+$", root_path) is None
# resolve matching prims for source prim path expression
if is_regex_expression:
if is_regex_expression and root_path != "":
source_prim_paths = prim_utils.find_matching_prim_paths(root_path)
# if no matching prims are found, raise an error
if len(source_prim_paths) == 0:
......
......@@ -12,8 +12,11 @@ For more information on Omniverse Nucleus:
https://docs.omniverse.nvidia.com/app_isaacsim/app_isaacsim/overview.html#omniverse-nucleus
"""
from __future__ import annotations
import io
import os
import tempfile
from typing_extensions import Literal
import carb
......@@ -60,6 +63,53 @@ def check_file_path(path: str) -> Literal[0, 1, 2]:
return 0
def retrieve_file_path(path: str, download_dir: str | None = None, force_download: bool = True) -> str:
"""Retrieves the path to a file on the Nucleus Server or locally.
If the file exists locally, then the absolute path to the file is returned.
If the file exists on the Nucleus Server, then the file is downloaded to the local machine
and the absolute path to the file is returned.
Args:
path: The path to the file.
download_dir: The directory where the file should be downloaded. Defaults to None, in which
case the file is downloaded to the system's temporary directory.
force_download: Whether to force download the file from the Nucleus Server. This will overwrite
the local file if it exists. Defaults to True.
Returns:
The path to the file on the local machine.
Raises:
FileNotFoundError: When the file not found locally or on Nucleus Server.
RuntimeError: When the file cannot be copied from the Nucleus Server to the local machine. This
can happen when the file already exists locally and :attr:`force_download` is set to False.
"""
# check file status
file_status = check_file_path(path)
if file_status == 1:
return os.path.abspath(path)
elif file_status == 2:
# resolve download directory
if download_dir is None:
download_dir = tempfile.gettempdir()
else:
download_dir = os.path.abspath(download_dir)
# create download directory if it does not exist
if not os.path.exists(download_dir):
os.makedirs(download_dir)
# download file in temp directory using os
file_name = os.path.basename(omni.client.break_url(path).path)
target_path = os.path.join(download_dir, file_name)
# copy file to local machine
result = omni.client.copy(path, target_path)
if result != omni.client.Result.OK and not force_download:
raise RuntimeError(f"Unable to copy file: '{path}'. File already exists locally at: {target_path}")
return os.path.abspath(target_path)
else:
raise FileNotFoundError(f"Unable to find the file: {path}")
def read_file(path: str) -> io.BytesIO:
"""Reads a file from the Nucleus Server or locally.
......
......@@ -87,8 +87,8 @@ def wrap_to_pi(angles: torch.Tensor) -> torch.Tensor:
Angles in the range [-pi, pi].
"""
angles = angles.clone()
angles %= 2 * np.pi
angles -= 2 * np.pi * (angles > np.pi)
angles %= 2 * torch.pi
angles -= 2 * torch.pi * (angles > torch.pi)
return angles
......@@ -418,14 +418,14 @@ def euler_xyz_from_quat(quat: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor,
# pitch (y-axis rotation)
sin_pitch = 2.0 * (q_w * q_y - q_z * q_x)
pitch = torch.where(torch.abs(sin_pitch) >= 1, copysign(np.pi / 2.0, sin_pitch), torch.asin(sin_pitch))
pitch = torch.where(torch.abs(sin_pitch) >= 1, copysign(torch.pi / 2.0, sin_pitch), torch.asin(sin_pitch))
# yaw (z-axis rotation)
sin_yaw = 2.0 * (q_w * q_z + q_x * q_y)
cos_yaw = 1 - 2 * (q_y * q_y + q_z * q_z)
yaw = torch.atan2(sin_yaw, cos_yaw)
return roll % (2 * np.pi), pitch % (2 * np.pi), yaw % (2 * np.pi) # TODO: why not wrap_to_pi here ?
return roll % (2 * torch.pi), pitch % (2 * torch.pi), yaw % (2 * torch.pi) # TODO: why not wrap_to_pi here ?
@torch.jit.script
......@@ -969,7 +969,7 @@ def random_yaw_orientation(num: int, device: str) -> torch.Tensor:
"""
roll = torch.zeros(num, dtype=torch.float, device=device)
pitch = torch.zeros(num, dtype=torch.float, device=device)
yaw = 2 * np.pi * torch.rand(num, dtype=torch.float, device=device)
yaw = 2 * torch.pi * torch.rand(num, dtype=torch.float, device=device)
return quat_from_euler_xyz(roll, pitch, yaw)
......@@ -1041,7 +1041,7 @@ def sample_cylinder(
Sampled tensor of shape :obj:`(*size, 3)`.
"""
# sample angles
angles = (torch.rand(size, device=device) * 2 - 1) * np.pi
angles = (torch.rand(size, device=device) * 2 - 1) * torch.pi
h_min, h_max = h_range
# add shape
if isinstance(size, int):
......
......@@ -49,6 +49,6 @@ __version__ = ORBIT_TASKS_METADATA["package"]["version"]
from .utils import import_packages
# The blacklist is used to prevent importing configs from sub-packages
_BLACKLIST_PKGS = ["locomotion.velocity.config.anymal_d", "utils", "classic.ant", "classic.humanoid"]
_BLACKLIST_PKGS = ["locomotion.velocity.config.anymal_d", "utils"]
# Import all configs in this package
import_packages(__name__, _BLACKLIST_PKGS)
......@@ -7,21 +7,23 @@
Ant locomotion environment (similar to OpenAI Gym Ant-v2).
"""
# import gymnasium as gym
import gymnasium as gym
# from . import agents
from . import agents, ant_env_cfg
##
# Register Gym environments.
##
# gym.register(
# id="Isaac-Ant-v0",
# entry_point="omni.isaac.orbit.envs:RLTaskEnv",
# kwargs={
# "env_cfg_entry_point": f"{__name__}:ant_env_cfg.yaml",
# "rl_games_cfg_entry_point": f"{agents.__name__}:rl_games_ppo_cfg.yaml",
# "skrl_cfg_entry_point": f"{agents.__name__}:skrl_ppo_cfg.yaml",
# "sb3_cfg_entry_point": f"{agents.__name__}:sb3_ppo_cfg.yaml",
# },
# )
gym.register(
id="Isaac-Ant-v0",
entry_point="omni.isaac.orbit.envs:RLTaskEnv",
disable_env_checker=True,
kwargs={
"env_cfg_entry_point": ant_env_cfg.AntEnvCfg,
"rsl_rl_cfg_entry_point": agents.rsl_rl_ppo_cfg.AntPPORunnerCfg,
"rl_games_cfg_entry_point": f"{agents.__name__}:rl_games_ppo_cfg.yaml",
"skrl_cfg_entry_point": f"{agents.__name__}:skrl_ppo_cfg.yaml",
"sb3_cfg_entry_point": f"{agents.__name__}:sb3_ppo_cfg.yaml",
},
)
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from . import rsl_rl_ppo_cfg
......@@ -45,13 +45,13 @@ params:
device_name: 'cuda:0'
multi_gpu: False
ppo: True
mixed_precision: False
mixed_precision: True
normalize_input: True
normalize_value: True
value_bootstrap: False
value_bootstrap: True
num_actors: -1
reward_shaper:
scale_value: 0.01
scale_value: 0.6
normalize_advantage: True
gamma: 0.99
tau: 0.95
......@@ -61,16 +61,16 @@ params:
kl_threshold: 0.008
score_to_win: 20000
max_epochs: 500
save_best_after: 200
save_best_after: 100
save_frequency: 50
grad_norm: 1.0
entropy_coef: 0.0
truncate_grads: True
e_clip: 0.2
horizon_length: 16
minibatch_size: 4096 # 32768
mini_epochs: 8
minibatch_size: 32768
mini_epochs: 4
critic_coef: 2
clip_value: True
seq_len: 4
seq_length: 4
bounds_loss_coef: 0.0001
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from omni.isaac.orbit.utils import configclass
from omni.isaac.orbit_tasks.utils.wrappers.rsl_rl import (
RslRlOnPolicyRunnerCfg,
RslRlPpoActorCriticCfg,
RslRlPpoAlgorithmCfg,
)
@configclass
class AntPPORunnerCfg(RslRlOnPolicyRunnerCfg):
num_steps_per_env = 32
max_iterations = 1000
save_interval = 50
experiment_name = "ant"
empirical_normalization = False
policy = RslRlPpoActorCriticCfg(
init_noise_std=1.0,
actor_hidden_dims=[400, 200, 100],
critic_hidden_dims=[400, 200, 100],
activation="elu",
)
algorithm = RslRlPpoAlgorithmCfg(
value_loss_coef=1.0,
use_clipped_value_loss=True,
clip_param=0.2,
entropy_coef=0.0,
num_learning_epochs=5,
num_mini_batches=4,
learning_rate=5.0e-4,
schedule="adaptive",
gamma=0.99,
lam=0.95,
desired_kl=0.01,
max_grad_norm=1.0,
)
# Reference: https://github.com/DLR-RM/rl-baselines3-zoo/blob/master/hyperparams/ppo.yml#L161
seed: 42
n_timesteps: !!float 2e6
n_timesteps: !!float 1e7
policy: 'MlpPolicy'
batch_size: 128
n_steps: 512
......@@ -16,8 +16,8 @@ learning_rate: !!float 3e-5
use_sde: True
clip_range: 0.4
policy_kwargs: "dict(
log_std_init=-2,
log_std_init=-1,
ortho_init=False,
activation_fn=nn.ReLU,
net_arch=[dict(pi=[256, 256], vf=[256, 256])]
net_arch=dict(pi=[256, 256], vf=[256, 256])
)"
# parameters for the MDP
env:
# general
num_envs: 1024
env_spacing: 5
episode_length: 1000
# step parameters
control_frequency_inv: 2 # 60 Hz
# actions parameters
power_scale: 0.5
# observations parameters
angular_velocity_scale: 1.0
dof_velocity_scale: 0.2
contact_force_scale: 0.1
# reward parameters
heading_weight: 0.5
up_weight: 0.1
alive_weight: 0.5
# cost parameters
actions_cost: 0.005
energy_cost: 0.05
joints_at_limit_cost: 0.1
# termination parameters
death_cost: -2.0
termination_height: 0.31
# parameters for setting up the scene
scene:
ant:
# articulation
solver_position_iteration_count: 4
solver_velocity_iteration_count: 0
sleep_threshold: 0.005
stabilization_threshold: 0.001
enable_self_collisions: True
# per-body
enable_gyroscopic_forces: True
max_depenetration_velocity: 10.0
# per-shape
contact_offset: 0.02
rest_offset: 0.0
# parameters for physics engine
sim:
dt: 0.0083 # 1/120 s
substeps: 1
gravity: [0.0, 0.0, -9.81]
enable_scene_query_support: False
use_gpu_pipeline: True
use_flatcache: True # deprecated from Isaac Sim 2023.1 onwards
use_fabric: True # used from Isaac Sim 2023.1 onwards
device: "cuda:0"
physx:
solver_type: 1
use_gpu: True # set to False to run on CPU
bounce_threshold_velocity: 0.2
friction_offset_threshold: 0.04
friction_correlation_distance: 0.025
enable_stabilization: True
# GPU buffers
gpu_max_rigid_contact_count: 524288
gpu_max_rigid_patch_count: 81920
gpu_found_lost_pairs_capacity: 1024
gpu_found_lost_aggregate_pairs_capacity: 262144
gpu_total_aggregate_pairs_capacity: 2048
gpu_max_soft_body_contacts: 1048576
gpu_max_particle_contacts: 1048576
gpu_heap_capacity: 67108864
gpu_temp_buffer_capacity: 16777216
gpu_max_num_partitions: 8
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import gymnasium as gym
import math
import torch
import omni.isaac.core.utils.nucleus as nucleus_utils
import omni.isaac.core.utils.prims as prim_utils
import omni.isaac.core.utils.torch as torch_utils
from omni.isaac.core.articulations import ArticulationView
import omni.isaac.orbit.compat.utils.kit as kit_utils
from omni.isaac.orbit_tasks.isaac_env import IsaacEnv, VecEnvIndices, VecEnvObs
from omni.isaac.orbit_tasks.isaac_env_cfg import EnvCfg, IsaacEnvCfg
class AntEnv(IsaacEnv):
"""Environment for an Ant with four legs on a flat terrain.
Reference:
https://github.com/openai/gym/blob/master/gym/envs/mujoco/ant_v3.py
"""
def __init__(self, cfg: dict, **kwargs):
"""Initializes the environment.
Args:
cfg: The configuration dictionary.
kwargs: Additional keyword arguments. See IsaacEnv for more details.
"""
# copy configuration
self.cfg_dict = cfg.copy()
# configuration for the environment
isaac_cfg = IsaacEnvCfg(
env=EnvCfg(num_envs=self.cfg_dict["env"]["num_envs"], env_spacing=self.cfg_dict["env"]["env_spacing"])
)
isaac_cfg.sim.from_dict(self.cfg_dict["sim"])
# initialize the base class to setup the scene.
super().__init__(isaac_cfg, **kwargs)
# define views over instances
self.ants = ArticulationView(prim_paths_expr=self.env_ns + "/.*/Ant/torso", reset_xform_properties=False)
# play the simulator to activate physics handles
# note: this activates the physics simulation view that exposes TensorAPIs
self.sim.reset()
# initialize all the handles
self.ants.initialize(self.sim.physics_sim_view)
# set the default state
self.ants.post_reset()
# get quantities from scene we care about
self._dof_limits = self.ants.get_dof_limits()[0, :].to(self.device)
self._initial_root_tf = self.ants.get_world_poses(clone=True)
self._initial_dof_pos = self.ants.get_joint_positions(clone=True)
# initialize buffers
self.actions = torch.zeros((self.num_envs, 8), device=self.device)
# create constants required later during simulation.
self._define_environment_constants()
# create other useful variables
self.potentials = torch.full(
(self.num_envs,), -1000.0 / self.physics_dt, dtype=torch.float32, device=self.device
)
self.prev_potentials = self.potentials.clone()
# compute the observation space
self.observation_space = gym.spaces.Box(low=-math.inf, high=math.inf, shape=(60,))
# compute the action space
self.action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(8,))
# store maximum episode length
self.max_episode_length = self.cfg_dict["env"]["episode_length"]
"""
Implementation specifics.
"""
def _design_scene(self) -> list[str]:
# get nucleus assets path
assets_root_path = nucleus_utils.get_assets_root_path()
if assets_root_path is None:
raise RuntimeError(
"Unable to access the Nucleus server from Omniverse. For more information, please check:"
" https://docs.omniverse.nvidia.com/app_isaacsim/app_isaacsim/overview.html#omniverse-nucleus"
)
# ground plane
kit_utils.create_ground_plane(
"/World/defaultGroundPlane", static_friction=0.5, dynamic_friction=0.5, restitution=0.8
)
# robot
robot_usd_path = assets_root_path + "/Isaac/Robots/Ant/ant_instanceable.usd"
prim_utils.create_prim(
prim_path=self.template_env_ns + "/Ant", usd_path=robot_usd_path, translation=(0.0, 0.0, 0.5)
)
# apply articulation settings
kit_utils.set_articulation_properties(
prim_path=self.template_env_ns + "/Ant/torso",
solver_position_iteration_count=self.cfg_dict["scene"]["ant"]["solver_position_iteration_count"],
solver_velocity_iteration_count=self.cfg_dict["scene"]["ant"]["solver_velocity_iteration_count"],
sleep_threshold=self.cfg_dict["scene"]["ant"]["sleep_threshold"],
stabilization_threshold=self.cfg_dict["scene"]["ant"]["stabilization_threshold"],
enable_self_collisions=self.cfg_dict["scene"]["ant"]["enable_self_collisions"],
)
# apply rigid body settings
kit_utils.set_nested_rigid_body_properties(
prim_path=self.template_env_ns + "/Ant",
enable_gyroscopic_forces=self.cfg_dict["scene"]["ant"]["enable_gyroscopic_forces"],
max_depenetration_velocity=self.cfg_dict["scene"]["ant"]["max_depenetration_velocity"],
)
# apply collider properties
kit_utils.set_nested_collision_properties(
prim_path=self.template_env_ns + "/Ant",
contact_offset=self.cfg_dict["scene"]["ant"]["contact_offset"],
rest_offset=self.cfg_dict["scene"]["ant"]["rest_offset"],
)
# return global prims
return ["/World/defaultGroundPlane"]
def _reset_idx(self, env_ids: VecEnvIndices):
# get num envs to reset
num_resets = len(env_ids)
# randomize the MDP
# -- DOF position
dof_pos = torch_utils.torch_rand_float(-0.2, 0.2, (num_resets, self.ants.num_dof), device=self.device)
dof_pos[:] = torch_utils.tensor_clamp(
self._initial_dof_pos[env_ids] + dof_pos, self._dof_limits[:, 0], self._dof_limits[:, 1]
)
self.ants.set_joint_positions(dof_pos, indices=env_ids)
# -- DOF velocity
dof_vel = torch_utils.torch_rand_float(-0.1, 0.1, (num_resets, self.ants.num_dof), device=self.device)
self.ants.set_joint_velocities(dof_vel, indices=env_ids)
# -- Root pose
root_pos, root_rot = self._initial_root_tf[0].clone()[env_ids], self._initial_root_tf[1].clone()[env_ids]
self.ants.set_world_poses(root_pos, root_rot, indices=env_ids)
# -- Root velocity
root_vel = torch.zeros((num_resets, 6), device=self.device)
self.ants.set_velocities(root_vel, indices=env_ids)
# -- Reset potentials
to_target = self._GOAL_POS[env_ids] - root_pos
to_target[:, 2] = 0.0
self.prev_potentials[env_ids] = -torch.norm(to_target, p=2, dim=-1) / self.physics_dt
self.potentials[env_ids] = self.prev_potentials[env_ids].clone()
# -- MDP reset
self.actions[env_ids, :] = 0
self.reset_buf[env_ids] = 0
self.episode_length_buf[env_ids] = 0
def _step_impl(self, actions: torch.Tensor):
# reset environments that terminated
reset_env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1)
if len(reset_env_ids) > 0:
self._reset_idx(reset_env_ids)
# increment the number of steps
self.episode_length_buf += 1
# store actions into local
self.actions = actions.clone().to(device=self.device)
# pre-step: set actions into buffer
dof_forces = self.actions * self._JOINT_GEARS * self.cfg_dict["env"]["power_scale"]
indices = torch.arange(self.num_envs, dtype=torch.int32, device=self.device)
self.ants.set_joint_efforts(dof_forces, indices=indices)
# perform physics stepping
for _ in range(self.cfg_dict["env"]["control_frequency_inv"]):
# step simulation
self.sim.step(render=self.enable_render)
# check if simulation is still running
if self.sim.is_stopped():
return
# post-step: compute MDP
self._cache_common_quantities()
self._compute_rewards()
self._check_termination()
# add information to extra if timeout occurred due to episode length
# Note: this is used by algorithms like PPO where time-outs are handled differently
# For more info: https://github.com/DLR-RM/stable-baselines3/issues/633
self.extras["time_outs"] = self.episode_length_buf >= self.cfg_dict["env"]["episode_length"]
def _get_observations(self) -> VecEnvObs:
# extract euler angles (in start frame)
roll, _, yaw = torch_utils.get_euler_xyz(self._torso_quat_start)
# compute heading direction
# TODO: Check why is this using z direction instead of y.
walk_target_angle = torch.atan2(
self._GOAL_POS[:, 2] - self._torso_pos_start[:, 2], self._GOAL_POS[:, 0] - self._torso_pos_start[:, 0]
)
angle_to_target = walk_target_angle - yaw
# obs_buf shapes: 1, 3, 3, 1, 1, 1, 1, 1, num_dofs(8), num_dofs(8), num_dofs(8)
obs_buf = torch.cat(
(
self._torso_pos_start[:, 2].view(-1, 1),
self._lin_vel_start,
self._ang_vel_start * self.cfg_dict["env"]["angular_velocity_scale"],
yaw.unsqueeze(-1),
roll.unsqueeze(-1),
angle_to_target.unsqueeze(-1),
self._up_proj.unsqueeze(-1),
self._heading_proj.unsqueeze(-1),
self._dof_pos_scaled,
self._dof_vel_scaled,
self._feet_force_torques * self.cfg_dict["env"]["contact_force_scale"],
self.actions,
),
dim=-1,
)
return {"policy": obs_buf}
"""
Helper functions - MDP.
"""
def _cache_common_quantities(self) -> None:
"""Compute common quantities from simulator used for computing MDP signals."""
# extract data from simulator
torso_pos_world, torso_quat_world = self.ants.get_world_poses(clone=False)
lin_vel_world = self.ants.get_linear_velocities(clone=False)
ang_vel_world = self.ants.get_angular_velocities(clone=False)
dof_pos = self.ants.get_joint_positions(clone=False)
dof_vel = self.ants.get_joint_velocities(clone=False)
# TODO: Remove direct usage of `_physics_view` when method exists in :class:`ArticulationView`
feet_force_torques = self.ants._physics_view.get_force_sensor_forces()
# scale the dof
self._dof_pos_scaled = torch_utils.scale_transform(dof_pos, self._dof_limits[:, 0], self._dof_limits[:, 1])
self._dof_vel_scaled = dof_vel * self.cfg_dict["env"]["dof_velocity_scale"]
# feet contact forces
self._feet_force_torques = feet_force_torques.reshape(self.num_envs, -1)
# convert base pose w.r.t. start frame
self._torso_pos_start = torso_pos_world
self._torso_quat_start = torch_utils.quat_mul(torso_quat_world, self._INV_START_QUAT)
# convert velocity (in base frame w.r.t. start frame)
self._lin_vel_start = torch_utils.quat_rotate_inverse(self._torso_quat_start, lin_vel_world)
self._ang_vel_start = torch_utils.quat_rotate_inverse(self._torso_quat_start, ang_vel_world)
# convert basis vectors w.r.t. start frame
up_vec = torch_utils.quat_rotate(self._torso_quat_start, self._UP_VEC)
heading_vec = torch_utils.quat_rotate(self._torso_quat_start, self._HEADING_VEC)
# compute relative movement to the target
self._to_target = self._GOAL_POS - self._torso_pos_start
self._to_target[:, 2] = 0.0
to_target_dir = torch_utils.normalize(self._to_target)
# compute projection of current heading to desired heading vector
self._up_proj = up_vec[:, 2]
self._heading_proj = torch.bmm(heading_vec.view(self.num_envs, 1, 3), to_target_dir.view(self.num_envs, 3, 1))
self._heading_proj = self._heading_proj.view(self.num_envs)
def _compute_rewards(self) -> None:
# heading in the right direction
heading_reward = torch.where(
self._heading_proj > 0.8,
self.cfg_dict["env"]["heading_weight"],
self.cfg_dict["env"]["heading_weight"] * self._heading_proj.double() / 0.8,
)
# aligning up axis of robot and environment
up_reward = torch.where(self._up_proj > 0.93, self.cfg_dict["env"]["up_weight"], 0.0)
# penalty for high action commands
actions_cost = torch.sum(self.actions**2, dim=-1)
# energy penalty for movement (power = torque * vel)
electricity_cost = torch.sum(torch.abs(self.actions * self._dof_vel_scaled), dim=-1)
# reaching close to dof limits
# TODO: Shouldn't this be absolute dof pos? Only checks for upper limit right now.
dof_at_limit_cost = torch.sum(self._dof_pos_scaled > 0.99, dim=-1)
# reward for duration of staying alive
alive_reward = self.cfg_dict["env"]["alive_weight"]
# compute x,y-potential score towards the goal
self.prev_potentials = self.potentials.clone()
self.potentials = -torch.norm(self._to_target, p=2, dim=-1) / self.physics_dt
# reward for progressing towards the goal (through L2 potential)
progress_reward = self.potentials - self.prev_potentials
# compute reward
total_reward = (
progress_reward
+ alive_reward
+ up_reward
+ heading_reward
- self.cfg_dict["env"]["actions_cost"] * actions_cost
- self.cfg_dict["env"]["energy_cost"] * electricity_cost
- self.cfg_dict["env"]["joints_at_limit_cost"] * dof_at_limit_cost
)
# adjust reward for fallen agents
total_reward = torch.where(
self._torso_pos_start[:, 2] < self.cfg_dict["env"]["termination_height"],
self.cfg_dict["env"]["death_cost"],
total_reward,
)
# set reward into buffer
self.reward_buf[:] = total_reward
def _check_termination(self) -> None:
# compute resets
# -- base has collapsed
resets = torch.where(
self._torso_pos_start[:, 2] < self.cfg_dict["env"]["termination_height"], 1, self.reset_buf
)
# -- episode length
resets = torch.where(self.episode_length_buf >= self.max_episode_length, 1, resets)
# set reset into buffer
self.reset_buf[:] = resets
def _define_environment_constants(self):
"""Defines useful constants used by the implementation."""
# desired goal position
self._GOAL_POS = torch.tensor([1000, 0, 0], dtype=torch.float32, device=self.device).repeat((self.num_envs, 1))
# gear ratio for joint control
self._JOINT_GEARS = torch.tensor([15] * self.ants.num_dof, dtype=torch.float32, device=self.device)
# initial spawn orientation
self._START_QUAT = torch.tensor([1, 0, 0, 0], device=self.device, dtype=torch.float32)
self._INV_START_QUAT = torch_utils.quat_conjugate(self._START_QUAT).repeat((self.num_envs, 1))
# heading direction for the robot
self._HEADING_VEC = torch.tensor([1, 0, 0], dtype=torch.float32, device=self.device).repeat((self.num_envs, 1))
# up direction for the simulator
self._UP_VEC = torch.tensor([0, 0, 1], dtype=torch.float32, device=self.device).repeat((self.num_envs, 1))
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import omni.isaac.orbit.sim as sim_utils
from omni.isaac.orbit.actuators import ImplicitActuatorCfg
from omni.isaac.orbit.assets import ArticulationCfg, AssetBaseCfg
from omni.isaac.orbit.command_generators import NullCommandGeneratorCfg
from omni.isaac.orbit.envs import RLTaskEnvCfg
from omni.isaac.orbit.managers import ObservationGroupCfg as ObsGroup
from omni.isaac.orbit.managers import ObservationTermCfg as ObsTerm
from omni.isaac.orbit.managers import RandomizationTermCfg as RandTerm
from omni.isaac.orbit.managers import RewardTermCfg as RewTerm
from omni.isaac.orbit.managers import SceneEntityCfg
from omni.isaac.orbit.managers import TerminationTermCfg as DoneTerm
from omni.isaac.orbit.scene import InteractiveSceneCfg
from omni.isaac.orbit.terrains import TerrainImporterCfg
from omni.isaac.orbit.utils import configclass
from omni.isaac.orbit.utils.assets import ISAAC_NUCLEUS_DIR
import omni.isaac.orbit_tasks.classic.humanoid.mdp as mdp
##
# Scene definition
##
@configclass
class MySceneCfg(InteractiveSceneCfg):
"""Configuration for the terrain scene with an ant robot."""
# terrain
terrain = TerrainImporterCfg(
prim_path="/World/ground",
terrain_type="plane",
collision_group=-1,
physics_material=sim_utils.RigidBodyMaterialCfg(
friction_combine_mode="average",
restitution_combine_mode="average",
static_friction=1.0,
dynamic_friction=1.0,
restitution=0.0,
),
debug_vis=False,
)
# robot
robot = ArticulationCfg(
prim_path="{ENV_REGEX_NS}/Robot",
spawn=sim_utils.UsdFileCfg(
usd_path=f"{ISAAC_NUCLEUS_DIR}/Robots/Ant/ant_instanceable.usd",
rigid_props=sim_utils.RigidBodyPropertiesCfg(
disable_gravity=False,
max_depenetration_velocity=10.0,
enable_gyroscopic_forces=True,
),
articulation_props=sim_utils.ArticulationRootPropertiesCfg(
enabled_self_collisions=False,
solver_position_iteration_count=4,
solver_velocity_iteration_count=0,
sleep_threshold=0.005,
stabilization_threshold=0.001,
),
copy_from_source=False,
),
init_state=ArticulationCfg.InitialStateCfg(
pos=(0.0, 0.0, 0.5),
joint_pos={".*": 0.0},
),
actuators={
"body": ImplicitActuatorCfg(
joint_names_expr=[".*"],
stiffness=0.0,
damping=0.0,
),
},
)
# lights
light = AssetBaseCfg(
prim_path="/World/light",
spawn=sim_utils.DistantLightCfg(color=(0.75, 0.75, 0.75), intensity=3000.0),
)
@configclass
class ActionsCfg:
"""Action specifications for the MDP."""
joint_effort = mdp.JointEffortActionCfg(asset_name="robot", joint_names=[".*"], scale=7.5)
@configclass
class ObservationsCfg:
"""Observation specifications for the MDP."""
@configclass
class PolicyCfg(ObsGroup):
"""Observations for the policy."""
base_height = ObsTerm(func=mdp.base_pos_z)
base_lin_vel = ObsTerm(func=mdp.base_lin_vel)
base_ang_vel = ObsTerm(func=mdp.base_ang_vel)
base_yaw_roll = ObsTerm(func=mdp.base_yaw_roll)
base_angle_to_target = ObsTerm(func=mdp.base_angle_to_target, params={"target_pos": (1000.0, 0.0, 0.0)})
base_up_proj = ObsTerm(func=mdp.base_up_proj)
base_heading_proj = ObsTerm(func=mdp.base_heading_proj, params={"target_pos": (1000.0, 0.0, 0.0)})
joint_pos_norm = ObsTerm(func=mdp.joint_pos_norm)
joint_vel_rel = ObsTerm(func=mdp.joint_vel_rel, scale=0.2)
feet_body_forces = ObsTerm(
func=mdp.body_incoming_wrench,
scale=0.1,
params={
"asset_cfg": SceneEntityCfg(
"robot", body_names=["front_left_foot", "front_right_foot", "left_back_foot", "right_back_foot"]
)
},
)
actions = ObsTerm(func=mdp.last_action)
def __post_init__(self):
self.enable_corruption = False
self.concatenate_terms = True
# observation groups
policy: PolicyCfg = PolicyCfg()
@configclass
class RandomizationCfg:
"""Configuration for randomization."""
reset_base = RandTerm(
func=mdp.reset_root_state_uniform,
mode="reset",
params={"pose_range": {}, "velocity_range": {}},
)
reset_robot_joints = RandTerm(
func=mdp.reset_joints_by_offset,
mode="reset",
params={
"position_range": (-0.2, 0.2),
"velocity_range": (-0.1, 0.1),
},
)
@configclass
class RewardsCfg:
"""Reward terms for the MDP."""
# (1) Reward for moving forward
progress = RewTerm(func=mdp.progress_reward, weight=1.0, params={"target_pos": (1000.0, 0.0, 0.0)})
# (2) Stay alive bonus
alive = RewTerm(func=mdp.is_alive, weight=0.5)
# (3) Reward for non-upright posture
upright = RewTerm(func=mdp.upright_posture_bonus, weight=0.1, params={"threshold": 0.93})
# (4) Reward for moving in the right direction
move_to_target = RewTerm(
func=mdp.move_to_target_bonus, weight=0.5, params={"threshold": 0.8, "target_pos": (1000.0, 0.0, 0.0)}
)
# (5) Penalty for large action commands
action_l2 = RewTerm(func=mdp.action_l2, weight=-0.005)
# (6) Penalty for energy consumption
energy = RewTerm(func=mdp.power_consumption, weight=-0.05, params={"gear_ratio": {".*": 15.0}})
# (7) Penalty for reaching close to joint limits
joint_limits = RewTerm(
func=mdp.joint_limits_penalty_ratio, weight=-0.1, params={"threshold": 0.99, "gear_ratio": {".*": 15.0}}
)
@configclass
class TerminationsCfg:
"""Termination terms for the MDP."""
# (1) Terminate if the episode length is exceeded
time_out = DoneTerm(func=mdp.time_out, time_out=True)
# (2) Terminate if the robot falls
torso_height = DoneTerm(func=mdp.base_height, params={"minimum_height": 0.31})
@configclass
class CurriculumCfg:
"""Curriculum terms for the MDP."""
pass
@configclass
class AntEnvCfg(RLTaskEnvCfg):
"""Configuration for the MuJoCo-style Ant walking environment."""
# Scene settings
scene: MySceneCfg = MySceneCfg(num_envs=4096, env_spacing=5.0)
# Basic settings
observations: ObservationsCfg = ObservationsCfg()
actions: ActionsCfg = ActionsCfg()
commands: NullCommandGeneratorCfg = NullCommandGeneratorCfg()
# MDP settings
rewards: RewardsCfg = RewardsCfg()
terminations: TerminationsCfg = TerminationsCfg()
randomization: RandomizationCfg = RandomizationCfg()
curriculum: CurriculumCfg = CurriculumCfg()
def __post_init__(self):
"""Post initialization."""
# general settings
self.decimation = 2
self.episode_length_s = 16.0
# simulation settings
self.sim.dt = 1 / 120.0
self.sim.physx.bounce_threshold_velocity = 0.2
# default friction material
self.sim.physics_material.static_friction = 1.0
self.sim.physics_material.dynamic_friction = 1.0
self.sim.physics_material.restitution = 0.0
......@@ -148,9 +148,9 @@ class RewardsCfg:
"""Reward terms for the MDP."""
# (1) Constant running reward
alive = RewTerm(func=mdp.alive_bonus, weight=1.0)
alive = RewTerm(func=mdp.is_alive, weight=1.0)
# (2) Failure penalty
terminating = RewTerm(func=mdp.termination_penalty, weight=-2.0)
terminating = RewTerm(func=mdp.is_terminated, weight=-2.0)
# (3) Primary task: keep pole upright
pole_pos = RewTerm(
func=mdp.joint_pos_target_l2,
......
......@@ -7,21 +7,23 @@
Humanoid locomotion environment (similar to OpenAI Gym Humanoid-v2).
"""
# import gymnasium as gym
import gymnasium as gym
# from . import agents
from . import agents, humanoid_env_cfg
##
# Register Gym environments.
##
# gym.register(
# id="Isaac-Humanoid-v0",
# entry_point="omni.isaac.orbit.envs:RLTaskEnv",
# kwargs={
# "env_cfg_entry_point": f"{__name__}:ant_env_cfg.yaml",
# "rl_games_cfg_entry_point": f"{agents.__name__}:rl_games_ppo_cfg.yaml",
# "skrl_cfg_entry_point": f"{agents.__name__}:skrl_ppo_cfg.yaml",
# "sb3_cfg_entry_point": f"{agents.__name__}:sb3_ppo_cfg.yaml",
# },
# )
gym.register(
id="Isaac-Humanoid-v0",
entry_point="omni.isaac.orbit.envs:RLTaskEnv",
disable_env_checker=True,
kwargs={
"env_cfg_entry_point": humanoid_env_cfg.HumanoidEnvCfg,
"rsl_rl_cfg_entry_point": agents.rsl_rl_ppo_cfg.HumanoidPPORunnerCfg,
"rl_games_cfg_entry_point": f"{agents.__name__}:rl_games_ppo_cfg.yaml",
"skrl_cfg_entry_point": f"{agents.__name__}:skrl_ppo_cfg.yaml",
"sb3_cfg_entry_point": f"{agents.__name__}:sb3_ppo_cfg.yaml",
},
)
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from . import rsl_rl_ppo_cfg # noqa: F401, F403
......@@ -45,32 +45,31 @@ params:
device_name: 'cuda:0'
multi_gpu: False
ppo: True
mixed_precision: False
mixed_precision: True
normalize_input: True
normalize_value: True
value_bootstrap: False
value_bootstrap: True
num_actors: -1
reward_shaper:
scale_value: 0.01
scale_value: 0.6
normalize_advantage: True
gamma: 0.99
tau: 0.95
learning_rate: 3e-4
learning_rate: 5e-4
lr_schedule: adaptive
schedule_type: legacy
kl_threshold: 0.008
kl_threshold: 0.01
score_to_win: 20000
max_epochs: 500
save_best_after: 500
max_epochs: 1000
save_best_after: 100
save_frequency: 100
grad_norm: 1.0
entropy_coef: 0.0
truncate_grads: True
e_clip: 0.2
horizon_length: 32
minibatch_size: 4096 # 32768
mini_epochs: 8
critic_coef: 2
minibatch_size: 32768
mini_epochs: 5
critic_coef: 4
clip_value: True
seq_len: 4
seq_length: 4
bounds_loss_coef: 0.0001
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from omni.isaac.orbit.utils import configclass
from omni.isaac.orbit_tasks.utils.wrappers.rsl_rl import (
RslRlOnPolicyRunnerCfg,
RslRlPpoActorCriticCfg,
RslRlPpoAlgorithmCfg,
)
@configclass
class HumanoidPPORunnerCfg(RslRlOnPolicyRunnerCfg):
num_steps_per_env = 32
max_iterations = 1000
save_interval = 50
experiment_name = "humanoid"
empirical_normalization = False
policy = RslRlPpoActorCriticCfg(
init_noise_std=1.0,
actor_hidden_dims=[400, 200, 100],
critic_hidden_dims=[400, 200, 100],
activation="elu",
)
algorithm = RslRlPpoAlgorithmCfg(
value_loss_coef=1.0,
use_clipped_value_loss=True,
clip_param=0.2,
entropy_coef=0.0,
num_learning_epochs=5,
num_mini_batches=4,
learning_rate=5.0e-4,
schedule="adaptive",
gamma=0.99,
lam=0.95,
desired_kl=0.01,
max_grad_norm=1.0,
)
# Reference: https://github.com/DLR-RM/rl-baselines3-zoo/blob/master/hyperparams/ppo.yml#L457
# Reference: https://github.com/DLR-RM/rl-baselines3-zoo/blob/master/hyperparams/ppo.yml#L245
seed: 42
policy: 'MlpPolicy'
n_timesteps: !!float 1e7
n_timesteps: !!float 5e7
batch_size: 256
n_steps: 512
gamma: 0.95
learning_rate: 3.56987e-05
ent_coef: 0.00238306
clip_range: 0.3
n_epochs: 5
gae_lambda: 0.9
max_grad_norm: 2
vf_coef: 0.431892
gamma: 0.99
learning_rate: !!float 2.5e-4
ent_coef: 0.0
clip_range: 0.2
n_epochs: 10
gae_lambda: 0.95
max_grad_norm: 1.0
vf_coef: 0.5
policy_kwargs: "dict(
log_std_init=-2,
ortho_init=False,
activation_fn=nn.ReLU,
net_arch=[dict(pi=[256, 256], vf=[256, 256])]
net_arch=dict(pi=[256, 256], vf=[256, 256])
)"
# parameters for the MDP
env:
# general
num_envs: 1024
env_spacing: 5
episode_length: 1000
# step parameters
control_frequency_inv: 2 # 60 Hz
# actions parameters
power_scale: 1.0
# observations parameters
angular_velocity_scale: 0.25
dof_velocity_scale: 0.1
contact_force_scale: 0.01
# reward parameters
heading_weight: 0.5
up_weight: 0.1
alive_weight: 2.0
# cost parameters
actions_cost: 0.01
energy_cost: 0.05
joints_at_limit_cost: 0.25
# termination parameters
death_cost: -1.0
termination_height: 0.8
# parameters for setting up the scene
scene:
humanoid:
# articulation
solver_position_iteration_count: 4
solver_velocity_iteration_count: 0
sleep_threshold: 0.005
stabilization_threshold: 0.001
enable_self_collisions: True
# per-body
enable_gyroscopic_forces: True
max_depenetration_velocity: 10.0
# per-shape
contact_offset: 0.02
rest_offset: 0.0
# parameters for physics engine
sim:
dt: 0.0083 # 1/120 s
substeps: 1
gravity: [0.0, 0.0, -9.81]
enable_scene_query_support: False
use_gpu_pipeline: True
use_flatcache: True # deprecated from Isaac Sim 2023.1 onwards
use_fabric: True # used from Isaac Sim 2023.1 onwards
device: "cuda:0"
physx:
solver_type: 1
use_gpu: True # set to False to run on CPU
bounce_threshold_velocity: 0.2
friction_offset_threshold: 0.04
friction_correlation_distance: 0.025
enable_stabilization: True
# GPU buffers
gpu_max_rigid_contact_count: 524288
gpu_max_rigid_patch_count: 81920
gpu_found_lost_pairs_capacity: 1024
gpu_found_lost_aggregate_pairs_capacity: 262144
gpu_total_aggregate_pairs_capacity: 2048
gpu_max_soft_body_contacts: 1048576
gpu_max_particle_contacts: 1048576
gpu_heap_capacity: 67108864
gpu_temp_buffer_capacity: 16777216
gpu_max_num_partitions: 8
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import gymnasium as gym
import math
import torch
import omni.isaac.core.utils.nucleus as nucleus_utils
import omni.isaac.core.utils.prims as prim_utils
import omni.isaac.core.utils.torch as torch_utils
from omni.isaac.core.articulations import ArticulationView
import omni.isaac.orbit.compat.utils.kit as kit_utils
from omni.isaac.orbit_tasks.isaac_env import IsaacEnv, VecEnvIndices, VecEnvObs
from omni.isaac.orbit_tasks.isaac_env_cfg import EnvCfg, IsaacEnvCfg
class HumanoidEnv(IsaacEnv):
"""Environment for an Humanoid on a flat terrain.
Reference:
https://github.com/openai/gym/blob/master/gym/envs/mujoco/humanoid_v3.py
"""
def __init__(self, cfg: dict, **kwargs):
"""Initializes the environment.
Args:
cfg: The configuration dictionary.
kwargs: Additional keyword arguments. See IsaacEnv for more details.
"""
# copy configuration
self.cfg_dict = cfg.copy()
# configuration for the environment
isaac_cfg = IsaacEnvCfg(
env=EnvCfg(num_envs=self.cfg_dict["env"]["num_envs"], env_spacing=self.cfg_dict["env"]["env_spacing"])
)
isaac_cfg.sim.from_dict(self.cfg_dict["sim"])
# initialize the base class to setup the scene.
super().__init__(isaac_cfg, **kwargs)
# define views over instances
self.humanoids = ArticulationView(
prim_paths_expr=self.env_ns + "/.*/Humanoid/torso", reset_xform_properties=False
)
# play the simulator to activate physics handles
# note: this activates the physics simulation view that exposes TensorAPIs
self.sim.reset()
# initialize all the handles
self.humanoids.initialize(self.sim.physics_sim_view)
# set the default state
self.humanoids.post_reset()
# get quantities from scene we care about
self._dof_limits = self.humanoids.get_dof_limits()[0, :].to(self.device)
self._initial_root_tf = self.humanoids.get_world_poses(clone=True)
self._initial_dof_pos = self.humanoids.get_joint_positions(clone=True)
# initialize buffers
self.actions = torch.zeros((self.num_envs, 21), device=self.device)
# create constants required later during simulation.
self._define_environment_constants()
# create other useful variables
self.potentials = torch.full(
(self.num_envs,), -1000.0 / self.physics_dt, dtype=torch.float32, device=self.device
)
self.prev_potentials = self.potentials.clone()
# compute the observation space
self.observation_space = gym.spaces.Box(low=-math.inf, high=math.inf, shape=(87,))
# compute the action space
self.action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(21,))
# store maximum episode length
self.max_episode_length = self.cfg_dict["env"]["episode_length"]
"""
Implementation specifics.
"""
def _design_scene(self) -> list[str]:
# get nucleus assets path
assets_root_path = nucleus_utils.get_assets_root_path()
if assets_root_path is None:
raise RuntimeError(
"Unable to access the Nucleus server from Omniverse. For more information, please check:"
" https://docs.omniverse.nvidia.com/app_isaacsim/app_isaacsim/overview.html#omniverse-nucleus"
)
# ground plane
kit_utils.create_ground_plane(
"/World/defaultGroundPlane", static_friction=0.5, dynamic_friction=0.5, restitution=0.8
)
# robot
robot_usd_path = assets_root_path + "/Isaac/Robots/Humanoid/humanoid_instanceable.usd"
prim_utils.create_prim(
prim_path=self.template_env_ns + "/Humanoid", usd_path=robot_usd_path, translation=(0.0, 0.0, 1.34)
)
# apply articulation settings
kit_utils.set_articulation_properties(
prim_path=self.template_env_ns + "/Humanoid/torso",
solver_position_iteration_count=self.cfg_dict["scene"]["humanoid"]["solver_position_iteration_count"],
solver_velocity_iteration_count=self.cfg_dict["scene"]["humanoid"]["solver_velocity_iteration_count"],
sleep_threshold=self.cfg_dict["scene"]["humanoid"]["sleep_threshold"],
stabilization_threshold=self.cfg_dict["scene"]["humanoid"]["stabilization_threshold"],
enable_self_collisions=self.cfg_dict["scene"]["humanoid"]["enable_self_collisions"],
)
# apply rigid body settings
kit_utils.set_nested_rigid_body_properties(
prim_path=self.template_env_ns + "/Humanoid",
enable_gyroscopic_forces=self.cfg_dict["scene"]["humanoid"]["enable_gyroscopic_forces"],
max_depenetration_velocity=self.cfg_dict["scene"]["humanoid"]["max_depenetration_velocity"],
)
# apply collider properties
kit_utils.set_nested_collision_properties(
prim_path=self.template_env_ns + "/Humanoid",
contact_offset=self.cfg_dict["scene"]["humanoid"]["contact_offset"],
rest_offset=self.cfg_dict["scene"]["humanoid"]["rest_offset"],
)
# return global prims
return ["/World/defaultGroundPlane"]
def _reset_idx(self, env_ids: VecEnvIndices):
# get num envs to reset
num_resets = len(env_ids)
# randomize the MDP
# -- DOF position
dof_pos = torch_utils.torch_rand_float(-0.2, 0.2, (num_resets, self.humanoids.num_dof), device=self.device)
dof_pos[:] = torch_utils.tensor_clamp(
self._initial_dof_pos[env_ids] + dof_pos, self._dof_limits[:, 0], self._dof_limits[:, 1]
)
self.humanoids.set_joint_positions(dof_pos, indices=env_ids)
# -- DOF velocity
dof_vel = torch_utils.torch_rand_float(-0.1, 0.1, (num_resets, self.humanoids.num_dof), device=self.device)
self.humanoids.set_joint_velocities(dof_vel, indices=env_ids)
# -- Root pose
root_pos, root_rot = self._initial_root_tf[0].clone()[env_ids], self._initial_root_tf[1].clone()[env_ids]
self.humanoids.set_world_poses(root_pos, root_rot, indices=env_ids)
# -- Root velocity
root_vel = torch.zeros((num_resets, 6), device=self.device)
self.humanoids.set_velocities(root_vel, indices=env_ids)
# -- Reset potentials
to_target = self._GOAL_POS[env_ids] - root_pos
to_target[:, 2] = 0.0
self.prev_potentials[env_ids] = -torch.norm(to_target, p=2, dim=-1) / self.physics_dt
self.potentials[env_ids] = self.prev_potentials[env_ids].clone()
# -- MDP reset
self.actions[env_ids, :] = 0
self.reset_buf[env_ids] = 0
self.episode_length_buf[env_ids] = 0
def _step_impl(self, actions: torch.Tensor):
# reset environments that terminated
reset_env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1)
if len(reset_env_ids) > 0:
self._reset_idx(reset_env_ids)
# increment the number of steps
self.episode_length_buf += 1
# pre-step: set actions into buffer
self.actions = actions.clone().to(device=self.device)
dof_forces = self.actions * self._JOINT_GEARS * self.cfg_dict["env"]["power_scale"]
indices = torch.arange(self.num_envs, dtype=torch.int32, device=self.device)
self.humanoids.set_joint_efforts(dof_forces, indices=indices)
# perform physics stepping
for _ in range(self.cfg_dict["env"]["control_frequency_inv"]):
# step simulation
self.sim.step(render=self.enable_render)
# check if simulation is still running
if self.sim.is_stopped():
return
# post-step: compute MDP
self._cache_common_quantities()
self._compute_rewards()
self._check_termination()
# add information to extra if timeout occurred due to episode length
# Note: this is used by algorithms like PPO where time-outs are handled differently
# For more info: https://github.com/DLR-RM/stable-baselines3/issues/633
self.extras["time_outs"] = self.episode_length_buf >= self.cfg_dict["env"]["episode_length"]
def _get_observations(self) -> VecEnvObs:
# extract euler angles (in start frame)
roll, _, yaw = torch_utils.get_euler_xyz(self._torso_quat_start)
# compute heading direction
# TODO: Check why is this using z direction instead of y.
walk_target_angle = torch.atan2(
self._GOAL_POS[:, 2] - self._torso_pos_start[:, 2], self._GOAL_POS[:, 0] - self._torso_pos_start[:, 0]
)
angle_to_target = walk_target_angle - yaw
# obs_buf shapes: 1, 3, 3, 1, 1, 1, 1, 1, num_dofs(21), num_dofs(21), num_dofs(21)
obs_buf = torch.cat(
(
self._torso_pos_start[:, 2].view(-1, 1),
self._lin_vel_start,
self._ang_vel_start * self.cfg_dict["env"]["angular_velocity_scale"],
yaw.unsqueeze(-1),
roll.unsqueeze(-1),
angle_to_target.unsqueeze(-1),
self._up_proj.unsqueeze(-1),
self._heading_proj.unsqueeze(-1),
self._dof_pos_scaled,
self._dof_vel_scaled,
self._feet_force_torques * self.cfg_dict["env"]["contact_force_scale"],
self.actions,
),
dim=-1,
)
return {"policy": obs_buf}
"""
Helper functions - MDP.
"""
def _cache_common_quantities(self) -> None:
"""Compute common quantities from simulator used for computing MDP signals."""
# extract data from simulator
torso_pos_world, torso_quat_world = self.humanoids.get_world_poses(clone=False)
lin_vel_world = self.humanoids.get_linear_velocities(clone=False)
ang_vel_world = self.humanoids.get_angular_velocities(clone=False)
dof_pos = self.humanoids.get_joint_positions(clone=False)
dof_vel = self.humanoids.get_joint_velocities(clone=False)
# TODO: Remove direct usage of `_physics_view` when method exists in :class:`ArticulationView`
feet_force_torques = self.humanoids._physics_view.get_force_sensor_forces()
# scale the dof
self._dof_pos_scaled = torch_utils.scale_transform(dof_pos, self._dof_limits[:, 0], self._dof_limits[:, 1])
self._dof_vel_scaled = dof_vel * self.cfg_dict["env"]["dof_velocity_scale"]
# feet contact forces
self._feet_force_torques = feet_force_torques.reshape(self.num_envs, -1)
# convert base pose w.r.t. start frame
self._torso_pos_start = torso_pos_world
self._torso_quat_start = torch_utils.quat_mul(torso_quat_world, self._INV_START_QUAT)
# convert velocity (in base frame w.r.t. start frame)
self._lin_vel_start = torch_utils.quat_rotate_inverse(self._torso_quat_start, lin_vel_world)
self._ang_vel_start = torch_utils.quat_rotate_inverse(self._torso_quat_start, ang_vel_world)
# convert basis vectors w.r.t. start frame
up_vec = torch_utils.quat_rotate(self._torso_quat_start, self._UP_VEC)
heading_vec = torch_utils.quat_rotate(self._torso_quat_start, self._HEADING_VEC)
# compute relative movement to the target
self._to_target = self._GOAL_POS - self._torso_pos_start
self._to_target[:, 2] = 0.0
to_target_dir = torch_utils.normalize(self._to_target)
# compute projection of current heading to desired heading vector
self._up_proj = up_vec[:, 2]
self._heading_proj = torch.bmm(heading_vec.view(self.num_envs, 1, 3), to_target_dir.view(self.num_envs, 3, 1))
self._heading_proj = self._heading_proj.view(self.num_envs)
def _compute_rewards(self) -> None:
# heading in the right direction
heading_reward = torch.where(
self._heading_proj > 0.8,
self.cfg_dict["env"]["heading_weight"],
self.cfg_dict["env"]["heading_weight"] * self._heading_proj.double() / 0.8,
)
# aligning up axis of robot and environment
up_reward = torch.where(self._up_proj > 0.93, self.cfg_dict["env"]["up_weight"], 0.0)
# penalty for high action commands
actions_cost = torch.sum(self.actions**2, dim=-1)
# energy penalty for movement (power = torque * vel)
electricity_cost = torch.sum(torch.abs(self.actions * self._dof_vel_scaled), dim=-1)
# reaching close to dof limits
# TODO: Shouldn't this be absolute dof pos? Only checks for upper limit right now.
motor_effort_ratio = self._JOINT_GEARS / self._MAX_MOTOR_EFFORT
scaled_cost = (torch.abs(self._dof_pos_scaled) - 0.98) / 0.02
dof_at_limit_cost = torch.sum(
(torch.abs(self._dof_pos_scaled) > 0.98) * scaled_cost * motor_effort_ratio.unsqueeze(0), dim=-1
)
# reward for duration of staying alive
alive_reward = self.cfg_dict["env"]["alive_weight"]
# compute x,y-potential score towards the goal
self.prev_potentials = self.potentials.clone()
self.potentials = -torch.norm(self._to_target, p=2, dim=-1) / self.physics_dt
# reward for progressing towards the goal (through L2 potential)
progress_reward = self.potentials - self.prev_potentials
# compute reward
total_reward = (
progress_reward
+ alive_reward
+ up_reward
+ heading_reward
- self.cfg_dict["env"]["actions_cost"] * actions_cost
- self.cfg_dict["env"]["energy_cost"] * electricity_cost
- self.cfg_dict["env"]["joints_at_limit_cost"] * dof_at_limit_cost
)
# adjust reward for fallen agents
total_reward = torch.where(
self._torso_pos_start[:, 2] < self.cfg_dict["env"]["termination_height"],
self.cfg_dict["env"]["death_cost"],
total_reward,
)
# set reward into buffer
self.reward_buf[:] = total_reward
def _check_termination(self) -> None:
# compute resets
# -- base has collapsed
resets = torch.where(
self._torso_pos_start[:, 2] < self.cfg_dict["env"]["termination_height"], 1, self.reset_buf
)
# -- episode length
resets = torch.where(self.episode_length_buf >= self.max_episode_length, 1, resets)
# set reset into buffer
self.reset_buf[:] = resets
def _define_environment_constants(self):
"""Defines useful constants used by the implementation."""
# desired goal position
self._GOAL_POS = torch.tensor([1000, 0, 0], dtype=torch.float32, device=self.device).repeat((self.num_envs, 1))
# gear ratio for joint control
self._JOINT_GEARS = torch.tensor(
[
67.5000, # lower_waist
67.5000, # lower_waist
67.5000, # right_upper_arm
67.5000, # right_upper_arm
67.5000, # left_upper_arm
67.5000, # left_upper_arm
67.5000, # pelvis
45.0000, # right_lower_arm
45.0000, # left_lower_arm
45.0000, # right_thigh: x
135.0000, # right_thigh: y
45.0000, # right_thigh: z
45.0000, # left_thigh: x
135.0000, # left_thigh: y
45.0000, # left_thigh: z
90.0000, # right_knee
90.0000, # left_knee
22.5, # right_foot
22.5, # right_foot
22.5, # left_foot
22.5, # left_foot
],
dtype=torch.float32,
device=self.device,
)
# the maximum motor effort applicable
self._MAX_MOTOR_EFFORT = torch.max(self._JOINT_GEARS)
# initial spawn orientation
self._START_QUAT = torch.tensor([1, 0, 0, 0], device=self.device, dtype=torch.float32)
self._INV_START_QUAT = torch_utils.quat_conjugate(self._START_QUAT).repeat((self.num_envs, 1))
# heading direction for the robot
self._HEADING_VEC = torch.tensor([1, 0, 0], dtype=torch.float32, device=self.device).repeat((self.num_envs, 1))
# up direction for the simulator
self._UP_VEC = torch.tensor([0, 0, 1], dtype=torch.float32, device=self.device).repeat((self.num_envs, 1))
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import omni.isaac.orbit.sim as sim_utils
from omni.isaac.orbit.actuators import ImplicitActuatorCfg
from omni.isaac.orbit.assets import ArticulationCfg, AssetBaseCfg
from omni.isaac.orbit.command_generators import NullCommandGeneratorCfg
from omni.isaac.orbit.envs import RLTaskEnvCfg
from omni.isaac.orbit.managers import ObservationGroupCfg as ObsGroup
from omni.isaac.orbit.managers import ObservationTermCfg as ObsTerm
from omni.isaac.orbit.managers import RandomizationTermCfg as RandTerm
from omni.isaac.orbit.managers import RewardTermCfg as RewTerm
from omni.isaac.orbit.managers import SceneEntityCfg
from omni.isaac.orbit.managers import TerminationTermCfg as DoneTerm
from omni.isaac.orbit.scene import InteractiveSceneCfg
from omni.isaac.orbit.terrains import TerrainImporterCfg
from omni.isaac.orbit.utils import configclass
from omni.isaac.orbit.utils.assets import ISAAC_NUCLEUS_DIR
import omni.isaac.orbit_tasks.classic.humanoid.mdp as mdp
##
# Scene definition
##
@configclass
class MySceneCfg(InteractiveSceneCfg):
"""Configuration for the terrain scene with a humanoid robot."""
# terrain
terrain = TerrainImporterCfg(
prim_path="/World/ground",
terrain_type="plane",
collision_group=-1,
physics_material=sim_utils.RigidBodyMaterialCfg(static_friction=1.0, dynamic_friction=1.0, restitution=0.0),
debug_vis=False,
)
# robot
robot = ArticulationCfg(
prim_path="{ENV_REGEX_NS}/Robot",
spawn=sim_utils.UsdFileCfg(
usd_path=f"{ISAAC_NUCLEUS_DIR}/Robots/Humanoid/humanoid_instanceable.usd",
rigid_props=sim_utils.RigidBodyPropertiesCfg(
disable_gravity=None,
max_depenetration_velocity=10.0,
enable_gyroscopic_forces=True,
),
articulation_props=sim_utils.ArticulationRootPropertiesCfg(
enabled_self_collisions=True,
solver_position_iteration_count=4,
solver_velocity_iteration_count=0,
sleep_threshold=0.005,
stabilization_threshold=0.001,
),
copy_from_source=False,
),
init_state=ArticulationCfg.InitialStateCfg(
pos=(0.0, 0.0, 1.34),
joint_pos={".*": 0.0},
),
actuators={
"body": ImplicitActuatorCfg(
joint_names_expr=[".*"],
stiffness={
".*_waist.*": 20.0,
".*_upper_arm.*": 10.0,
"pelvis": 10.0,
".*_lower_arm": 2.0,
".*_thigh:0": 10.0,
".*_thigh:1": 20.0,
".*_thigh:2": 10.0,
".*_shin": 5.0,
".*_foot.*": 2.0,
},
damping={
".*_waist.*": 5.0,
".*_upper_arm.*": 5.0,
"pelvis": 5.0,
".*_lower_arm": 1.0,
".*_thigh:0": 5.0,
".*_thigh:1": 5.0,
".*_thigh:2": 5.0,
".*_shin": 0.1,
".*_foot.*": 1.0,
},
),
},
)
# lights
light = AssetBaseCfg(
prim_path="/World/light",
spawn=sim_utils.DistantLightCfg(color=(0.75, 0.75, 0.75), intensity=3000.0),
)
@configclass
class ActionsCfg:
"""Action specifications for the MDP."""
joint_effort = mdp.JointEffortActionCfg(
asset_name="robot",
joint_names=[".*"],
scale={
".*_waist.*": 67.5,
".*_upper_arm.*": 67.5,
"pelvis": 67.5,
".*_lower_arm": 45.0,
".*_thigh:0": 45.0,
".*_thigh:1": 135.0,
".*_thigh:2": 45.0,
".*_shin": 90.0,
".*_foot.*": 22.5,
},
)
@configclass
class ObservationsCfg:
"""Observation specifications for the MDP."""
@configclass
class PolicyCfg(ObsGroup):
"""Observations for the policy."""
base_height = ObsTerm(func=mdp.base_pos_z)
base_lin_vel = ObsTerm(func=mdp.base_lin_vel)
base_ang_vel = ObsTerm(func=mdp.base_ang_vel, scale=0.25)
base_yaw_roll = ObsTerm(func=mdp.base_yaw_roll)
base_angle_to_target = ObsTerm(func=mdp.base_angle_to_target, params={"target_pos": (1000.0, 0.0, 0.0)})
base_up_proj = ObsTerm(func=mdp.base_up_proj)
base_heading_proj = ObsTerm(func=mdp.base_heading_proj, params={"target_pos": (1000.0, 0.0, 0.0)})
joint_pos_norm = ObsTerm(func=mdp.joint_pos_norm)
joint_vel_rel = ObsTerm(func=mdp.joint_vel_rel, scale=0.1)
feet_body_forces = ObsTerm(
func=mdp.body_incoming_wrench,
scale=0.01,
params={"asset_cfg": SceneEntityCfg("robot", body_names=["left_foot", "right_foot"])},
)
actions = ObsTerm(func=mdp.last_action)
def __post_init__(self):
self.enable_corruption = False
self.concatenate_terms = True
# observation groups
policy: PolicyCfg = PolicyCfg()
@configclass
class RandomizationCfg:
"""Configuration for randomization."""
reset_base = RandTerm(
func=mdp.reset_root_state_uniform,
mode="reset",
params={"pose_range": {}, "velocity_range": {}},
)
reset_robot_joints = RandTerm(
func=mdp.reset_joints_by_offset,
mode="reset",
params={
"position_range": (-0.2, 0.2),
"velocity_range": (-0.1, 0.1),
},
)
@configclass
class RewardsCfg:
"""Reward terms for the MDP."""
# (1) Reward for moving forward
progress = RewTerm(func=mdp.progress_reward, weight=1.0, params={"target_pos": (1000.0, 0.0, 0.0)})
# (2) Stay alive bonus
alive = RewTerm(func=mdp.is_alive, weight=2.0)
# (3) Reward for non-upright posture
upright = RewTerm(func=mdp.upright_posture_bonus, weight=0.1, params={"threshold": 0.93})
# (4) Reward for moving in the right direction
move_to_target = RewTerm(
func=mdp.move_to_target_bonus, weight=0.5, params={"threshold": 0.8, "target_pos": (1000.0, 0.0, 0.0)}
)
# (5) Penalty for large action commands
action_l2 = RewTerm(func=mdp.action_l2, weight=-0.01)
# (6) Penalty for energy consumption
energy = RewTerm(
func=mdp.power_consumption,
weight=-0.005,
params={
"gear_ratio": {
".*_waist.*": 67.5,
".*_upper_arm.*": 67.5,
"pelvis": 67.5,
".*_lower_arm": 45.0,
".*_thigh:0": 45.0,
".*_thigh:1": 135.0,
".*_thigh:2": 45.0,
".*_shin": 90.0,
".*_foot.*": 22.5,
}
},
)
# (7) Penalty for reaching close to joint limits
joint_limits = RewTerm(
func=mdp.joint_limits_penalty_ratio,
weight=-0.25,
params={
"threshold": 0.98,
"gear_ratio": {
".*_waist.*": 67.5,
".*_upper_arm.*": 67.5,
"pelvis": 67.5,
".*_lower_arm": 45.0,
".*_thigh:0": 45.0,
".*_thigh:1": 135.0,
".*_thigh:2": 45.0,
".*_shin": 90.0,
".*_foot.*": 22.5,
},
},
)
@configclass
class TerminationsCfg:
"""Termination terms for the MDP."""
# (1) Terminate if the episode length is exceeded
time_out = DoneTerm(func=mdp.time_out, time_out=True)
# (2) Terminate if the robot falls
torso_height = DoneTerm(func=mdp.base_height, params={"minimum_height": 0.8})
@configclass
class CurriculumCfg:
"""Curriculum terms for the MDP."""
pass
@configclass
class HumanoidEnvCfg(RLTaskEnvCfg):
"""Configuration for the MuJoCo-style Humanoid walking environment."""
# Scene settings
scene: MySceneCfg = MySceneCfg(num_envs=4096, env_spacing=5.0)
# Basic settings
observations: ObservationsCfg = ObservationsCfg()
actions: ActionsCfg = ActionsCfg()
commands: NullCommandGeneratorCfg = NullCommandGeneratorCfg()
# MDP settings
rewards: RewardsCfg = RewardsCfg()
terminations: TerminationsCfg = TerminationsCfg()
randomization: RandomizationCfg = RandomizationCfg()
curriculum: CurriculumCfg = CurriculumCfg()
def __post_init__(self):
"""Post initialization."""
# general settings
self.decimation = 2
self.episode_length_s = 16.0
# simulation settings
self.sim.dt = 1 / 120.0
self.sim.physx.bounce_threshold_velocity = 0.2
# default friction material
self.sim.physics_material.static_friction = 1.0
self.sim.physics_material.dynamic_friction = 1.0
self.sim.physics_material.restitution = 0.0
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""This sub-module contains the functions that are specific to the humanoid environment."""
from omni.isaac.orbit.envs.mdp import * # noqa: F401, F403
from .observations import *
from .rewards import *
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import torch
from typing import TYPE_CHECKING
import omni.isaac.orbit.utils.math as math_utils
from omni.isaac.orbit.assets import Articulation
from omni.isaac.orbit.managers import SceneEntityCfg
if TYPE_CHECKING:
from ..humanoid_env import HumanoidEnv
def base_yaw_roll(env: HumanoidEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Yaw and roll of the base in the simulation world frame."""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# extract euler angles (in world frame)
roll, _, yaw = math_utils.euler_xyz_from_quat(asset.data.root_quat_w)
# normalize angle to [-pi, pi]
roll = torch.atan2(torch.sin(roll), torch.cos(roll))
yaw = torch.atan2(torch.sin(yaw), torch.cos(yaw))
return torch.cat((yaw.unsqueeze(-1), roll.unsqueeze(-1)), dim=-1)
def base_up_proj(env: HumanoidEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Projection of the base up vector onto the world up vector."""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# compute base up vector
base_up_vec = math_utils.quat_rotate(asset.data.root_quat_w, -asset._GRAVITY_VEC_W) # type: ignore
return base_up_vec[:, 2].unsqueeze(-1)
def base_heading_proj(
env: HumanoidEnv, target_pos: tuple[float, float, float], asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")
) -> torch.Tensor:
"""Projection of the base forward vector onto the world forward vector."""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# compute desired heading direction
to_target_pos = torch.tensor(target_pos, device=env.device) - asset.data.root_pos_w[:, :3]
to_target_pos[:, 2] = 0.0
to_target_dir = math_utils.normalize(to_target_pos)
# compute base forward vector
heading_vec = math_utils.quat_rotate(asset.data.root_quat_w, asset._FORWARD_VEC_B) # type: ignore
# compute dot product between heading and target direction
heading_proj = torch.bmm(heading_vec.view(env.num_envs, 1, 3), to_target_dir.view(env.num_envs, 3, 1))
return heading_proj.view(env.num_envs, 1)
def base_angle_to_target(
env: HumanoidEnv, target_pos: tuple[float, float, float], asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")
) -> torch.Tensor:
"""Angle between the base forward vector and the vector to the target."""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# compute desired heading direction
to_target_pos = torch.tensor(target_pos, device=env.device) - asset.data.root_pos_w[:, :3]
walk_target_angle = torch.atan2(to_target_pos[:, 1], to_target_pos[:, 0])
# compute base forward vector
_, _, yaw = math_utils.euler_xyz_from_quat(asset.data.root_quat_w)
# normalize angle to target to [-pi, pi]
angle_to_target = walk_target_angle - yaw
angle_to_target = torch.atan2(torch.sin(angle_to_target), torch.cos(angle_to_target))
return angle_to_target.unsqueeze(-1)
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import torch
from typing import TYPE_CHECKING
import omni.isaac.orbit.utils.math as math_utils
import omni.isaac.orbit.utils.string as string_utils
from omni.isaac.orbit.assets import Articulation
from omni.isaac.orbit.managers import ManagerTermBase, RewardTermCfg, SceneEntityCfg
from . import observations as obs
if TYPE_CHECKING:
from ..humanoid_env import HumanoidEnv
def upright_posture_bonus(
env: HumanoidEnv, threshold: float, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")
) -> torch.Tensor:
"""Reward for maintaining an upright posture."""
up_proj = obs.base_up_proj(env, asset_cfg).squeeze(-1)
return (up_proj > threshold).float()
def move_to_target_bonus(
env: HumanoidEnv,
threshold: float,
target_pos: tuple[float, float, float],
asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"),
) -> torch.Tensor:
"""Reward for moving to the target heading."""
heading_proj = obs.base_heading_proj(env, target_pos, asset_cfg).squeeze(-1)
return torch.where(heading_proj > threshold, 1.0, heading_proj / threshold)
class progress_reward(ManagerTermBase):
"""Reward for making progress towards the target."""
def __init__(self, env: HumanoidEnv, cfg: RewardTermCfg):
# initialize the base class
super().__init__(cfg, env)
# create history buffer
self.potentials = torch.zeros(env.num_envs, device=env.device)
self.prev_potentials = torch.zeros_like(self.potentials)
def reset(self, env_ids: torch.Tensor):
# extract the used quantities (to enable type-hinting)
asset: Articulation = self._env.scene["robot"]
# compute projection of current heading to desired heading vector
target_pos = torch.tensor(self.cfg.params["target_pos"], device=self.device)
to_target_pos = target_pos - asset.data.root_pos_w[env_ids, :3]
# reward terms
self.potentials[env_ids] = -torch.norm(to_target_pos, p=2, dim=-1) / self._env.step_dt
self.prev_potentials[env_ids] = self.potentials[env_ids]
def __call__(
self,
env: HumanoidEnv,
target_pos: tuple[float, float, float],
asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"),
) -> torch.Tensor:
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# compute vector to target
target_pos = torch.tensor(target_pos, device=env.device)
to_target_pos = target_pos - asset.data.root_pos_w[:, :3]
to_target_pos[:, 2] = 0.0
# update history buffer and compute new potential
self.prev_potentials[:] = self.potentials[:]
self.potentials[:] = -torch.norm(to_target_pos, p=2, dim=-1) / env.step_dt
return self.potentials - self.prev_potentials
class joint_limits_penalty_ratio(ManagerTermBase):
"""Penalty for violating joint limits weighted by the gear ratio."""
def __init__(self, env: HumanoidEnv, cfg: RewardTermCfg):
# add default argument
if "asset_cfg" not in cfg.params:
cfg.params["asset_cfg"] = SceneEntityCfg("robot")
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[cfg.params["asset_cfg"].name]
# resolve the gear ratio for each joint
self.gear_ratio = torch.ones(env.num_envs, asset.num_joints, device=env.device)
index_list, _, value_list = string_utils.resolve_matching_names_values(
cfg.params["gear_ratio"], asset.joint_names
)
self.gear_ratio[:, index_list] = torch.tensor(value_list, device=env.device)
self.gear_ratio_scaled = self.gear_ratio / torch.max(self.gear_ratio)
def __call__(
self, env: HumanoidEnv, threshold: float, gear_ratio: dict[str, float], asset_cfg: SceneEntityCfg
) -> torch.Tensor:
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# compute the penalty over normalized joints
joint_pos_scaled = math_utils.scale_transform(
asset.data.joint_pos, asset.data.soft_joint_pos_limits[..., 0], asset.data.soft_joint_pos_limits[..., 1]
)
# scale the violation amount by the gear ratio
violation_amount = (torch.abs(joint_pos_scaled) - threshold) / (1 - threshold)
violation_amount = violation_amount * self.gear_ratio_scaled
return torch.sum((torch.abs(joint_pos_scaled) > threshold) * violation_amount, dim=-1)
class power_consumption(ManagerTermBase):
"""Penalty for the power consumed by the actions to the environment.
This is computed as commanded torque times the joint velocity.
"""
def __init__(self, env: HumanoidEnv, cfg: RewardTermCfg):
# add default argument
if "asset_cfg" not in cfg.params:
cfg.params["asset_cfg"] = SceneEntityCfg("robot")
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[cfg.params["asset_cfg"].name]
# resolve the gear ratio for each joint
self.gear_ratio = torch.ones(env.num_envs, asset.num_joints, device=env.device)
index_list, _, value_list = string_utils.resolve_matching_names_values(
cfg.params["gear_ratio"], asset.joint_names
)
self.gear_ratio[:, index_list] = torch.tensor(value_list, device=env.device)
self.gear_ratio_scaled = self.gear_ratio / torch.max(self.gear_ratio)
def __call__(self, env: HumanoidEnv, gear_ratio: dict[str, float], asset_cfg: SceneEntityCfg) -> torch.Tensor:
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# return power = torque * velocity (here actions: joint torques)
return torch.sum(torch.abs(env.action_manager.action * asset.data.joint_vel * self.gear_ratio_scaled), dim=-1)
......@@ -241,6 +241,10 @@ class RlGamesVecEnvWrapper(IVecEnv):
actions = torch.clamp(actions, -self._clip_actions, self._clip_actions)
# perform environment step
obs_dict, rew, terminated, truncated, extras = self.env.step(actions)
# move time out information to the extras dict
# note: only useful when `value_bootstrap` is True in the agent configuration
extras["time_outs"] = truncated.to(device=self._rl_device)
# process observations and states
obs_and_states = self._process_obs(obs_dict)
# move buffers to rl-device
......
......@@ -48,6 +48,8 @@ from rl_games.common import env_configurations, vecenv
from rl_games.common.player import BasePlayer
from rl_games.torch_runner import Runner
from omni.isaac.orbit.utils.assets import retrieve_file_path
import omni.isaac.contrib_tasks # noqa: F401
import omni.isaac.orbit_tasks # noqa: F401
from omni.isaac.orbit_tasks.utils import get_checkpoint_path, load_cfg_from_registry, parse_env_cfg
......@@ -94,7 +96,7 @@ def main():
# get path to previous checkpoint
resume_path = get_checkpoint_path(log_root_path, run_dir, checkpoint_file, other_dirs=["nn"])
else:
resume_path = os.path.abspath(args_cli.checkpoint)
resume_path = retrieve_file_path(args_cli.checkpoint)
# load previously trained model
agent_cfg["params"]["load_checkpoint"] = True
agent_cfg["params"]["load_path"] = resume_path
......@@ -109,8 +111,6 @@ def main():
agent: BasePlayer = runner.create_player()
agent.restore(resume_path)
agent.reset()
# flag for inference
is_deterministic = True
# reset environment
obs = env.reset()
......@@ -126,7 +126,7 @@ def main():
# convert obs to agent format
obs = agent.obs_to_torch(obs)
# agent stepping
actions = agent.get_action(obs, is_deterministic)
actions = agent.get_action(obs, is_deterministic=True)
# env stepping
obs, _, dones, _ = env.step(actions)
......
......@@ -128,7 +128,7 @@ def main():
agent.set_logger(new_logger)
# callbacks for agent
checkpoint_callback = CheckpointCallback(save_freq=100, save_path=log_dir, name_prefix="model", verbose=2)
checkpoint_callback = CheckpointCallback(save_freq=1000, save_path=log_dir, name_prefix="model", verbose=2)
# train the agent
agent.learn(total_timesteps=n_timesteps, callback=checkpoint_callback)
# save the final model
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment