Unverified Commit 1b293e15 authored by David Hoeller's avatar David Hoeller Committed by GitHub

Fixes lift environment for Franka robot (#264)

# Description

Adds the Franka lift RL environment. It also checks that the reach
environment is working.

## Type of change

- New feature (non-breaking change which adds functionality)

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./orbit.sh --format`
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there
parent 0691eef6
......@@ -52,22 +52,22 @@ FRANKA_PANDA_ARM_WITH_PANDA_HAND_CFG = ArticulationCfg(
joint_names_expr=["panda_joint[1-4]"],
effort_limit=87.0,
velocity_limit=100.0,
stiffness=800.0,
damping=40.0,
stiffness=80.0,
damping=4.0,
),
"panda_forearm": ImplicitActuatorCfg(
joint_names_expr=["panda_joint[5-7]"],
effort_limit=12.0,
effort_limit=87.0,
velocity_limit=100.0,
stiffness=800.0,
damping=40.0,
stiffness=80.0,
damping=4.0,
),
"panda_hand": ImplicitActuatorCfg(
joint_names_expr=["panda_finger_joint.*"],
effort_limit=200.0,
velocity_limit=0.2,
stiffness=1e5,
damping=1e3,
stiffness=2e3,
damping=1e2,
),
},
soft_joint_pos_limit_factor=1.0,
......
......@@ -3,10 +3,7 @@
#
# SPDX-License-Identifier: BSD-3-Clause
"""Sub-module containing utilities for various math operations.
Some of these are imported from the module `omni.isaac.core.utils.torch` for convenience.
"""
"""Sub-module containing utilities for various math operations."""
from __future__ import annotations
......@@ -690,7 +687,7 @@ def is_identity_pose(pos: torch.tensor, rot: torch.tensor) -> bool:
# @torch.jit.script
def combine_frame_transforms(
t01: torch.Tensor, q01: torch.Tensor, t12: torch.Tensor = None, q12: torch.Tensor = None
t01: torch.Tensor, q01: torch.Tensor, t12: torch.Tensor | None = None, q12: torch.Tensor | None = None
) -> tuple[torch.Tensor, torch.Tensor]:
r"""Combine transformations between two reference frames into a stationary frame.
......@@ -701,7 +698,9 @@ def combine_frame_transforms(
t01: Position of frame 1 w.r.t. frame 0. Shape is (N, 3).
q01: Quaternion orientation of frame 1 w.r.t. frame 0 in (w, x, y, z). Shape is (N, 4).
t12: Position of frame 2 w.r.t. frame 1. Shape is (N, 3).
Defaults to None, in which case the position is assumed to be zero.
q12: Quaternion orientation of frame 2 w.r.t. frame 1 in (w, x, y, z). Shape is (N, 4).
Defaults to None, in which case the orientation is assumed to be identity.
Returns:
A tuple containing the position and orientation of frame 2 w.r.t. frame 0.
......@@ -721,9 +720,9 @@ def combine_frame_transforms(
return t02, q02
@torch.jit.script
# @torch.jit.script
def subtract_frame_transforms(
t01: torch.Tensor, q01: torch.Tensor, t02: torch.Tensor, q02: torch.Tensor
t01: torch.Tensor, q01: torch.Tensor, t02: torch.Tensor | None = None, q02: torch.Tensor | None = None
) -> tuple[torch.Tensor, torch.Tensor]:
r"""Subtract transformations between two reference frames into a stationary frame.
......@@ -734,7 +733,9 @@ def subtract_frame_transforms(
t01: Position of frame 1 w.r.t. frame 0. Shape is (N, 3).
q01: Quaternion orientation of frame 1 w.r.t. frame 0 in (w, x, y, z). Shape is (N, 4).
t02: Position of frame 2 w.r.t. frame 0. Shape is (N, 3).
Defaults to None, in which case the position is assumed to be zero.
q02: Quaternion orientation of frame 2 w.r.t. frame 0 in (w, x, y, z). Shape is (N, 4).
Defaults to None, in which case the orientation is assumed to be identity.
Returns:
A tuple containing the position and orientation of frame 2 w.r.t. frame 1.
......@@ -742,10 +743,15 @@ def subtract_frame_transforms(
"""
# compute orientation
q10 = quat_inv(q01)
q12 = quat_mul(q10, q02)
if q02 is not None:
q12 = quat_mul(q10, q02)
else:
q12 = q10
# compute translation
t12 = quat_apply(q10, t02 - t01)
if t02 is not None:
t12 = quat_apply(q10, t02 - t01)
else:
t12 = quat_apply(q10, -t01)
return t12, q12
......@@ -816,7 +822,7 @@ def apply_delta_pose(
source_pos: Position of source frame. Shape is (N, 3).
source_rot: Quaternion orientation of source frame in (w, x, y, z). Shape is (N, 4)..
delta_pose: Position and orientation displacements. Shape is (N, 6).
eps: The tolerance to consider orientation displacement as zero.
eps: The tolerance to consider orientation displacement as zero. Defaults to 1.0e-6.
Returns:
A tuple containing the displaced position and orientation frames.
......@@ -864,7 +870,9 @@ def transform_points(
Args:
points: Points to transform. Shape is (N, P, 3) or (P, 3).
pos: Position of the target frame. Shape is (N, 3) or (3,).
Defaults to None, in which case the position is assumed to be zero.
quat: Quaternion orientation of the target frame in (w, x, y, z). Shape is (N, 4) or (4,).
Defaults to None, in which case the orientation is assumed to be identity.
Returns:
Transformed points in the target frame. Shape is (N, P, 3) or (P, 3).
......
......@@ -3,29 +3,7 @@
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Environment for lifting an object with fixed-base robot.
"""
"""Configurations for the object lift environments."""
import gymnasium as gym
from . import agents
##
# Register Gym environments.
##
gym.register(
id="Isaac-Lift-Franka-v0",
entry_point="omni.isaac.orbit.envs:RLTaskEnv",
disable_env_checker=True,
kwargs={
"env_cfg_entry_point": f"{__name__}.lift_env_cfg:LiftEnvCfg",
"rl_games_cfg_entry_point": f"{agents.__name__}:rl_games_ppo_cfg.yaml",
"rsl_rl_cfg_entry_point": f"{agents.__name__}.rsl_rl_ppo_cfg:LIFT_RSL_RL_PPO_CFG",
"skrl_cfg_entry_point": f"{agents.__name__}:skrl_ppo_cfg.yaml",
"sb3_cfg_entry_point": f"{agents.__name__}:sb3_ppo_cfg.yaml",
"robomimic_bc_cfg_entry_point": f"{agents.__name__}.robomimic:bc.json",
"robomimic_bcq_cfg_entry_point": f"{agents.__name__}.robomimic:bcq.json",
},
)
# We leave this file empty since we don't want to expose any configs in this package directly.
# We still need this file to import the "config" module in the parent package.
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Configurations for the object lift environments."""
# We leave this file empty since we don't want to expose any configs in this package directly.
# We still need this file to import the "config" module in the parent package.
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import gymnasium as gym
from . import agents, env_cfg
##
# Register Gym environments.
##
gym.register(
id="Isaac-Lift-Cube-Franka-v0",
entry_point="omni.isaac.orbit.envs:RLTaskEnv",
kwargs={
"env_cfg_entry_point": env_cfg.FrankaCubeLiftEnvCfg,
"rsl_rl_cfg_entry_point": agents.rsl_rl_cfg.LiftCubePPORunnerCfg,
},
disable_env_checker=True,
)
gym.register(
id="Isaac-Lift-Cube-Franka-Play-v0",
entry_point="omni.isaac.orbit.envs:RLTaskEnv",
kwargs={
"env_cfg_entry_point": env_cfg.FrankaCubeLiftEnvCfg_PLAY,
"rsl_rl_cfg_entry_point": agents.rsl_rl_cfg.LiftCubePPORunnerCfg,
},
disable_env_checker=True,
)
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from . import rsl_rl_cfg # noqa: F401, F403
......@@ -3,40 +3,39 @@
#
# SPDX-License-Identifier: BSD-3-Clause
from omni.isaac.orbit.utils import configclass
from omni.isaac.orbit_tasks.utils.wrappers.rsl_rl import (
RslRlOnPolicyRunnerCfg,
RslRlPpoActorCriticCfg,
RslRlPpoAlgorithmCfg,
)
LIFT_RSL_RL_PPO_CFG = RslRlOnPolicyRunnerCfg(
num_steps_per_env=96,
max_iterations=700,
save_interval=50,
experiment_name="lift",
run_name="",
resume=False,
load_run=-1,
load_checkpoint=-1,
empirical_normalization=False,
policy=RslRlPpoActorCriticCfg(
init_noise_std=1.0,
@configclass
class LiftCubePPORunnerCfg(RslRlOnPolicyRunnerCfg):
num_steps_per_env = 24
max_iterations = 1500
save_interval = 50
experiment_name = "franka_lift"
empirical_normalization = False
policy = RslRlPpoActorCriticCfg(
init_noise_std=0.8,
actor_hidden_dims=[256, 128, 64],
critic_hidden_dims=[256, 128, 64],
activation="elu",
),
algorithm=RslRlPpoAlgorithmCfg(
)
algorithm = RslRlPpoAlgorithmCfg(
value_loss_coef=1.0,
use_clipped_value_loss=True,
clip_param=0.2,
entropy_coef=0.01,
entropy_coef=0.006,
num_learning_epochs=5,
num_mini_batches=4,
learning_rate=1.0e-3,
learning_rate=1.0e-4,
schedule="adaptive",
gamma=0.99,
gamma=0.98,
lam=0.95,
desired_kl=0.01,
max_grad_norm=1.0,
),
)
)
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from omni.isaac.orbit.assets import RigidObjectCfg
from omni.isaac.orbit.sensors import FrameTransformerCfg
from omni.isaac.orbit.sensors.frame_transformer.frame_transformer_cfg import OffsetCfg
from omni.isaac.orbit.sim.schemas.schemas_cfg import RigidBodyPropertiesCfg
from omni.isaac.orbit.sim.spawners.from_files.from_files_cfg import UsdFileCfg
from omni.isaac.orbit.utils import configclass
from omni.isaac.orbit.utils.assets import ISAAC_NUCLEUS_DIR
from omni.isaac.orbit_tasks.manipulation.lift import mdp
from omni.isaac.orbit_tasks.manipulation.lift.lift_env_cfg import LiftEnvCfg
##
# Pre-defined configs
##
# isort: off
from omni.isaac.orbit.assets.config.franka import FRANKA_PANDA_ARM_WITH_PANDA_HAND_CFG
from omni.isaac.orbit.markers.config import FRAME_MARKER_CFG
@configclass
class FrankaCubeLiftEnvCfg(LiftEnvCfg):
def __post_init__(self):
# post init of parent
super().__post_init__()
# Set Franka as robot
self.scene.robot = FRANKA_PANDA_ARM_WITH_PANDA_HAND_CFG.replace(prim_path="{ENV_REGEX_NS}/Robot")
# Set actions for the specific robot type (franka)
self.actions.body_joint_pos = mdp.JointPositionActionCfg(
asset_name="robot", joint_names=["panda_joint.*"], scale=1.0, use_default_offset=True
)
self.actions.finger_joint_pos = mdp.BinaryJointPositionActionCfg(
asset_name="robot",
joint_names=["panda_finger.*"],
open_command_expr={"panda_finger_.*": 0.3},
close_command_expr={"panda_finger_.*": 0.0},
)
# Set the body name for the end effector
self.commands.body_name = "panda_hand"
# Set Cube as object
self.scene.object = RigidObjectCfg(
prim_path="{ENV_REGEX_NS}/Object",
init_state=RigidObjectCfg.InitialStateCfg(pos=[0.5, 0, 0.055], rot=[1, 0, 0, 0]),
spawn=UsdFileCfg(
usd_path=f"{ISAAC_NUCLEUS_DIR}/Props/Blocks/DexCube/dex_cube_instanceable.usd",
scale=(0.8, 0.8, 0.8),
rigid_props=RigidBodyPropertiesCfg(
solver_position_iteration_count=16,
solver_velocity_iteration_count=1,
max_angular_velocity=1000.0,
max_linear_velocity=1000.0,
max_depenetration_velocity=5.0,
disable_gravity=False,
),
),
)
# Listens to the required transforms
marker_cfg = FRAME_MARKER_CFG.copy()
marker_cfg.markers["frame"].scale = (0.1, 0.1, 0.1)
marker_cfg.prim_path = "/Visuals/FrameTransformer"
self.scene.ee_frame = FrameTransformerCfg(
prim_path="{ENV_REGEX_NS}/Robot/panda_link0",
debug_vis=False,
visualizer_cfg=marker_cfg,
target_frames=[
FrameTransformerCfg.FrameCfg(
prim_path="{ENV_REGEX_NS}/Robot/panda_hand",
name="end_effector",
offset=OffsetCfg(
pos=[0.0, 0.0, 0.1034],
),
),
],
)
@configclass
class FrankaCubeLiftEnvCfg_PLAY(FrankaCubeLiftEnvCfg):
def __post_init__(self):
# post init of parent
super().__post_init__()
# make a smaller scene for play
self.scene.num_envs = 50
self.scene.env_spacing = 2.5
# disable randomization for play
self.observations.policy.enable_corruption = False
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
from omni.isaac.orbit.controllers.differential_inverse_kinematics import DifferentialInverseKinematicsCfg
from omni.isaac.orbit.objects import RigidObjectCfg
from omni.isaac.orbit.robots.config.franka import FRANKA_PANDA_ARM_WITH_PANDA_HAND_CFG
from omni.isaac.orbit.robots.single_arm import SingleArmManipulatorCfg
from omni.isaac.orbit.utils import configclass
from omni.isaac.orbit.utils.assets import ISAAC_NUCLEUS_DIR
from omni.isaac.orbit_tasks.isaac_env_cfg import EnvCfg, IsaacEnvCfg, PhysxCfg, SimCfg, ViewerCfg
##
# Scene settings
##
@configclass
class TableCfg:
"""Properties for the table."""
# note: we use instanceable asset since it consumes less memory
usd_path = f"{ISAAC_NUCLEUS_DIR}/Props/Mounts/SeattleLabTable/table_instanceable.usd"
@configclass
class ManipulationObjectCfg(RigidObjectCfg):
"""Properties for the object to manipulate in the scene."""
meta_info = RigidObjectCfg.MetaInfoCfg(
usd_path=f"{ISAAC_NUCLEUS_DIR}/Props/Blocks/DexCube/dex_cube_instanceable.usd",
scale=(0.8, 0.8, 0.8),
)
init_state = RigidObjectCfg.InitialStateCfg(
pos=(0.4, 0.0, 0.075), rot=(1.0, 0.0, 0.0, 0.0), lin_vel=(0.0, 0.0, 0.0), ang_vel=(0.0, 0.0, 0.0)
)
rigid_props = RigidObjectCfg.RigidBodyPropertiesCfg(
solver_position_iteration_count=16,
solver_velocity_iteration_count=1,
max_angular_velocity=1000.0,
max_linear_velocity=1000.0,
max_depenetration_velocity=5.0,
disable_gravity=False,
)
physics_material = RigidObjectCfg.PhysicsMaterialCfg(
static_friction=0.5, dynamic_friction=0.5, restitution=0.0, prim_path="/World/Materials/cubeMaterial"
)
@configclass
class GoalMarkerCfg:
"""Properties for visualization marker."""
# usd file to import
usd_path = f"{ISAAC_NUCLEUS_DIR}/Props/UIElements/frame_prim.usd"
# scale of the asset at import
scale = [0.05, 0.05, 0.05] # x,y,z
@configclass
class FrameMarkerCfg:
"""Properties for visualization marker."""
# usd file to import
usd_path = f"{ISAAC_NUCLEUS_DIR}/Props/UIElements/frame_prim.usd"
# scale of the asset at import
scale = [0.1, 0.1, 0.1] # x,y,z
##
# MDP settings
##
@configclass
class RandomizationCfg:
"""Randomization of scene at reset."""
@configclass
class ObjectInitialPoseCfg:
"""Randomization of object initial pose."""
# category
position_cat: str = "default" # randomize position: "default", "uniform"
orientation_cat: str = "default" # randomize position: "default", "uniform"
# randomize position
position_uniform_min = [0.4, -0.25, 0.075] # position (x,y,z)
position_uniform_max = [0.6, 0.25, 0.075] # position (x,y,z)
@configclass
class ObjectDesiredPoseCfg:
"""Randomization of object desired pose."""
# category
position_cat: str = "default" # randomize position: "default", "uniform"
orientation_cat: str = "default" # randomize position: "default", "uniform"
# randomize position
position_default = [0.5, 0.0, 0.5] # position default (x,y,z)
position_uniform_min = [0.4, -0.25, 0.25] # position (x,y,z)
position_uniform_max = [0.6, 0.25, 0.5] # position (x,y,z)
# randomize orientation
orientation_default = [1.0, 0.0, 0.0, 0.0] # orientation default
# initialize
object_initial_pose: ObjectInitialPoseCfg = ObjectInitialPoseCfg()
object_desired_pose: ObjectDesiredPoseCfg = ObjectDesiredPoseCfg()
@configclass
class ObservationsCfg:
"""Observation specifications for the MDP."""
@configclass
class PolicyCfg:
"""Observations for policy group."""
# global group settings
enable_corruption: bool = True
# observation terms
# -- joint state
arm_dof_pos = {"scale": 1.0}
# arm_dof_pos_scaled = {"scale": 1.0}
# arm_dof_vel = {"scale": 0.5, "noise": {"name": "uniform", "min": -0.01, "max": 0.01}}
tool_dof_pos_scaled = {"scale": 1.0}
# -- end effector state
tool_positions = {"scale": 1.0}
tool_orientations = {"scale": 1.0}
# -- object state
# object_positions = {"scale": 1.0}
# object_orientations = {"scale": 1.0}
object_relative_tool_positions = {"scale": 1.0}
# object_relative_tool_orientations = {"scale": 1.0}
# -- object desired state
object_desired_positions = {"scale": 1.0}
# -- previous action
arm_actions = {"scale": 1.0}
tool_actions = {"scale": 1.0}
# global observation settings
return_dict_obs_in_group = False
"""Whether to return observations as dictionary or flattened vector within groups."""
# observation groups
policy: PolicyCfg = PolicyCfg()
@configclass
class RewardsCfg:
"""Reward terms for the MDP."""
# -- robot-centric
# reaching_object_position_l2 = {"weight": 0.0}
# reaching_object_position_exp = {"weight": 2.5, "sigma": 0.25}
reaching_object_position_tanh = {"weight": 2.5, "sigma": 0.1}
# penalizing_arm_dof_velocity_l2 = {"weight": 1e-5}
# penalizing_tool_dof_velocity_l2 = {"weight": 1e-5}
# penalizing_robot_dof_acceleration_l2 = {"weight": 1e-7}
# -- action-centric
penalizing_arm_action_rate_l2 = {"weight": 1e-2}
# penalizing_tool_action_l2 = {"weight": 1e-2}
# -- object-centric
# tracking_object_position_exp = {"weight": 5.0, "sigma": 0.25, "threshold": 0.08}
tracking_object_position_tanh = {"weight": 5.0, "sigma": 0.2, "threshold": 0.08}
lifting_object_success = {"weight": 3.5, "threshold": 0.08}
@configclass
class TerminationsCfg:
"""Termination terms for the MDP."""
episode_timeout = True # reset when episode length ended
object_falling = True # reset when object falls off the table
is_success = False # reset when object is lifted
@configclass
class ControlCfg:
"""Processing of MDP actions."""
# action space
control_type = "default" # "default", "inverse_kinematics"
# decimation: Number of control action updates @ sim dt per policy dt
decimation = 2
# configuration loaded when control_type == "inverse_kinematics"
inverse_kinematics: DifferentialInverseKinematicsCfg = DifferentialInverseKinematicsCfg(
command_type="pose_rel",
ik_method="dls",
position_command_scale=(0.1, 0.1, 0.1),
rotation_command_scale=(0.1, 0.1, 0.1),
)
##
# Environment configuration
##
@configclass
class LiftEnvCfg(IsaacEnvCfg):
"""Configuration for the Lift environment."""
# General Settings
env: EnvCfg = EnvCfg(num_envs=4096, env_spacing=2.5, episode_length_s=5.0)
viewer: ViewerCfg = ViewerCfg(debug_vis=True, eye=(7.5, 7.5, 7.5), lookat=(0.0, 0.0, 0.0))
# Physics settings
sim: SimCfg = SimCfg(
dt=0.01,
substeps=1,
physx=PhysxCfg(
gpu_found_lost_aggregate_pairs_capacity=1024 * 1024 * 4,
gpu_total_aggregate_pairs_capacity=16 * 1024,
friction_correlation_distance=0.00625,
friction_offset_threshold=0.01,
bounce_threshold_velocity=0.2,
),
)
# Scene Settings
# -- robot
robot: SingleArmManipulatorCfg = FRANKA_PANDA_ARM_WITH_PANDA_HAND_CFG
# -- object
object: ManipulationObjectCfg = ManipulationObjectCfg()
# -- table
table: TableCfg = TableCfg()
# -- visualization marker
goal_marker: GoalMarkerCfg = GoalMarkerCfg()
frame_marker: FrameMarkerCfg = FrameMarkerCfg()
# MDP settings
randomization: RandomizationCfg = RandomizationCfg()
observations: ObservationsCfg = ObservationsCfg()
rewards: RewardsCfg = RewardsCfg()
terminations: TerminationsCfg = TerminationsCfg()
# Controller settings
control: ControlCfg = ControlCfg()
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import gymnasium as gym
import math
import torch
import omni.isaac.core.utils.prims as prim_utils
import omni.isaac.orbit.compat.utils.kit as kit_utils
from omni.isaac.orbit.compat.markers import StaticMarker
from omni.isaac.orbit.compat.utils.mdp import ObservationManager, RewardManager
from omni.isaac.orbit.controllers.differential_inverse_kinematics import DifferentialInverseKinematics
from omni.isaac.orbit.objects import RigidObject
from omni.isaac.orbit.robots.single_arm import SingleArmManipulator
from omni.isaac.orbit.utils.dict import class_to_dict
from omni.isaac.orbit.utils.math import quat_inv, quat_mul, random_orientation, sample_uniform, scale_transform
from omni.isaac.orbit_tasks.isaac_env import IsaacEnv, VecEnvIndices, VecEnvObs
from .lift_cfg import LiftEnvCfg, RandomizationCfg
class LiftEnv(IsaacEnv):
"""Environment for lifting an object off a table with a single-arm manipulator."""
def __init__(self, cfg: LiftEnvCfg = None, **kwargs):
# copy configuration
self.cfg = cfg
# parse the configuration for controller configuration
# note: controller decides the robot control mode
self._pre_process_cfg()
# create classes (these are called by the function :meth:`_design_scene`)
self.robot = SingleArmManipulator(cfg=self.cfg.robot)
self.object = RigidObject(cfg=self.cfg.object)
# initialize the base class to setup the scene.
super().__init__(self.cfg, **kwargs)
# parse the configuration for information
self._process_cfg()
# initialize views for the cloned scenes
self._initialize_views()
# prepare the observation manager
self._observation_manager = LiftObservationManager(class_to_dict(self.cfg.observations), self, self.device)
# prepare the reward manager
self._reward_manager = LiftRewardManager(
class_to_dict(self.cfg.rewards), self, self.num_envs, self.dt, self.device
)
# print information about MDP
print("[INFO] Observation Manager:", self._observation_manager)
print("[INFO] Reward Manager: ", self._reward_manager)
# compute the observation space: arm joint state + ee-position + goal-position + actions
num_obs = self._observation_manager.group_obs_dim["policy"][0]
self.observation_space = gym.spaces.Box(low=-math.inf, high=math.inf, shape=(num_obs,))
# compute the action space
self.action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(self.num_actions,))
print("[INFO]: Completed setting up the environment...")
# Take an initial step to initialize the scene.
# This is required to compute quantities like Jacobians used in step().
self.sim.step()
# -- fill up buffers
self.object.update_buffers(self.dt)
self.robot.update_buffers(self.dt)
"""
Implementation specifics.
"""
def _design_scene(self) -> list[str]:
# ground plane
kit_utils.create_ground_plane("/World/defaultGroundPlane", z_position=-1.05)
# table
prim_utils.create_prim(self.template_env_ns + "/Table", usd_path=self.cfg.table.usd_path)
# robot
self.robot.spawn(self.template_env_ns + "/Robot")
# object
self.object.spawn(self.template_env_ns + "/Object")
# setup debug visualization
if self.cfg.viewer.debug_vis and self.enable_render:
# create point instancer to visualize the goal points
self._goal_markers = StaticMarker(
"/Visuals/object_goal",
self.num_envs,
usd_path=self.cfg.goal_marker.usd_path,
scale=self.cfg.goal_marker.scale,
)
# create marker for viewing end-effector pose
self._ee_markers = StaticMarker(
"/Visuals/ee_current",
self.num_envs,
usd_path=self.cfg.frame_marker.usd_path,
scale=self.cfg.frame_marker.scale,
)
# create marker for viewing command (if task-space controller is used)
if self.cfg.control.control_type == "inverse_kinematics":
self._cmd_markers = StaticMarker(
"/Visuals/ik_command",
self.num_envs,
usd_path=self.cfg.frame_marker.usd_path,
scale=self.cfg.frame_marker.scale,
)
# return list of global prims
return ["/World/defaultGroundPlane"]
def _reset_idx(self, env_ids: VecEnvIndices):
# randomize the MDP
# -- robot DOF state
dof_pos, dof_vel = self.robot.get_default_dof_state(env_ids=env_ids)
self.robot.set_dof_state(dof_pos, dof_vel, env_ids=env_ids)
# -- object pose
self._randomize_object_initial_pose(env_ids=env_ids, cfg=self.cfg.randomization.object_initial_pose)
# -- goal pose
self._randomize_object_desired_pose(env_ids=env_ids, cfg=self.cfg.randomization.object_desired_pose)
# -- Reward logging
# fill extras with episode information
self.extras["episode"] = dict()
# reset
# -- rewards manager: fills the sums for terminated episodes
self._reward_manager.reset_idx(env_ids, self.extras["episode"])
# -- obs manager
self._observation_manager.reset_idx(env_ids)
# -- reset history
self.previous_actions[env_ids] = 0
# -- MDP reset
self.reset_buf[env_ids] = 0
self.episode_length_buf[env_ids] = 0
# controller reset
if self.cfg.control.control_type == "inverse_kinematics":
self._ik_controller.reset_idx(env_ids)
def _step_impl(self, actions: torch.Tensor):
# reset environments that terminated
reset_env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1)
if len(reset_env_ids) > 0:
self._reset_idx(reset_env_ids)
# increment the number of steps
self.episode_length_buf += 1
# pre-step: set actions into buffer
self.actions = actions.clone().to(device=self.device)
# transform actions based on controller
if self.cfg.control.control_type == "inverse_kinematics":
# set the controller commands
self._ik_controller.set_command(self.actions[:, :-1])
# use IK to convert to joint-space commands
self.robot_actions[:, : self.robot.arm_num_dof] = self._ik_controller.compute(
self.robot.data.ee_state_w[:, 0:3] - self.envs_positions,
self.robot.data.ee_state_w[:, 3:7],
self.robot.data.ee_jacobian,
self.robot.data.arm_dof_pos,
)
# offset actuator command with position offsets
dof_pos_offset = self.robot.data.actuator_pos_offset
self.robot_actions[:, : self.robot.arm_num_dof] -= dof_pos_offset[:, : self.robot.arm_num_dof]
# we assume last command is tool action so don't change that
self.robot_actions[:, -1] = self.actions[:, -1]
elif self.cfg.control.control_type == "default":
self.robot_actions[:] = self.actions
# perform physics stepping
for _ in range(self.cfg.control.decimation):
# set actions into buffers
self.robot.apply_action(self.robot_actions)
# simulate
self.sim.step(render=self.enable_render)
# check that simulation is playing
if self.sim.is_stopped():
return
# post-step:
# -- compute common buffers
self.robot.update_buffers(self.dt)
self.object.update_buffers(self.dt)
# -- compute MDP signals
# reward
self.reward_buf = self._reward_manager.compute()
# terminations
self._check_termination()
# -- store history
self.previous_actions = self.actions.clone()
# -- add information to extra if timeout occurred due to episode length
# Note: this is used by algorithms like PPO where time-outs are handled differently
self.extras["time_outs"] = self.episode_length_buf >= self.max_episode_length
# -- add information to extra if task completed
object_position_error = torch.norm(self.object.data.root_pos_w - self.object_des_pose_w[:, 0:3], dim=1)
self.extras["is_success"] = torch.where(object_position_error < 0.02, 1, self.reset_buf)
# -- update USD visualization
if self.cfg.viewer.debug_vis and self.enable_render:
self._debug_vis()
def _get_observations(self) -> VecEnvObs:
# compute observations
return self._observation_manager.compute()
"""
Helper functions - Scene handling.
"""
def _pre_process_cfg(self) -> None:
"""Pre-processing of configuration parameters."""
# set configuration for task-space controller
if self.cfg.control.control_type == "inverse_kinematics":
print("Using inverse kinematics controller...")
# enable jacobian computation
self.cfg.robot.data_info.enable_jacobian = True
# enable gravity compensation
self.cfg.robot.rigid_props.disable_gravity = True
# set the end-effector offsets
self.cfg.control.inverse_kinematics.position_offset = self.cfg.robot.ee_info.pos_offset
self.cfg.control.inverse_kinematics.rotation_offset = self.cfg.robot.ee_info.rot_offset
else:
print("Using default joint controller...")
def _process_cfg(self) -> None:
"""Post processing of configuration parameters."""
# compute constants for environment
self.dt = self.cfg.control.decimation * self.physics_dt # control-dt
self.max_episode_length = math.ceil(self.cfg.env.episode_length_s / self.dt)
# convert configuration parameters to torchee
# randomization
# -- initial pose
config = self.cfg.randomization.object_initial_pose
for attr in ["position_uniform_min", "position_uniform_max"]:
setattr(config, attr, torch.tensor(getattr(config, attr), device=self.device, requires_grad=False))
# -- desired pose
config = self.cfg.randomization.object_desired_pose
for attr in ["position_uniform_min", "position_uniform_max", "position_default", "orientation_default"]:
setattr(config, attr, torch.tensor(getattr(config, attr), device=self.device, requires_grad=False))
def _initialize_views(self) -> None:
"""Creates views and extract useful quantities from them."""
# play the simulator to activate physics handles
# note: this activates the physics simulation view that exposes TensorAPIs
self.sim.reset()
# define views over instances
self.robot.initialize(self.env_ns + "/.*/Robot")
self.object.initialize(self.env_ns + "/.*/Object")
# create controller
if self.cfg.control.control_type == "inverse_kinematics":
self._ik_controller = DifferentialInverseKinematics(
self.cfg.control.inverse_kinematics, self.robot.count, self.device
)
self.num_actions = self._ik_controller.num_actions + 1
elif self.cfg.control.control_type == "default":
self.num_actions = self.robot.num_actions
# history
self.actions = torch.zeros((self.num_envs, self.num_actions), device=self.device)
self.previous_actions = torch.zeros((self.num_envs, self.num_actions), device=self.device)
# robot joint actions
self.robot_actions = torch.zeros((self.num_envs, self.robot.num_actions), device=self.device)
# commands
self.object_des_pose_w = torch.zeros((self.num_envs, 7), device=self.device)
# buffers
self.object_root_pose_ee = torch.zeros((self.num_envs, 7), device=self.device)
# time-step = 0
self.object_init_pose_w = torch.zeros((self.num_envs, 7), device=self.device)
def _debug_vis(self):
"""Visualize the environment in debug mode."""
# apply to instance manager
# -- goal
self._goal_markers.set_world_poses(self.object_des_pose_w[:, 0:3], self.object_des_pose_w[:, 3:7])
# -- end-effector
self._ee_markers.set_world_poses(self.robot.data.ee_state_w[:, 0:3], self.robot.data.ee_state_w[:, 3:7])
# -- task-space commands
if self.cfg.control.control_type == "inverse_kinematics":
# convert to world frame
ee_positions = self._ik_controller.desired_ee_pos + self.envs_positions
ee_orientations = self._ik_controller.desired_ee_rot
# set poses
self._cmd_markers.set_world_poses(ee_positions, ee_orientations)
"""
Helper functions - MDP.
"""
def _check_termination(self) -> None:
# access buffers from simulator
object_pos = self.object.data.root_pos_w - self.envs_positions
# extract values from buffer
self.reset_buf[:] = 0
# compute resets
# -- when task is successful
if self.cfg.terminations.is_success:
object_position_error = torch.norm(self.object.data.root_pos_w - self.object_des_pose_w[:, 0:3], dim=1)
self.reset_buf = torch.where(object_position_error < 0.02, 1, self.reset_buf)
# -- object fell off the table (table at height: 0.0 m)
if self.cfg.terminations.object_falling:
self.reset_buf = torch.where(object_pos[:, 2] < -0.05, 1, self.reset_buf)
# -- episode length
if self.cfg.terminations.episode_timeout:
self.reset_buf = torch.where(self.episode_length_buf >= self.max_episode_length, 1, self.reset_buf)
def _randomize_object_initial_pose(self, env_ids: torch.Tensor, cfg: RandomizationCfg.ObjectInitialPoseCfg):
"""Randomize the initial pose of the object."""
# get the default root state
root_state = self.object.get_default_root_state(env_ids)
# -- object root position
if cfg.position_cat == "default":
pass
elif cfg.position_cat == "uniform":
# sample uniformly from box
# note: this should be within in the workspace of the robot
root_state[:, 0:3] = sample_uniform(
cfg.position_uniform_min, cfg.position_uniform_max, (len(env_ids), 3), device=self.device
)
else:
raise ValueError(f"Invalid category for randomizing the object positions '{cfg.position_cat}'.")
# -- object root orientation
if cfg.orientation_cat == "default":
pass
elif cfg.orientation_cat == "uniform":
# sample uniformly in SO(3)
root_state[:, 3:7] = random_orientation(len(env_ids), self.device)
else:
raise ValueError(f"Invalid category for randomizing the object orientation '{cfg.orientation_cat}'.")
# transform command from local env to world
root_state[:, 0:3] += self.envs_positions[env_ids]
# update object init pose
self.object_init_pose_w[env_ids] = root_state[:, 0:7]
# set the root state
self.object.set_root_state(root_state, env_ids=env_ids)
def _randomize_object_desired_pose(self, env_ids: torch.Tensor, cfg: RandomizationCfg.ObjectDesiredPoseCfg):
"""Randomize the desired pose of the object."""
# -- desired object root position
if cfg.position_cat == "default":
# constant command for position
self.object_des_pose_w[env_ids, 0:3] = cfg.position_default
elif cfg.position_cat == "uniform":
# sample uniformly from box
# note: this should be within in the workspace of the robot
self.object_des_pose_w[env_ids, 0:3] = sample_uniform(
cfg.position_uniform_min, cfg.position_uniform_max, (len(env_ids), 3), device=self.device
)
else:
raise ValueError(f"Invalid category for randomizing the desired object positions '{cfg.position_cat}'.")
# -- desired object root orientation
if cfg.orientation_cat == "default":
# constant position of the object
self.object_des_pose_w[env_ids, 3:7] = cfg.orientation_default
elif cfg.orientation_cat == "uniform":
self.object_des_pose_w[env_ids, 3:7] = random_orientation(len(env_ids), self.device)
else:
raise ValueError(
f"Invalid category for randomizing the desired object orientation '{cfg.orientation_cat}'."
)
# transform command from local env to world
self.object_des_pose_w[env_ids, 0:3] += self.envs_positions[env_ids]
class LiftObservationManager(ObservationManager):
"""Reward manager for single-arm reaching environment."""
def arm_dof_pos(self, env: LiftEnv):
"""DOF positions for the arm."""
return env.robot.data.arm_dof_pos
def arm_dof_pos_scaled(self, env: LiftEnv):
"""DOF positions for the arm normalized to its max and min ranges."""
return scale_transform(
env.robot.data.arm_dof_pos,
env.robot.data.soft_dof_pos_limits[:, : env.robot.arm_num_dof, 0],
env.robot.data.soft_dof_pos_limits[:, : env.robot.arm_num_dof, 1],
)
def arm_dof_vel(self, env: LiftEnv):
"""DOF velocity of the arm."""
return env.robot.data.arm_dof_vel
def tool_dof_pos_scaled(self, env: LiftEnv):
"""DOF positions of the tool normalized to its max and min ranges."""
return scale_transform(
env.robot.data.tool_dof_pos,
env.robot.data.soft_dof_pos_limits[:, env.robot.arm_num_dof :, 0],
env.robot.data.soft_dof_pos_limits[:, env.robot.arm_num_dof :, 1],
)
def tool_positions(self, env: LiftEnv):
"""Current end-effector position of the arm."""
return env.robot.data.ee_state_w[:, :3] - env.envs_positions
def tool_orientations(self, env: LiftEnv):
"""Current end-effector orientation of the arm."""
# make the first element positive
quat_w = env.robot.data.ee_state_w[:, 3:7]
quat_w[quat_w[:, 0] < 0] *= -1
return quat_w
def object_positions(self, env: LiftEnv):
"""Current object position."""
return env.object.data.root_pos_w - env.envs_positions
def object_orientations(self, env: LiftEnv):
"""Current object orientation."""
# make the first element positive
quat_w = env.object.data.root_quat_w
quat_w[quat_w[:, 0] < 0] *= -1
return quat_w
def object_relative_tool_positions(self, env: LiftEnv):
"""Current object position w.r.t. end-effector frame."""
return env.object.data.root_pos_w - env.robot.data.ee_state_w[:, :3]
def object_relative_tool_orientations(self, env: LiftEnv):
"""Current object orientation w.r.t. end-effector frame."""
# compute the relative orientation
quat_ee = quat_mul(quat_inv(env.robot.data.ee_state_w[:, 3:7]), env.object.data.root_quat_w)
# make the first element positive
quat_ee[quat_ee[:, 0] < 0] *= -1
return quat_ee
def object_desired_positions(self, env: LiftEnv):
"""Desired object position."""
return env.object_des_pose_w[:, 0:3] - env.envs_positions
def object_desired_orientations(self, env: LiftEnv):
"""Desired object orientation."""
# make the first element positive
quat_w = env.object_des_pose_w[:, 3:7]
quat_w[quat_w[:, 0] < 0] *= -1
return quat_w
def arm_actions(self, env: LiftEnv):
"""Last arm actions provided to env."""
return env.actions[:, :-1]
def tool_actions(self, env: LiftEnv):
"""Last tool actions provided to env."""
return env.actions[:, -1].unsqueeze(1)
def tool_actions_bool(self, env: LiftEnv):
"""Last tool actions transformed to a boolean command."""
return torch.sign(env.actions[:, -1]).unsqueeze(1)
class LiftRewardManager(RewardManager):
"""Reward manager for single-arm object lifting environment."""
def reaching_object_position_l2(self, env: LiftEnv):
"""Penalize end-effector tracking position error using L2-kernel."""
return torch.sum(torch.square(env.robot.data.ee_state_w[:, 0:3] - env.object.data.root_pos_w), dim=1)
def reaching_object_position_exp(self, env: LiftEnv, sigma: float):
"""Penalize end-effector tracking position error using exp-kernel."""
error = torch.sum(torch.square(env.robot.data.ee_state_w[:, 0:3] - env.object.data.root_pos_w), dim=1)
return torch.exp(-error / sigma)
def reaching_object_position_tanh(self, env: LiftEnv, sigma: float):
"""Penalize tool sites tracking position error using tanh-kernel."""
# distance of end-effector to the object: (num_envs,)
ee_distance = torch.norm(env.robot.data.ee_state_w[:, 0:3] - env.object.data.root_pos_w, dim=1)
# distance of the tool sites to the object: (num_envs, num_tool_sites)
object_root_pos = env.object.data.root_pos_w.unsqueeze(1) # (num_envs, 1, 3)
tool_sites_distance = torch.norm(env.robot.data.tool_sites_state_w[:, :, :3] - object_root_pos, dim=-1)
# average distance of the tool sites to the object: (num_envs,)
# note: we add the ee distance to the average to make sure that the ee is always closer to the object
num_tool_sites = tool_sites_distance.shape[1]
average_distance = (ee_distance + torch.sum(tool_sites_distance, dim=1)) / (num_tool_sites + 1)
return 1 - torch.tanh(average_distance / sigma)
def penalizing_arm_dof_velocity_l2(self, env: LiftEnv):
"""Penalize large movements of the robot arm."""
return -torch.sum(torch.square(env.robot.data.arm_dof_vel), dim=1)
def penalizing_tool_dof_velocity_l2(self, env: LiftEnv):
"""Penalize large movements of the robot tool."""
return -torch.sum(torch.square(env.robot.data.tool_dof_vel), dim=1)
def penalizing_arm_action_rate_l2(self, env: LiftEnv):
"""Penalize large variations in action commands besides tool."""
return -torch.sum(torch.square(env.actions[:, :-1] - env.previous_actions[:, :-1]), dim=1)
def penalizing_tool_action_l2(self, env: LiftEnv):
"""Penalize large values in action commands for the tool."""
return -torch.square(env.actions[:, -1])
def tracking_object_position_exp(self, env: LiftEnv, sigma: float, threshold: float):
"""Penalize tracking object position error using exp-kernel."""
# distance of the end-effector to the object: (num_envs,)
error = torch.sum(torch.square(env.object_des_pose_w[:, 0:3] - env.object.data.root_pos_w), dim=1)
# rewarded if the object is lifted above the threshold
return (env.object.data.root_pos_w[:, 2] > threshold) * torch.exp(-error / sigma)
def tracking_object_position_tanh(self, env: LiftEnv, sigma: float, threshold: float):
"""Penalize tracking object position error using tanh-kernel."""
# distance of the end-effector to the object: (num_envs,)
distance = torch.norm(env.object_des_pose_w[:, 0:3] - env.object.data.root_pos_w, dim=1)
# rewarded if the object is lifted above the threshold
return (env.object.data.root_pos_w[:, 2] > threshold) * (1 - torch.tanh(distance / sigma))
def lifting_object_success(self, env: LiftEnv, threshold: float):
"""Sparse reward if object is lifted successfully."""
return torch.where(env.object.data.root_pos_w[:, 2] > threshold, 1.0, 0.0)
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
from dataclasses import MISSING
import omni.isaac.orbit.sim as sim_utils
from omni.isaac.orbit.assets import ArticulationCfg, AssetBaseCfg, RigidObjectCfg
from omni.isaac.orbit.command_generators.command_generator_cfg import UniformPoseCommandGeneratorCfg
from omni.isaac.orbit.envs import RLTaskEnvCfg
from omni.isaac.orbit.managers import CurriculumTermCfg as CurrTerm
from omni.isaac.orbit.managers import ObservationGroupCfg as ObsGroup
from omni.isaac.orbit.managers import ObservationTermCfg as ObsTerm
from omni.isaac.orbit.managers import RandomizationTermCfg as RandTerm
from omni.isaac.orbit.managers import RewardTermCfg as RewTerm
from omni.isaac.orbit.managers import SceneEntityCfg
from omni.isaac.orbit.managers import TerminationTermCfg as DoneTerm
from omni.isaac.orbit.scene import InteractiveSceneCfg
from omni.isaac.orbit.sensors.frame_transformer.frame_transformer_cfg import FrameTransformerCfg
from omni.isaac.orbit.sim.spawners.from_files.from_files_cfg import GroundPlaneCfg, UsdFileCfg
from omni.isaac.orbit.utils import configclass
from omni.isaac.orbit.utils.assets import ISAAC_NUCLEUS_DIR
from . import mdp
##
# Scene definition
##
@configclass
class ObjectTableSceneCfg(InteractiveSceneCfg):
"""Configuration for the lift scene with a robot and a object.
This is the abstract base implementation, the exact scene is defined in the derived classes
which need to set the target object, robot and end-effector frames
"""
# robots: will be populated by agent env cfg
robot: ArticulationCfg = MISSING
# end-effector sensor: will be populated by agent env cfg
ee_frame: FrameTransformerCfg = MISSING
# target object: will be populated by agent env cfg
object: RigidObjectCfg = MISSING
# Table
table = AssetBaseCfg(
prim_path="{ENV_REGEX_NS}/Table",
init_state=AssetBaseCfg.InitialStateCfg(pos=[0.5, 0, 0], rot=[0.707, 0, 0, 0.707]),
spawn=UsdFileCfg(usd_path=f"{ISAAC_NUCLEUS_DIR}/Props/Mounts/SeattleLabTable/table_instanceable.usd"),
)
# plane
plane = AssetBaseCfg(
prim_path="/World/GroundPlane",
init_state=AssetBaseCfg.InitialStateCfg(pos=[0, 0, -1.05]),
spawn=GroundPlaneCfg(),
)
# lights
light = AssetBaseCfg(
prim_path="/World/light",
spawn=sim_utils.DomeLightCfg(color=(0.75, 0.75, 0.75), intensity=3000.0),
)
##
# MDP settings
##
@configclass
class ActionsCfg:
"""Action specifications for the MDP."""
# will be set by agent env cfg
body_joint_pos: mdp.JointPositionActionCfg = MISSING
finger_joint_pos: mdp.BinaryJointPositionActionCfg = MISSING
@configclass
class ObservationsCfg:
"""Observation specifications for the MDP."""
@configclass
class PolicyCfg(ObsGroup):
"""Observations for policy group."""
joint_pos = ObsTerm(func=mdp.joint_pos_rel)
joint_vel = ObsTerm(func=mdp.joint_vel_rel)
object_position = ObsTerm(func=mdp.object_position_in_robot_root_frame)
target_object_position = ObsTerm(func=mdp.generated_commands)
actions = ObsTerm(func=mdp.last_action)
def __post_init__(self):
self.enable_corruption = True
self.concatenate_terms = True
# observation groups
policy: PolicyCfg = PolicyCfg()
@configclass
class RandomizationCfg:
"""Configuration for randomization."""
reset_all = RandTerm(func=mdp.reset_scene_to_default, mode="reset")
reset_object_position = RandTerm(
func=mdp.reset_root_state_uniform,
mode="reset",
params={
"pose_range": {"x": (-0.1, 0.1), "y": (-0.25, 0.25), "z": (0.0, 0.0)},
"velocity_range": {},
"asset_cfg": SceneEntityCfg("object", body_names="Object"),
},
)
@configclass
class RewardsCfg:
"""Reward terms for the MDP."""
reaching_object = RewTerm(func=mdp.object_ee_distance, params={"std": 0.1}, weight=1.0)
lifting_object = RewTerm(func=mdp.object_is_lifted, params={"minimal_height": 0.06}, weight=15.0)
object_goal_tracking = RewTerm(
func=mdp.object_goal_distance,
params={"std": 0.3, "minimal_height": 0.06},
weight=16.0,
)
object_goal_tracking_fine_grained = RewTerm(
func=mdp.object_goal_distance,
params={"std": 0.05, "minimal_height": 0.06},
weight=5.0,
)
# action penalty
action_rate = RewTerm(func=mdp.action_rate_l2, weight=-1e-3)
joint_vel = RewTerm(
func=mdp.joint_vel_l2,
weight=-1e-4,
params={"asset_cfg": SceneEntityCfg("robot")},
)
@configclass
class TerminationsCfg:
"""Termination terms for the MDP."""
time_out = DoneTerm(func=mdp.time_out, time_out=True)
object_dropping = DoneTerm(
func=mdp.base_height, params={"minimum_height": -0.05, "asset_cfg": SceneEntityCfg("object")}
)
@configclass
class CurriculumCfg:
"""Curriculum terms for the MDP."""
action_rate = CurrTerm(
func=mdp.modify_reward_weight, params={"term_name": "action_rate", "weight": -1e-1, "num_steps": 10000}
)
joint_vel = CurrTerm(
func=mdp.modify_reward_weight, params={"term_name": "joint_vel", "weight": -1e-1, "num_steps": 10000}
)
##
# Environment configuration
##
@configclass
class LiftEnvCfg(RLTaskEnvCfg):
"""Configuration for the lifting environment."""
# Scene settings
scene: ObjectTableSceneCfg = ObjectTableSceneCfg(num_envs=4096, env_spacing=2.5, replicate_physics=False)
# Basic settings
observations: ObservationsCfg = ObservationsCfg()
actions: ActionsCfg = ActionsCfg()
commands: UniformPoseCommandGeneratorCfg = UniformPoseCommandGeneratorCfg(
asset_name="robot",
body_name=MISSING, # will be set by agent env cfg
resampling_time_range=(5.0, 5.0),
debug_vis=True,
ranges=UniformPoseCommandGeneratorCfg.Ranges(
pos_x=(0.4, 0.6), pos_y=(-0.25, 0.25), pos_z=(0.25, 0.5), roll=(0.0, 0.0), pitch=(0.0, 0.0), yaw=(0.0, 0.0)
),
)
# MDP settings
rewards: RewardsCfg = RewardsCfg()
terminations: TerminationsCfg = TerminationsCfg()
randomization: RandomizationCfg = RandomizationCfg()
curriculum: CurriculumCfg = CurriculumCfg()
def __post_init__(self):
"""Post initialization."""
# general settings
self.decimation = 2
self.episode_length_s = 5.0
# simulation settings
self.sim.dt = 0.01 # 100Hz
self.sim.physx.bounce_threshold_velocity = 0.2
self.sim.physx.bounce_threshold_velocity = 0.01
self.sim.physx.gpu_found_lost_aggregate_pairs_capacity = 1024 * 1024 * 4
self.sim.physx.gpu_total_aggregate_pairs_capacity = 16 * 1024
self.sim.physx.friction_correlation_distance = 0.00625
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""This sub-module contains the functions that are specific to the lift environments."""
from omni.isaac.orbit.envs.mdp import * # noqa: F401, F403
from .observations import * # noqa: F401, F403
from .rewards import * # noqa: F401, F403
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import torch
from typing import TYPE_CHECKING
from omni.isaac.orbit.assets import RigidObject
from omni.isaac.orbit.managers import SceneEntityCfg
from omni.isaac.orbit.utils.math import subtract_frame_transforms
if TYPE_CHECKING:
from omni.isaac.orbit.envs import RLTaskEnv
def object_position_in_robot_root_frame(
env: RLTaskEnv,
robot_cfg: SceneEntityCfg = SceneEntityCfg("robot"),
object_cfg: SceneEntityCfg = SceneEntityCfg("object"),
) -> torch.Tensor:
"""The position of the object in the robot's root frame."""
robot: RigidObject = env.scene[robot_cfg.name]
object: RigidObject = env.scene[object_cfg.name]
object_pos_w = object.data.root_pos_w[:, :3]
object_pos_b, _ = subtract_frame_transforms(
robot.data.root_state_w[:, :3], robot.data.root_state_w[:, 3:7], object_pos_w
)
return object_pos_b
# Copyright (c) 2022-2023, The ORBIT Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import torch
from typing import TYPE_CHECKING
from omni.isaac.orbit.assets import RigidObject
from omni.isaac.orbit.command_generators import UniformPoseCommandGenerator
from omni.isaac.orbit.managers import SceneEntityCfg
from omni.isaac.orbit.sensors import FrameTransformer
from omni.isaac.orbit.utils.math import combine_frame_transforms
if TYPE_CHECKING:
from omni.isaac.orbit.envs import RLTaskEnv
def object_is_lifted(
env: RLTaskEnv, minimal_height: float, object_cfg: SceneEntityCfg = SceneEntityCfg("object")
) -> torch.Tensor:
"""Reward the agent for lifting the object above the minimal height."""
object: RigidObject = env.scene[object_cfg.name]
return torch.where(object.data.root_pos_w[:, 2] > minimal_height, 1.0, 0.0)
def object_ee_distance(
env: RLTaskEnv,
std: float,
object_cfg: SceneEntityCfg = SceneEntityCfg("object"),
ee_frame_cfg: SceneEntityCfg = SceneEntityCfg("ee_frame"),
) -> torch.Tensor:
"""Reward the agent for reaching the object using tanh-kernel."""
# Target object position: (num_envs, 3)
object: RigidObject = env.scene[object_cfg.name]
ee_frame: FrameTransformer = env.scene[ee_frame_cfg.name]
cube_pos_w = object.data.root_pos_w
# End-effector position: (num_envs, 3)
ee_w = ee_frame.data.target_pos_w[..., 0, :]
# Distance of the end-effector to the object: (num_envs,)
object_ee_distance = torch.norm(cube_pos_w - ee_w, dim=1)
return 1 - torch.tanh(object_ee_distance / std)
def object_goal_distance(
env: RLTaskEnv,
std: float,
minimal_height: float,
robot_cfg: SceneEntityCfg = SceneEntityCfg("robot"),
object_cfg: SceneEntityCfg = SceneEntityCfg("object"),
) -> torch.Tensor:
"""Reward the agent for tracking the goal pose using tanh-kernel."""
robot: RigidObject = env.scene[robot_cfg.name]
object: RigidObject = env.scene[object_cfg.name]
command_manager: UniformPoseCommandGenerator = env.command_manager
des_pos_b = command_manager.command[:, :3]
des_pos_w, _ = combine_frame_transforms(robot.data.root_state_w[:, :3], robot.data.root_state_w[:, 3:7], des_pos_b)
# distance of the end-effector to the object: (num_envs,)
distance = torch.norm(des_pos_w - object.data.root_pos_w[:, :3], dim=1)
# rewarded if the object is lifted above the threshold
return (object.data.root_pos_w[:, 2] > minimal_height) * (1 - torch.tanh(distance / std))
......@@ -5,7 +5,7 @@
import gymnasium as gym
from . import agents, joint_env_cfg
from . import agents, env_cfg
##
# Register Gym environments.
......@@ -16,9 +16,9 @@ gym.register(
entry_point="omni.isaac.orbit.envs:RLTaskEnv",
disable_env_checker=True,
kwargs={
"env_cfg_entry_point": joint_env_cfg.FrankaReachEnvCfg,
"env_cfg_entry_point": env_cfg.FrankaReachEnvCfg,
"rl_games_cfg_entry_point": f"{agents.__name__}:rl_games_ppo_cfg.yaml",
"rsl_rl_cfg_entry_point": f"{agents.__name__}.rsl_rl_ppo_cfg:FrankaReachPPORunnerCfg",
"rsl_rl_cfg_entry_point": f"{agents.__name__}.rsl_rl_cfg:FrankaReachPPORunnerCfg",
},
)
......@@ -27,8 +27,8 @@ gym.register(
entry_point="omni.isaac.orbit.envs:RLTaskEnv",
disable_env_checker=True,
kwargs={
"env_cfg_entry_point": joint_env_cfg.FrankaReachEnvCfg_PLAY,
"env_cfg_entry_point": env_cfg.FrankaReachEnvCfg_PLAY,
"rl_games_cfg_entry_point": f"{agents.__name__}:rl_games_ppo_cfg.yaml",
"rsl_rl_cfg_entry_point": f"{agents.__name__}.rsl_rl_ppo_cfg:FrankaReachPPORunnerCfg",
"rsl_rl_cfg_entry_point": f"{agents.__name__}.rsl_rl_cfg:FrankaReachPPORunnerCfg",
},
)
......@@ -17,7 +17,7 @@ class FrankaReachPPORunnerCfg(RslRlOnPolicyRunnerCfg):
num_steps_per_env = 24
max_iterations = 1000
save_interval = 50
experiment_name = "reach_franka"
experiment_name = "franka_reach"
run_name = ""
resume = False
empirical_normalization = False
......
......@@ -5,7 +5,7 @@
import gymnasium as gym
from . import agents, joint_env_cfg
from . import agents, env_cfg
##
# Register Gym environments.
......@@ -16,7 +16,7 @@ gym.register(
entry_point="omni.isaac.orbit.envs:RLTaskEnv",
disable_env_checker=True,
kwargs={
"env_cfg_entry_point": joint_env_cfg.UR10ReachEnvCfg,
"env_cfg_entry_point": env_cfg.UR10ReachEnvCfg,
"rl_games_cfg_entry_point": f"{agents.__name__}:rl_games_ppo_cfg.yaml",
"rsl_rl_cfg_entry_point": f"{agents.__name__}.rsl_rl_ppo_cfg:UR10ReachPPORunnerCfg",
},
......@@ -27,7 +27,7 @@ gym.register(
entry_point="omni.isaac.orbit.envs:RLTaskEnv",
disable_env_checker=True,
kwargs={
"env_cfg_entry_point": joint_env_cfg.UR10ReachEnvCfg_PLAY,
"env_cfg_entry_point": env_cfg.UR10ReachEnvCfg_PLAY,
"rl_games_cfg_entry_point": f"{agents.__name__}:rl_games_ppo_cfg.yaml",
"rsl_rl_cfg_entry_point": f"{agents.__name__}.rsl_rl_ppo_cfg:UR10ReachPPORunnerCfg",
},
......
......@@ -126,7 +126,7 @@ class RewardsCfg:
)
# action penalty
action_rate = RewTerm(func=mdp.action_rate_l2, weight=-0.005)
action_rate = RewTerm(func=mdp.action_rate_l2, weight=-0.0001)
joint_vel = RewTerm(
func=mdp.joint_vel_l2,
weight=-0.0001,
......@@ -146,11 +146,7 @@ class CurriculumCfg:
"""Curriculum terms for the MDP."""
action_rate = CurrTerm(
func=mdp.modify_reward_weight, params={"term_name": "action_rate", "weight": -0.01, "num_steps": 4500}
)
joint_vel = CurrTerm(
func=mdp.modify_reward_weight, params={"term_name": "joint_vel", "weight": -0.0001, "num_steps": 4500}
func=mdp.modify_reward_weight, params={"term_name": "action_rate", "weight": -0.005, "num_steps": 4500}
)
......@@ -172,7 +168,7 @@ class ReachEnvCfg(RLTaskEnvCfg):
asset_name="robot",
body_name=MISSING,
resampling_time_range=(4.0, 4.0),
debug_vis=False,
debug_vis=True,
ranges=UniformPoseCommandGeneratorCfg.Ranges(
pos_x=(0.35, 0.65),
pos_y=(-0.2, 0.2),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment