Unverified Commit b8710f40 authored by Antoine RICHARD's avatar Antoine RICHARD Committed by GitHub

Managed environments actions / observations descriptions (#2730)

# Description

Experimental branch to generate observations and actions descriptions
from managed environments.

## Type of change

- New feature (non-breaking change which adds functionality)
- This change requires a documentation update

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there

---------
Signed-off-by: 's avatarooctipus <zhengyuz@nvidia.com>
Co-authored-by: 's avatarooctipus <zhengyuz@nvidia.com>
parent 00249505
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
actions:
- action_type: JointAction
clip: null
dtype: torch.float32
extras:
description: Joint action term that applies the processed actions to the articulation's
joints as position commands.
full_path: isaaclab.envs.mdp.actions.joint_actions.JointPositionAction
joint_names:
- LF_HAA
- LH_HAA
- RF_HAA
- RH_HAA
- LF_HFE
- LH_HFE
- RF_HFE
- RH_HFE
- LF_KFE
- LH_KFE
- RF_KFE
- RH_KFE
mdp_type: Action
name: joint_position_action
offset:
- 0.0
- 0.0
- 0.0
- 0.0
- 0.4000000059604645
- -0.4000000059604645
- 0.4000000059604645
- -0.4000000059604645
- -0.800000011920929
- 0.800000011920929
- -0.800000011920929
- 0.800000011920929
scale: 0.5
shape:
- 12
articulations:
robot:
default_joint_armature:
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
default_joint_damping:
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
default_joint_friction:
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
default_joint_pos:
- 0.0
- 0.0
- 0.0
- 0.0
- 0.4000000059604645
- -0.4000000059604645
- 0.4000000059604645
- -0.4000000059604645
- -0.800000011920929
- 0.800000011920929
- -0.800000011920929
- 0.800000011920929
default_joint_pos_limits:
- - -0.7853984236717224
- 0.6108654141426086
- - -0.7853984236717224
- 0.6108654141426086
- - -0.6108654141426086
- 0.7853984236717224
- - -0.6108654141426086
- 0.7853984236717224
- - -9.42477798461914
- 9.42477798461914
- - -9.42477798461914
- 9.42477798461914
- - -9.42477798461914
- 9.42477798461914
- - -9.42477798461914
- 9.42477798461914
- - -9.42477798461914
- 9.42477798461914
- - -9.42477798461914
- 9.42477798461914
- - -9.42477798461914
- 9.42477798461914
- - -9.42477798461914
- 9.42477798461914
default_joint_stiffness:
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
default_joint_vel:
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
joint_names:
- LF_HAA
- LH_HAA
- RF_HAA
- RH_HAA
- LF_HFE
- LH_HFE
- RF_HFE
- RH_HFE
- LF_KFE
- LH_KFE
- RF_KFE
- RH_KFE
observations:
policy:
- dtype: torch.float32
extras:
axes:
- X
- Y
- Z
description: Root linear velocity in the asset's root frame.
modifiers: null
units: m/s
full_path: isaaclab.envs.mdp.observations.base_lin_vel
mdp_type: Observation
name: base_lin_vel
observation_type: RootState
overloads:
clip: null
flatten_history_dim: true
history_length: 0
scale: null
shape:
- 3
- dtype: torch.float32
extras:
axes:
- X
- Y
- Z
description: Root angular velocity in the asset's root frame.
modifiers: null
units: rad/s
full_path: isaaclab.envs.mdp.observations.base_ang_vel
mdp_type: Observation
name: base_ang_vel
observation_type: RootState
overloads:
clip: null
flatten_history_dim: true
history_length: 0
scale: null
shape:
- 3
- dtype: torch.float32
extras:
axes:
- X
- Y
- Z
description: Gravity projection on the asset's root frame.
modifiers: null
units: m/s^2
full_path: isaaclab.envs.mdp.observations.projected_gravity
mdp_type: Observation
name: projected_gravity
observation_type: RootState
overloads:
clip: null
flatten_history_dim: true
history_length: 0
scale: null
shape:
- 3
- dtype: torch.float32
extras:
description: The generated command from command term in the command manager
with the given name.
modifiers: null
full_path: isaaclab.envs.mdp.observations.generated_commands
mdp_type: Observation
name: generated_commands
observation_type: Command
overloads:
clip: null
flatten_history_dim: true
history_length: 0
scale: null
shape:
- 3
- dtype: torch.float32
extras:
description: 'The joint positions of the asset w.r.t. the default joint positions.
Note: Only the joints configured in :attr:`asset_cfg.joint_ids` will have
their positions returned.'
modifiers: null
units: rad
full_path: isaaclab.envs.mdp.observations.joint_pos_rel
joint_names:
- LF_HAA
- LH_HAA
- RF_HAA
- RH_HAA
- LF_HFE
- LH_HFE
- RF_HFE
- RH_HFE
- LF_KFE
- LH_KFE
- RF_KFE
- RH_KFE
joint_pos_offsets:
- 0.0
- 0.0
- 0.0
- 0.0
- 0.4000000059604645
- -0.4000000059604645
- 0.4000000059604645
- -0.4000000059604645
- -0.800000011920929
- 0.800000011920929
- -0.800000011920929
- 0.800000011920929
mdp_type: Observation
name: joint_pos_rel
observation_type: JointState
overloads:
clip: null
flatten_history_dim: true
history_length: 0
scale: null
shape:
- 12
- dtype: torch.float32
extras:
description: 'The joint velocities of the asset w.r.t. the default joint velocities.
Note: Only the joints configured in :attr:`asset_cfg.joint_ids` will have
their velocities returned.'
modifiers: null
units: rad/s
full_path: isaaclab.envs.mdp.observations.joint_vel_rel
joint_names:
- LF_HAA
- LH_HAA
- RF_HAA
- RH_HAA
- LF_HFE
- LH_HFE
- RF_HFE
- RH_HFE
- LF_KFE
- LH_KFE
- RF_KFE
- RH_KFE
joint_vel_offsets:
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
mdp_type: Observation
name: joint_vel_rel
observation_type: JointState
overloads:
clip: null
flatten_history_dim: true
history_length: 0
scale: null
shape:
- 12
- dtype: torch.float32
extras:
description: The last input action to the environment. The name of the action
term for which the action is required. If None, the entire action tensor is
returned.
modifiers: null
full_path: isaaclab.envs.mdp.observations.last_action
mdp_type: Observation
name: last_action
observation_type: Action
overloads:
clip: null
flatten_history_dim: true
history_length: 0
scale: null
shape:
- 12
scene:
decimation: 4
dt: 0.02
physics_dt: 0.005
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
actions:
- action_type: JointAction
clip: null
dtype: torch.float32
extras:
description: Joint action term that applies the processed actions to the articulation's
joints as position commands.
full_path: isaaclab.envs.mdp.actions.joint_actions.JointPositionAction
joint_names:
- left_hip_pitch_joint
- right_hip_pitch_joint
- torso_joint
- left_hip_roll_joint
- right_hip_roll_joint
- left_shoulder_pitch_joint
- right_shoulder_pitch_joint
- left_hip_yaw_joint
- right_hip_yaw_joint
- left_shoulder_roll_joint
- right_shoulder_roll_joint
- left_knee_joint
- right_knee_joint
- left_shoulder_yaw_joint
- right_shoulder_yaw_joint
- left_ankle_pitch_joint
- right_ankle_pitch_joint
- left_elbow_pitch_joint
- right_elbow_pitch_joint
- left_ankle_roll_joint
- right_ankle_roll_joint
- left_elbow_roll_joint
- right_elbow_roll_joint
- left_five_joint
- left_three_joint
- left_zero_joint
- right_five_joint
- right_three_joint
- right_zero_joint
- left_six_joint
- left_four_joint
- left_one_joint
- right_six_joint
- right_four_joint
- right_one_joint
- left_two_joint
- right_two_joint
mdp_type: Action
name: joint_position_action
offset:
- -0.20000000298023224
- -0.20000000298023224
- 0.0
- 0.0
- 0.0
- 0.3499999940395355
- 0.3499999940395355
- 0.0
- 0.0
- 0.1599999964237213
- -0.1599999964237213
- 0.41999998688697815
- 0.41999998688697815
- 0.0
- 0.0
- -0.23000000417232513
- -0.23000000417232513
- 0.8700000047683716
- 0.8700000047683716
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 1.0
- 0.0
- 0.0
- -1.0
- 0.5199999809265137
- -0.5199999809265137
scale: 0.5
shape:
- 37
articulations:
robot:
default_joint_armature:
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.009999999776482582
- 0.0010000000474974513
- 0.0010000000474974513
- 0.0010000000474974513
- 0.0010000000474974513
- 0.0010000000474974513
- 0.0010000000474974513
- 0.0010000000474974513
- 0.0010000000474974513
- 0.0010000000474974513
- 0.0010000000474974513
- 0.0010000000474974513
- 0.0010000000474974513
- 0.0010000000474974513
- 0.0010000000474974513
default_joint_damping:
- 5.0
- 5.0
- 5.0
- 5.0
- 5.0
- 10.0
- 10.0
- 5.0
- 5.0
- 10.0
- 10.0
- 5.0
- 5.0
- 10.0
- 10.0
- 2.0
- 2.0
- 10.0
- 10.0
- 2.0
- 2.0
- 10.0
- 10.0
- 10.0
- 10.0
- 10.0
- 10.0
- 10.0
- 10.0
- 10.0
- 10.0
- 10.0
- 10.0
- 10.0
- 10.0
- 10.0
- 10.0
default_joint_friction:
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
default_joint_pos:
- -0.20000000298023224
- -0.20000000298023224
- 0.0
- 0.0
- 0.0
- 0.3499999940395355
- 0.3499999940395355
- 0.0
- 0.0
- 0.1599999964237213
- -0.1599999964237213
- 0.41999998688697815
- 0.41999998688697815
- 0.0
- 0.0
- -0.23000000417232513
- -0.23000000417232513
- 0.8700000047683716
- 0.8700000047683716
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 1.0
- 0.0
- 0.0
- -1.0
- 0.5199999809265137
- -0.5199999809265137
default_joint_pos_limits:
- - -2.3499996662139893
- 3.049999952316284
- - -2.3499996662139893
- 3.049999952316284
- - -2.618000030517578
- 2.618000030517578
- - -0.25999996066093445
- 2.5299997329711914
- - -2.5299997329711914
- 0.25999996066093445
- - -2.967099666595459
- 2.7924997806549072
- - -2.967099666595459
- 2.7924997806549072
- - -2.749999761581421
- 2.749999761581421
- - -2.749999761581421
- 2.749999761581421
- - -1.5881999731063843
- 2.251499652862549
- - -2.251499652862549
- 1.5881999731063843
- - -0.3348899781703949
- 2.5448997020721436
- - -0.3348899781703949
- 2.5448997020721436
- - -2.618000030517578
- 2.618000030517578
- - -2.618000030517578
- 2.618000030517578
- - -0.6799999475479126
- 0.7299999594688416
- - -0.6799999475479126
- 0.7299999594688416
- - -0.22679997980594635
- 3.420799732208252
- - -0.22679997980594635
- 3.420799732208252
- - -0.26179996132850647
- 0.26179996132850647
- - -0.26179996132850647
- 0.26179996132850647
- - -2.094299793243408
- 2.094299793243408
- - -2.094299793243408
- 2.094299793243408
- - -1.8399999141693115
- 0.30000001192092896
- - -1.8399999141693115
- 0.30000001192092896
- - -0.5235979557037354
- 0.5235979557037354
- - -0.30000001192092896
- 1.8399999141693115
- - -0.30000001192092896
- 1.8399999141693115
- - -0.5235979557037354
- 0.5235979557037354
- - -1.8399999141693115
- 0.0
- - -1.8399999141693115
- 0.0
- - -0.9999999403953552
- 1.2000000476837158
- - 0.0
- 1.8399999141693115
- - 0.0
- 1.8399999141693115
- - -1.2000000476837158
- 0.9999999403953552
- - 0.0
- 1.8399999141693115
- - -1.8399999141693115
- 0.0
default_joint_stiffness:
- 200.0
- 200.0
- 200.0
- 150.0
- 150.0
- 40.0
- 40.0
- 150.0
- 150.0
- 40.0
- 40.0
- 200.0
- 200.0
- 40.0
- 40.0
- 20.0
- 20.0
- 40.0
- 40.0
- 20.0
- 20.0
- 40.0
- 40.0
- 40.0
- 40.0
- 40.0
- 40.0
- 40.0
- 40.0
- 40.0
- 40.0
- 40.0
- 40.0
- 40.0
- 40.0
- 40.0
- 40.0
default_joint_vel:
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
joint_names:
- left_hip_pitch_joint
- right_hip_pitch_joint
- torso_joint
- left_hip_roll_joint
- right_hip_roll_joint
- left_shoulder_pitch_joint
- right_shoulder_pitch_joint
- left_hip_yaw_joint
- right_hip_yaw_joint
- left_shoulder_roll_joint
- right_shoulder_roll_joint
- left_knee_joint
- right_knee_joint
- left_shoulder_yaw_joint
- right_shoulder_yaw_joint
- left_ankle_pitch_joint
- right_ankle_pitch_joint
- left_elbow_pitch_joint
- right_elbow_pitch_joint
- left_ankle_roll_joint
- right_ankle_roll_joint
- left_elbow_roll_joint
- right_elbow_roll_joint
- left_five_joint
- left_three_joint
- left_zero_joint
- right_five_joint
- right_three_joint
- right_zero_joint
- left_six_joint
- left_four_joint
- left_one_joint
- right_six_joint
- right_four_joint
- right_one_joint
- left_two_joint
- right_two_joint
observations:
policy:
- dtype: torch.float32
extras:
axes:
- X
- Y
- Z
description: Root linear velocity in the asset's root frame.
modifiers: null
units: m/s
full_path: isaaclab.envs.mdp.observations.base_lin_vel
mdp_type: Observation
name: base_lin_vel
observation_type: RootState
overloads:
clip: null
flatten_history_dim: true
history_length: 0
scale: null
shape:
- 3
- dtype: torch.float32
extras:
axes:
- X
- Y
- Z
description: Root angular velocity in the asset's root frame.
modifiers: null
units: rad/s
full_path: isaaclab.envs.mdp.observations.base_ang_vel
mdp_type: Observation
name: base_ang_vel
observation_type: RootState
overloads:
clip: null
flatten_history_dim: true
history_length: 0
scale: null
shape:
- 3
- dtype: torch.float32
extras:
axes:
- X
- Y
- Z
description: Gravity projection on the asset's root frame.
modifiers: null
units: m/s^2
full_path: isaaclab.envs.mdp.observations.projected_gravity
mdp_type: Observation
name: projected_gravity
observation_type: RootState
overloads:
clip: null
flatten_history_dim: true
history_length: 0
scale: null
shape:
- 3
- dtype: torch.float32
extras:
description: The generated command from command term in the command manager
with the given name.
modifiers: null
full_path: isaaclab.envs.mdp.observations.generated_commands
mdp_type: Observation
name: generated_commands
observation_type: Command
overloads:
clip: null
flatten_history_dim: true
history_length: 0
scale: null
shape:
- 3
- dtype: torch.float32
extras:
description: 'The joint positions of the asset w.r.t. the default joint positions.
Note: Only the joints configured in :attr:`asset_cfg.joint_ids` will have
their positions returned.'
modifiers: null
units: rad
full_path: isaaclab.envs.mdp.observations.joint_pos_rel
joint_names:
- left_hip_pitch_joint
- right_hip_pitch_joint
- torso_joint
- left_hip_roll_joint
- right_hip_roll_joint
- left_shoulder_pitch_joint
- right_shoulder_pitch_joint
- left_hip_yaw_joint
- right_hip_yaw_joint
- left_shoulder_roll_joint
- right_shoulder_roll_joint
- left_knee_joint
- right_knee_joint
- left_shoulder_yaw_joint
- right_shoulder_yaw_joint
- left_ankle_pitch_joint
- right_ankle_pitch_joint
- left_elbow_pitch_joint
- right_elbow_pitch_joint
- left_ankle_roll_joint
- right_ankle_roll_joint
- left_elbow_roll_joint
- right_elbow_roll_joint
- left_five_joint
- left_three_joint
- left_zero_joint
- right_five_joint
- right_three_joint
- right_zero_joint
- left_six_joint
- left_four_joint
- left_one_joint
- right_six_joint
- right_four_joint
- right_one_joint
- left_two_joint
- right_two_joint
joint_pos_offsets:
- -0.20000000298023224
- -0.20000000298023224
- 0.0
- 0.0
- 0.0
- 0.3499999940395355
- 0.3499999940395355
- 0.0
- 0.0
- 0.1599999964237213
- -0.1599999964237213
- 0.41999998688697815
- 0.41999998688697815
- 0.0
- 0.0
- -0.23000000417232513
- -0.23000000417232513
- 0.8700000047683716
- 0.8700000047683716
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 1.0
- 0.0
- 0.0
- -1.0
- 0.5199999809265137
- -0.5199999809265137
mdp_type: Observation
name: joint_pos_rel
observation_type: JointState
overloads:
clip: null
flatten_history_dim: true
history_length: 0
scale: null
shape:
- 37
- dtype: torch.float32
extras:
description: 'The joint velocities of the asset w.r.t. the default joint velocities.
Note: Only the joints configured in :attr:`asset_cfg.joint_ids` will have
their velocities returned.'
modifiers: null
units: rad/s
full_path: isaaclab.envs.mdp.observations.joint_vel_rel
joint_names:
- left_hip_pitch_joint
- right_hip_pitch_joint
- torso_joint
- left_hip_roll_joint
- right_hip_roll_joint
- left_shoulder_pitch_joint
- right_shoulder_pitch_joint
- left_hip_yaw_joint
- right_hip_yaw_joint
- left_shoulder_roll_joint
- right_shoulder_roll_joint
- left_knee_joint
- right_knee_joint
- left_shoulder_yaw_joint
- right_shoulder_yaw_joint
- left_ankle_pitch_joint
- right_ankle_pitch_joint
- left_elbow_pitch_joint
- right_elbow_pitch_joint
- left_ankle_roll_joint
- right_ankle_roll_joint
- left_elbow_roll_joint
- right_elbow_roll_joint
- left_five_joint
- left_three_joint
- left_zero_joint
- right_five_joint
- right_three_joint
- right_zero_joint
- left_six_joint
- left_four_joint
- left_one_joint
- right_six_joint
- right_four_joint
- right_one_joint
- left_two_joint
- right_two_joint
joint_vel_offsets:
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
- 0.0
mdp_type: Observation
name: joint_vel_rel
observation_type: JointState
overloads:
clip: null
flatten_history_dim: true
history_length: 0
scale: null
shape:
- 37
- dtype: torch.float32
extras:
description: The last input action to the environment. The name of the action
term for which the action is required. If None, the entire action tensor is
returned.
modifiers: null
full_path: isaaclab.envs.mdp.observations.last_action
mdp_type: Observation
name: last_action
observation_type: Action
overloads:
clip: null
flatten_history_dim: true
history_length: 0
scale: null
shape:
- 37
scene:
decimation: 4
dt: 0.02
physics_dt: 0.005
IO Descriptors 101
==================
.. currentmodule:: isaaclab
In this tutorial, we will learn about IO descriptors, what they are, how to export them, and how to add them to
your environments. We will use the Anymal-D robot as an example to demonstrate how to export IO descriptors from
an environment, and use our own terms to demonstrate how to attach IO descriptors to custom action and observation terms.
What are IO Descriptors?
------------------------
Before we dive into IO descriptors, let's first understand what they are and how they can be useful.
IO descriptors are a way to describe the inputs and outputs of a policy trained using the ManagerBasedRLEnv in Isaac
Lab. In other words, they describe the action and observation terms of a policy. This description is used to generate
a YAML file that can be loaded in an external tool to run the policies without having to manually input the
configuration of the action and observation terms.
In addition to this the IO Descriptors provide the following information:
- The parameters of all the joints in the articulation.
- Some simulation parameters including the simulation time step, and the policy time step.
- For some action and observation terms, it provides the joint names or body names in the same order as they appear in the action/observation terms.
- For both the observation and action terms, it provides the terms in the exact same order as they appear in the managers. Making it easy to reconstruct them from the YAML file.
Here is an example of what the action part of the YAML generated from the IO descriptors looks like for the Anymal-D robot:
.. literalinclude:: ../../_static/policy_deployment/01_io_descriptors/isaac_velocity_flat_anymal_d_v0_IO_descriptors.yaml
:language: yaml
:lines: 1-39
Here is an example of what a portion of the observation part of the YAML generated from the IO descriptors looks like for the Anymal-D robot:
.. literalinclude:: ../../_static/policy_deployment/01_io_descriptors/isaac_velocity_flat_anymal_d_v0_IO_descriptors.yaml
:language: yaml
:lines: 158-199
.. literalinclude:: ../../_static/policy_deployment/01_io_descriptors/isaac_velocity_flat_anymal_d_v0_IO_descriptors.yaml
:language: yaml
:lines: 236-279
Something to note here is that both the action and observation terms are returned as list of dictionaries, and not a dictionary of dictionaries.
This is done to ensure the order of the terms is preserved. Hence, to retrieve the action or observation term, the users need to look for the
``name`` key in the dictionaries.
For example, in the following snippet, we are looking at the ``projected_gravity`` observation term. The ``name`` key is used to identify the term.
The ``full_path`` key is used to provide an explicit path to the function in Isaac Lab's source code that is used to compute this term. Some flags
like ``mdp_type`` and ``observation_type`` are also provided, these don't have any functional impact. They are here to inform the user that this is the
category this term belongs to.
.. literalinclude:: ../../_static/policy_deployment/01_io_descriptors/isaac_velocity_flat_anymal_d_v0_IO_descriptors.yaml
:language: yaml
:lines: 200-219
:emphasize-lines: 9, 11
Exporting IO Descriptors from an Environment
--------------------------------------------
In this section, we will cover how to export IO descriptors from an environment.
Keep in mind that this feature is only available to the manager based RL environments.
If a policy has already been trained using a given configuration, then the IO descriptors can be exported using:
.. code-block:: bash
./isaaclab.sh -p scripts/environments/export_io_descriptors.py --task <task_name> --output_dir <output_dir>
For example, if we want to export the IO descriptors for the Anymal-D robot, we can run:
.. code-block:: bash
./isaaclab.sh -p scripts/environments/export_io_descriptors.py --task Isaac-Velocity-Flat-Anymal-D-v0 --output_dir ./io_descriptors
When training a policy, it is also possible to request the IO descriptors to be exported at the beginning of the training.
This can be done by setting the ``export_io_descriptors`` flag in the command line.
.. code-block:: bash
./isaaclab.sh -p scripts/reinforcement_learning/rsl_rl/train.py --task Isaac-Velocity-Flat-Anymal-D-v0 --export_io_descriptors
./isaaclab.sh -p scripts/reinforcement_learning/sb3/train.py --task Isaac-Velocity-Flat-Anymal-D-v0 --export_io_descriptors
./isaaclab.sh -p scripts/reinforcement_learning/rl_games/train.py --task Isaac-Velocity-Flat-Anymal-D-v0 --export_io_descriptors
./isaaclab.sh -p scripts/reinforcement_learning/skrl/train.py --task Isaac-Velocity-Flat-Anymal-D-v0 --export_io_descriptors
Attaching IO Descriptors to Custom Observation Terms
----------------------------------------------------
In this section, we will cover how to attach IO descriptors to custom observation terms.
Let's take a look at how we can attach an IO descriptor to a simple observation term:
.. code-block:: python
@generic_io_descriptor(
units="m/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]
)
def base_lin_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Root linear velocity in the asset's root frame."""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
return asset.data.root_lin_vel_b
Here, we are defining a custom observation term called ``base_lin_vel`` that computes the root linear velocity of the robot.
We are also attaching an IO descriptor to this term. The IO descriptor is defined using the ``@generic_io_descriptor`` decorator.
The ``@generic_io_descriptor`` decorator is a special decorator that is used to attach an IO descriptor to a custom observation term.
It takes arbitrary arguments that are used to describe the observation term, in this case we provide extra information that could be
useful for the end user:
- ``units``: The units of the observation term.
- ``axes``: The axes of the observation term.
- ``observation_type``: The type of the observation term.
You'll also notice that there is an ``on_inspect`` argument that is provided. This is a list of functions that are used to inspect the observation term.
In this case, we are using the ``record_shape`` and ``record_dtype`` functions to record the shape and dtype of the output of the observation term.
These functions are defined like so:
.. code-block:: python
def record_shape(output: torch.Tensor, descriptor: GenericObservationIODescriptor, **kwargs) -> None:
"""Record the shape of the output tensor.
Args:
output: The output tensor.
descriptor: The descriptor to record the shape to.
**kwargs: Additional keyword arguments.
"""
descriptor.shape = (output.shape[-1],)
def record_dtype(output: torch.Tensor, descriptor: GenericObservationIODescriptor, **kwargs) -> None:
"""Record the dtype of the output tensor.
Args:
output: The output tensor.
descriptor: The descriptor to record the dtype to.
**kwargs: Additional keyword arguments.
"""
descriptor.dtype = str(output.dtype)
They always take the output tensor of the observation term as the first argument, and the descriptor as the second argument.
In the ``kwargs`` all the inputs of the observation term are provided. In addition to the ``on_inspect`` functions, the decorator
will also call call some functions in the background to collect the ``name``, the ``description``, and the ``full_path`` of the
observation term. Note that adding this decorator does not change the signature of the observation term, so it can be used safely
with the observation manager!
Let us now take a look at a more complex example: getting the relative joint positions of the robot.
.. code-block:: python
@generic_io_descriptor(
observation_type="JointState",
on_inspect=[record_joint_names, record_dtype, record_shape, record_joint_pos_offsets],
units="rad",
)
def joint_pos_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""The joint positions of the asset w.r.t. the default joint positions.
Note: Only the joints configured in :attr:`asset_cfg.joint_ids` will have their positions returned.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return asset.data.joint_pos[:, asset_cfg.joint_ids] - asset.data.default_joint_pos[:, asset_cfg.joint_ids]
Similarly to the previous example, we are adding an IO descriptor to a custom observation term with a set of functions that probe the observation term.
To get the name of the joints we can write the following function:
.. code-block:: python
def record_joint_names(output: torch.Tensor, descriptor: GenericObservationIODescriptor, **kwargs) -> None:
"""Record the joint names of the output tensor.
Expects the `asset_cfg` keyword argument to be set.
Args:
output: The output tensor.
descriptor: The descriptor to record the joint names to.
**kwargs: Additional keyword arguments.
"""
asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name]
joint_ids = kwargs["asset_cfg"].joint_ids
if joint_ids == slice(None, None, None):
joint_ids = list(range(len(asset.joint_names)))
descriptor.joint_names = [asset.joint_names[i] for i in joint_ids]
Note that we can access all the inputs of the observation term in the ``kwargs`` dictionary. Hence we can access the ``asset_cfg``, which contains the
configuration of the articulation that the observation term is computed on.
To get the offsets, we can write the following function:
.. code-block:: python
def record_joint_pos_offsets(output: torch.Tensor, descriptor: GenericObservationIODescriptor, **kwargs):
"""Record the joint position offsets of the output tensor.
Expects the `asset_cfg` keyword argument to be set.
Args:
output: The output tensor.
descriptor: The descriptor to record the joint position offsets to.
**kwargs: Additional keyword arguments.
"""
asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name]
ids = kwargs["asset_cfg"].joint_ids
# Get the offsets of the joints for the first robot in the scene.
# This assumes that all robots have the same joint offsets.
descriptor.joint_pos_offsets = asset.data.default_joint_pos[:, ids][0]
With this in mind, you should now be able to attach an IO descriptor to your own custom observation terms! However, before
we close this tutorial, let's take a look at how we can attach an IO descriptor to a custom action term.
Attaching IO Descriptors to Custom Action Terms
-----------------------------------------------
In this section, we will cover how to attach IO descriptors to custom action terms. Action terms are classes that
inherit from the :class:`managers.ActionTerm` class. To add an IO descriptor to an action term, we need to expand
upon its :meth:`ActionTerm.IO_descriptor` property.
By default, the :meth:`ActionTerm.IO_descriptor` property returns the base descriptor and fills the following fields:
- ``name``: The name of the action term.
- ``full_path``: The full path of the action term.
- ``description``: The description of the action term.
- ``export``: Whether to export the action term.
.. code-block:: python
@property
def IO_descriptor(self) -> GenericActionIODescriptor:
"""The IO descriptor for the action term."""
self._IO_descriptor.name = re.sub(r"([a-z])([A-Z])", r"\1_\2", self.__class__.__name__).lower()
self._IO_descriptor.full_path = f"{self.__class__.__module__}.{self.__class__.__name__}"
self._IO_descriptor.description = " ".join(self.__class__.__doc__.split())
self._IO_descriptor.export = self.export_IO_descriptor
return self._IO_descriptor
To add more information to the descriptor, we need to override the :meth:`ActionTerm.IO_descriptor` property.
Let's take a look at an example on how to add the joint names, scale, offset, and clip to the descriptor.
.. code-block:: python
@property
def IO_descriptor(self) -> GenericActionIODescriptor:
"""The IO descriptor of the action term.
This descriptor is used to describe the action term of the joint action.
It adds the following information to the base descriptor:
- joint_names: The names of the joints.
- scale: The scale of the action term.
- offset: The offset of the action term.
- clip: The clip of the action term.
Returns:
The IO descriptor of the action term.
"""
super().IO_descriptor
self._IO_descriptor.shape = (self.action_dim,)
self._IO_descriptor.dtype = str(self.raw_actions.dtype)
self._IO_descriptor.action_type = "JointAction"
self._IO_descriptor.joint_names = self._joint_names
self._IO_descriptor.scale = self._scale
# This seems to be always [4xNum_joints] IDK why. Need to check.
if isinstance(self._offset, torch.Tensor):
self._IO_descriptor.offset = self._offset[0].detach().cpu().numpy().tolist()
else:
self._IO_descriptor.offset = self._offset
# FIXME: This is not correct. Add list support.
if self.cfg.clip is not None:
if isinstance(self._clip, torch.Tensor):
self._IO_descriptor.clip = self._clip[0].detach().cpu().numpy().tolist()
else:
self._IO_descriptor.clip = self._clip
else:
self._IO_descriptor.clip = None
return self._IO_descriptor
This is it! You should now be able to attach an IO descriptor to your own custom action terms which concludes this tutorial.
......@@ -9,3 +9,4 @@ Below, you’ll find detailed examples of various policies for training and depl
:maxdepth: 1
00_hover/hover_policy
01_io_descriptors/io_descriptors_101
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Script to an environment with random action agent."""
"""Launch Isaac Sim Simulator first."""
import argparse
import os
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Random agent for Isaac Lab environments.")
parser.add_argument("--task", type=str, default=None, help="Name of the task.")
parser.add_argument("--output_dir", type=str, default=None, help="Path to the output directory.")
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
args_cli.headless = True
# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
import gymnasium as gym
import torch
import isaaclab_tasks # noqa: F401
from isaaclab_tasks.utils import parse_env_cfg
# PLACEHOLDER: Extension template (do not remove this comment)
def main():
"""Random actions agent with Isaac Lab environment."""
# create environment configuration
env_cfg = parse_env_cfg(args_cli.task, device=args_cli.device, num_envs=1, use_fabric=True)
# create environment
env = gym.make(args_cli.task, cfg=env_cfg)
# print info (this is vectorized environment)
print(f"[INFO]: Gym observation space: {env.observation_space}")
print(f"[INFO]: Gym action space: {env.action_space}")
# reset environment
env.reset()
outs = env.unwrapped.get_IO_descriptors
out_observations = outs["observations"]
out_actions = outs["actions"]
out_articulations = outs["articulations"]
out_scene = outs["scene"]
# Make a yaml file with the output
import yaml
name = args_cli.task.lower().replace("-", "_")
name = name.replace(" ", "_")
if not os.path.exists(args_cli.output_dir):
os.makedirs(args_cli.output_dir)
with open(os.path.join(args_cli.output_dir, f"{name}_IO_descriptors.yaml"), "w") as f:
print(f"[INFO]: Exporting IO descriptors to {os.path.join(args_cli.output_dir, f'{name}_IO_descriptors.yaml')}")
yaml.safe_dump(outs, f)
for k in out_actions:
print(f"--- Action term: {k['name']} ---")
k.pop("name")
for k1, v1 in k.items():
print(f"{k1}: {v1}")
for obs_group_name, obs_group in out_observations.items():
print(f"--- Obs group: {obs_group_name} ---")
for k in obs_group:
print(f"--- Obs term: {k['name']} ---")
k.pop("name")
for k1, v1 in k.items():
print(f"{k1}: {v1}")
for articulation_name, articulation_data in out_articulations.items():
print(f"--- Articulation: {articulation_name} ---")
for k1, v1 in articulation_data.items():
print(f"{k1}: {v1}")
for k1, v1 in out_scene.items():
print(f"{k1}: {v1}")
env.step(torch.zeros(env.action_space.shape, device=env.unwrapped.device))
env.close()
if __name__ == "__main__":
# run the main function
main()
# close sim app
simulation_app.close()
......@@ -41,6 +41,7 @@ parser.add_argument(
const=True,
help="if toggled, this experiment will be tracked with Weights and Biases",
)
parser.add_argument("--export_io_descriptors", action="store_true", default=False, help="Export IO descriptors.")
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
......@@ -64,6 +65,7 @@ import os
import random
from datetime import datetime
import omni
from rl_games.common import env_configurations, vecenv
from rl_games.common.algo_observer import IsaacAlgoObserver
from rl_games.torch_runner import Runner
......@@ -147,6 +149,15 @@ def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agen
clip_obs = agent_cfg["params"]["env"].get("clip_observations", math.inf)
clip_actions = agent_cfg["params"]["env"].get("clip_actions", math.inf)
# set the IO descriptors output directory if requested
if isinstance(env_cfg, ManagerBasedRLEnvCfg):
env_cfg.export_io_descriptors = args_cli.export_io_descriptors
env_cfg.io_descriptors_output_dir = os.path.join(log_root_path, log_dir)
else:
omni.log.warn(
"IO descriptors are only supported for manager based RL environments. No IO descriptors will be exported."
)
# create isaac environment
env = gym.make(args_cli.task, cfg=env_cfg, render_mode="rgb_array" if args_cli.video else None)
......
......@@ -31,6 +31,7 @@ parser.add_argument("--max_iterations", type=int, default=None, help="RL Policy
parser.add_argument(
"--distributed", action="store_true", default=False, help="Run training with multiple GPUs or nodes."
)
parser.add_argument("--export_io_descriptors", action="store_true", default=False, help="Export IO descriptors.")
# append RSL-RL cli arguments
cli_args.add_rsl_rl_args(parser)
# append AppLauncher cli args
......@@ -77,6 +78,7 @@ import os
import torch
from datetime import datetime
import omni
from rsl_rl.runners import OnPolicyRunner
from isaaclab.envs import (
......@@ -140,6 +142,15 @@ def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agen
log_dir += f"_{agent_cfg.run_name}"
log_dir = os.path.join(log_root_path, log_dir)
# set the IO descriptors output directory if requested
if isinstance(env_cfg, ManagerBasedRLEnvCfg):
env_cfg.export_io_descriptors = args_cli.export_io_descriptors
env_cfg.io_descriptors_output_dir = log_dir
else:
omni.log.warn(
"IO descriptors are only supported for manager based RL environments. No IO descriptors will be exported."
)
# create isaac environment
env = gym.make(args_cli.task, cfg=env_cfg, render_mode="rgb_array" if args_cli.video else None)
......
......@@ -30,6 +30,7 @@ parser.add_argument("--seed", type=int, default=None, help="Seed used for the en
parser.add_argument("--log_interval", type=int, default=100_000, help="Log data every n timesteps.")
parser.add_argument("--checkpoint", type=str, default=None, help="Continue the training from checkpoint.")
parser.add_argument("--max_iterations", type=int, default=None, help="RL Policy training iterations.")
parser.add_argument("--export_io_descriptors", action="store_true", default=False, help="Export IO descriptors.")
parser.add_argument(
"--keep_all_info",
action="store_true",
......@@ -77,6 +78,7 @@ import os
import random
from datetime import datetime
import omni
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import CheckpointCallback, LogEveryNTimesteps
from stable_baselines3.common.vec_env import VecNormalize
......@@ -141,6 +143,15 @@ def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agen
policy_arch = agent_cfg.pop("policy")
n_timesteps = agent_cfg.pop("n_timesteps")
# set the IO descriptors output directory if requested
if isinstance(env_cfg, ManagerBasedRLEnvCfg):
env_cfg.export_io_descriptors = args_cli.export_io_descriptors
env_cfg.io_descriptors_output_dir = log_dir
else:
omni.log.warn(
"IO descriptors are only supported for manager based RL environments. No IO descriptors will be exported."
)
# create isaac environment
env = gym.make(args_cli.task, cfg=env_cfg, render_mode="rgb_array" if args_cli.video else None)
......
......@@ -39,6 +39,7 @@ parser.add_argument(
)
parser.add_argument("--checkpoint", type=str, default=None, help="Path to model checkpoint to resume training.")
parser.add_argument("--max_iterations", type=int, default=None, help="RL Policy training iterations.")
parser.add_argument("--export_io_descriptors", action="store_true", default=False, help="Export IO descriptors.")
parser.add_argument(
"--ml_framework",
type=str,
......@@ -76,6 +77,7 @@ import os
import random
from datetime import datetime
import omni
import skrl
from packaging import version
......@@ -171,6 +173,15 @@ def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agen
# get checkpoint path (to resume training)
resume_path = retrieve_file_path(args_cli.checkpoint) if args_cli.checkpoint else None
# set the IO descriptors output directory if requested
if isinstance(env_cfg, ManagerBasedRLEnvCfg):
env_cfg.export_io_descriptors = args_cli.export_io_descriptors
env_cfg.io_descriptors_output_dir = os.path.join(log_root_path, log_dir)
else:
omni.log.warn(
"IO descriptors are only supported for manager based RL environments. No IO descriptors will be exported."
)
# create isaac environment
env = gym.make(args_cli.task, cfg=env_cfg, render_mode="rgb_array" if args_cli.video else None)
......
Changelog
---------
0.45.2 (2025-08-18)
~~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added :meth:`~isaaclab.managers.ObservationManager.get_IO_descriptors` to export the IO descriptors for the observation manager.
* Added :meth:`~isaaclab.envs.ManagerBasedEnvCfg.io_descriptors_output_dir` to configure the directory to export the IO descriptors to.
* Added :meth:`~isaaclab.envs.ManagerBasedEnvCfg.export_io_descriptors` to toggle the export of the IO descriptors.
* Added the option to export the Observation and Action of the managed environments into a YAML file. This can be used to more easily
deploy policies trained in Isaac Lab.
0.45.1 (2025-08-16)
~~~~~~~~~~~~~~~~~~~
......
......@@ -24,6 +24,7 @@ from isaaclab.utils.timer import Timer
from .common import VecEnvObs
from .manager_based_env_cfg import ManagerBasedEnvCfg
from .ui import ViewportCameraController
from .utils.io_descriptors import export_articulations_data, export_scene_data
class ManagerBasedEnv:
......@@ -185,6 +186,10 @@ class ManagerBasedEnv:
# initialize observation buffers
self.obs_buf = {}
# export IO descriptors if requested
if self.cfg.export_io_descriptors:
self.export_IO_descriptors()
def __del__(self):
"""Cleanup for the environment."""
self.close()
......@@ -219,6 +224,46 @@ class ManagerBasedEnv:
"""The device on which the environment is running."""
return self.sim.device
@property
def get_IO_descriptors(self):
"""Get the IO descriptors for the environment.
Returns:
A dictionary with keys as the group names and values as the IO descriptors.
"""
return {
"observations": self.observation_manager.get_IO_descriptors,
"actions": self.action_manager.get_IO_descriptors,
"articulations": export_articulations_data(self),
"scene": export_scene_data(self),
}
def export_IO_descriptors(self, output_dir: str | None = None):
"""Export the IO descriptors for the environment.
Args:
output_dir: The directory to export the IO descriptors to.
"""
import os
import yaml
IO_descriptors = self.get_IO_descriptors
if output_dir is None:
output_dir = self.cfg.io_descriptors_output_dir
if output_dir is None:
raise ValueError(
"Output directory is not set. Please set the output directory using the `io_descriptors_output_dir`"
" configuration."
)
if not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
with open(os.path.join(output_dir, "IO_descriptors.yaml"), "w") as f:
print(f"[INFO]: Exporting IO descriptors to {os.path.join(output_dir, 'IO_descriptors.yaml')}")
yaml.safe_dump(IO_descriptors, f)
"""
Operations - Setup.
"""
......
......@@ -125,3 +125,9 @@ class ManagerBasedEnvCfg:
teleop_devices: DevicesCfg = field(default_factory=DevicesCfg)
"""Configuration for teleoperation devices."""
export_io_descriptors: bool = False
"""Whether to export the IO descriptors for the environment. Defaults to False."""
io_descriptors_output_dir: str | None = None
"""The directory to export the IO descriptors to. Defaults to None."""
......@@ -17,6 +17,7 @@ from isaaclab.managers.action_manager import ActionTerm
if TYPE_CHECKING:
from isaaclab.envs import ManagerBasedEnv
from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor
from . import actions_cfg
......@@ -111,6 +112,15 @@ class BinaryJointAction(ActionTerm):
def processed_actions(self) -> torch.Tensor:
return self._processed_actions
@property
def IO_descriptor(self) -> GenericActionIODescriptor:
super().IO_descriptor
self._IO_descriptor.shape = (self.action_dim,)
self._IO_descriptor.dtype = str(self.raw_actions.dtype)
self._IO_descriptor.action_type = "JointAction"
self._IO_descriptor.joint_names = self._joint_names
return self._IO_descriptor
"""
Operations.
"""
......
......@@ -17,6 +17,7 @@ from isaaclab.managers.action_manager import ActionTerm
if TYPE_CHECKING:
from isaaclab.envs import ManagerBasedEnv
from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor
from . import actions_cfg
......@@ -123,6 +124,41 @@ class JointAction(ActionTerm):
def processed_actions(self) -> torch.Tensor:
return self._processed_actions
@property
def IO_descriptor(self) -> GenericActionIODescriptor:
"""The IO descriptor of the action term.
This descriptor is used to describe the action term of the joint action.
It adds the following information to the base descriptor:
- joint_names: The names of the joints.
- scale: The scale of the action term.
- offset: The offset of the action term.
- clip: The clip of the action term.
Returns:
The IO descriptor of the action term.
"""
super().IO_descriptor
self._IO_descriptor.shape = (self.action_dim,)
self._IO_descriptor.dtype = str(self.raw_actions.dtype)
self._IO_descriptor.action_type = "JointAction"
self._IO_descriptor.joint_names = self._joint_names
self._IO_descriptor.scale = self._scale
# This seems to be always [4xNum_joints] IDK why. Need to check.
if isinstance(self._offset, torch.Tensor):
self._IO_descriptor.offset = self._offset[0].detach().cpu().numpy().tolist()
else:
self._IO_descriptor.offset = self._offset
# FIXME: This is not correct. Add list support.
if self.cfg.clip is not None:
if isinstance(self._clip, torch.Tensor):
self._IO_descriptor.clip = self._clip[0].detach().cpu().numpy().tolist()
else:
self._IO_descriptor.clip = self._clip
else:
self._IO_descriptor.clip = None
return self._IO_descriptor
"""
Operations.
"""
......
......@@ -18,6 +18,7 @@ from isaaclab.managers.action_manager import ActionTerm
if TYPE_CHECKING:
from isaaclab.envs import ManagerBasedEnv
from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor
from . import actions_cfg
......@@ -105,6 +106,37 @@ class JointPositionToLimitsAction(ActionTerm):
def processed_actions(self) -> torch.Tensor:
return self._processed_actions
@property
def IO_descriptor(self) -> GenericActionIODescriptor:
"""The IO descriptor of the action term.
This descriptor is used to describe the action term of the joint position to limits action.
It adds the following information to the base descriptor:
- joint_names: The names of the joints.
- scale: The scale of the action term.
- offset: The offset of the action term.
- clip: The clip of the action term.
Returns:
The IO descriptor of the action term.
"""
super().IO_descriptor
self._IO_descriptor.shape = (self.action_dim,)
self._IO_descriptor.dtype = str(self.raw_actions.dtype)
self._IO_descriptor.action_type = "JointAction"
self._IO_descriptor.joint_names = self._joint_names
self._IO_descriptor.scale = self._scale
# This seems to be always [4xNum_joints] IDK why. Need to check.
if isinstance(self._offset, torch.Tensor):
self._IO_descriptor.offset = self._offset[0].detach().cpu().numpy().tolist()
else:
self._IO_descriptor.offset = self._offset
if self.cfg.clip is not None:
self._IO_descriptor.clip = self._clip
else:
self._IO_descriptor.clip = None
return self._IO_descriptor
"""
Operations.
"""
......@@ -195,6 +227,33 @@ class EMAJointPositionToLimitsAction(JointPositionToLimitsAction):
# initialize the previous targets
self._prev_applied_actions = torch.zeros_like(self.processed_actions)
@property
def IO_descriptor(self) -> GenericActionIODescriptor:
"""The IO descriptor of the action term.
This descriptor is used to describe the action term of the EMA joint position to limits action.
It adds the following information to the base descriptor:
- joint_names: The names of the joints.
- scale: The scale of the action term.
- offset: The offset of the action term.
- clip: The clip of the action term.
- alpha: The moving average weight.
Returns:
The IO descriptor of the action term.
"""
super().IO_descriptor
if isinstance(self._alpha, float):
self._IO_descriptor.alpha = self._alpha
elif isinstance(self._alpha, torch.Tensor):
self._IO_descriptor.alpha = self._alpha[0].detach().cpu().numpy().tolist()
else:
raise ValueError(
f"Unsupported moving average weight type: {type(self._alpha)}. Supported types are float and"
" torch.Tensor."
)
return self._IO_descriptor
def reset(self, env_ids: Sequence[int] | None = None) -> None:
# check if specific environment ids are provided
if env_ids is None:
......
......@@ -18,6 +18,7 @@ from isaaclab.utils.math import euler_xyz_from_quat
if TYPE_CHECKING:
from isaaclab.envs import ManagerBasedEnv
from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor
from . import actions_cfg
......@@ -134,6 +135,36 @@ class NonHolonomicAction(ActionTerm):
def processed_actions(self) -> torch.Tensor:
return self._processed_actions
@property
def IO_descriptor(self) -> GenericActionIODescriptor:
"""The IO descriptor of the action term.
This descriptor is used to describe the action term of the non-holonomic action.
It adds the following information to the base descriptor:
- scale: The scale of the action term.
- offset: The offset of the action term.
- clip: The clip of the action term.
- body_name: The name of the body.
- x_joint_name: The name of the x joint.
- y_joint_name: The name of the y joint.
- yaw_joint_name: The name of the yaw joint.
Returns:
The IO descriptor of the action term.
"""
super().IO_descriptor
self._IO_descriptor.shape = (self.action_dim,)
self._IO_descriptor.dtype = str(self.raw_actions.dtype)
self._IO_descriptor.action_type = "non holonomic actions"
self._IO_descriptor.scale = self._scale
self._IO_descriptor.offset = self._offset
self._IO_descriptor.clip = self._clip
self._IO_descriptor.body_name = self._body_name
self._IO_descriptor.x_joint_name = self._joint_names[0]
self._IO_descriptor.y_joint_name = self._joint_names[1]
self._IO_descriptor.yaw_joint_name = self._joint_names[2]
return self._IO_descriptor
"""
Operations.
"""
......
......@@ -17,6 +17,7 @@ from isaaclab.managers.action_manager import ActionTerm
if TYPE_CHECKING:
from isaaclab.envs import ManagerBasedEnv
from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor
from . import pink_actions_cfg
......@@ -125,6 +126,31 @@ class PinkInverseKinematicsAction(ActionTerm):
"""Get the processed actions tensor."""
return self._processed_actions
@property
def IO_descriptor(self) -> GenericActionIODescriptor:
"""The IO descriptor of the action term.
This descriptor is used to describe the action term of the pink inverse kinematics action.
It adds the following information to the base descriptor:
- scale: The scale of the action term.
- offset: The offset of the action term.
- clip: The clip of the action term.
- pink_controller_joint_names: The names of the pink controller joints.
- hand_joint_names: The names of the hand joints.
- controller_cfg: The configuration of the pink controller.
Returns:
The IO descriptor of the action term.
"""
super().IO_descriptor
self._IO_descriptor.shape = (self.action_dim,)
self._IO_descriptor.dtype = str(self.raw_actions.dtype)
self._IO_descriptor.action_type = "PinkInverseKinematicsAction"
self._IO_descriptor.pink_controller_joint_names = self._pink_controlled_joint_names
self._IO_descriptor.hand_joint_names = self._hand_joint_names
self._IO_descriptor.extras["controller_cfg"] = self.cfg.controller.__dict__
return self._IO_descriptor
# """
# Operations.
# """
......
......@@ -23,6 +23,7 @@ from isaaclab.sim.utils import find_matching_prims
if TYPE_CHECKING:
from isaaclab.envs import ManagerBasedEnv
from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor
from . import actions_cfg
......@@ -148,6 +149,37 @@ class DifferentialInverseKinematicsAction(ActionTerm):
jacobian[:, 3:, :] = torch.bmm(base_rot_matrix, jacobian[:, 3:, :])
return jacobian
@property
def IO_descriptor(self) -> GenericActionIODescriptor:
"""The IO descriptor of the action term.
This descriptor is used to describe the action term of the pink inverse kinematics action.
It adds the following information to the base descriptor:
- body_name: The name of the body.
- joint_names: The names of the joints.
- scale: The scale of the action term.
- clip: The clip of the action term.
- controller_cfg: The configuration of the controller.
- body_offset: The offset of the body.
Returns:
The IO descriptor of the action term.
"""
super().IO_descriptor
self._IO_descriptor.shape = (self.action_dim,)
self._IO_descriptor.dtype = str(self.raw_actions.dtype)
self._IO_descriptor.action_type = "TaskSpaceAction"
self._IO_descriptor.body_name = self._body_name
self._IO_descriptor.joint_names = self._joint_names
self._IO_descriptor.scale = self._scale
if self.cfg.clip is not None:
self._IO_descriptor.clip = self.cfg.clip
else:
self._IO_descriptor.clip = None
self._IO_descriptor.extras["controller_cfg"] = self.cfg.controller.__dict__
self._IO_descriptor.extras["body_offset"] = self.cfg.body_offset.__dict__
return self._IO_descriptor
"""
Operations.
"""
......@@ -409,6 +441,47 @@ class OperationalSpaceControllerAction(ActionTerm):
jacobian[:, 3:, :] = torch.bmm(base_rot_matrix, jacobian[:, 3:, :])
return jacobian
@property
def IO_descriptor(self) -> GenericActionIODescriptor:
"""The IO descriptor of the action term.
This descriptor is used to describe the action term of the pink inverse kinematics action.
It adds the following information to the base descriptor:
- body_name: The name of the body.
- joint_names: The names of the joints.
- position_scale: The scale of the position.
- orientation_scale: The scale of the orientation.
- wrench_scale: The scale of the wrench.
- stiffness_scale: The scale of the stiffness.
- damping_ratio_scale: The scale of the damping ratio.
- nullspace_joint_pos_target: The nullspace joint pos target.
- clip: The clip of the action term.
- controller_cfg: The configuration of the controller.
- body_offset: The offset of the body.
Returns:
The IO descriptor of the action term.
"""
super().IO_descriptor
self._IO_descriptor.shape = (self.action_dim,)
self._IO_descriptor.dtype = str(self.raw_actions.dtype)
self._IO_descriptor.action_type = "TaskSpaceAction"
self._IO_descriptor.body_name = self._ee_body_name
self._IO_descriptor.joint_names = self._joint_names
self._IO_descriptor.position_scale = self.cfg.position_scale
self._IO_descriptor.orientation_scale = self.cfg.orientation_scale
self._IO_descriptor.wrench_scale = self.cfg.wrench_scale
self._IO_descriptor.stiffness_scale = self.cfg.stiffness_scale
self._IO_descriptor.damping_ratio_scale = self.cfg.damping_ratio_scale
self._IO_descriptor.nullspace_joint_pos_target = self.cfg.nullspace_joint_pos_target
if self.cfg.clip is not None:
self._IO_descriptor.clip = self.cfg.clip
else:
self._IO_descriptor.clip = None
self._IO_descriptor.extras["controller_cfg"] = self.cfg.controller_cfg.__dict__
self._IO_descriptor.extras["body_offset"] = self.cfg.body_offset.__dict__
return self._IO_descriptor
"""
Operations.
"""
......
......@@ -24,12 +24,22 @@ from isaaclab.sensors import Camera, Imu, RayCaster, RayCasterCamera, TiledCamer
if TYPE_CHECKING:
from isaaclab.envs import ManagerBasedEnv, ManagerBasedRLEnv
from isaaclab.envs.utils.io_descriptors import (
generic_io_descriptor,
record_body_names,
record_dtype,
record_joint_names,
record_joint_pos_offsets,
record_joint_vel_offsets,
record_shape,
)
"""
Root state.
"""
@generic_io_descriptor(units="m", axes=["Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype])
def base_pos_z(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Root height in the simulation world frame."""
# extract the used quantities (to enable type-hinting)
......@@ -37,6 +47,9 @@ def base_pos_z(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg(
return asset.data.root_pos_w[:, 2].unsqueeze(-1)
@generic_io_descriptor(
units="m/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]
)
def base_lin_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Root linear velocity in the asset's root frame."""
# extract the used quantities (to enable type-hinting)
......@@ -44,6 +57,9 @@ def base_lin_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCf
return asset.data.root_lin_vel_b
@generic_io_descriptor(
units="rad/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]
)
def base_ang_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Root angular velocity in the asset's root frame."""
# extract the used quantities (to enable type-hinting)
......@@ -51,6 +67,9 @@ def base_ang_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCf
return asset.data.root_ang_vel_b
@generic_io_descriptor(
units="m/s^2", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]
)
def projected_gravity(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Gravity projection on the asset's root frame."""
# extract the used quantities (to enable type-hinting)
......@@ -58,6 +77,9 @@ def projected_gravity(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEnt
return asset.data.projected_gravity_b
@generic_io_descriptor(
units="m", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]
)
def root_pos_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Asset root position in the environment frame."""
# extract the used quantities (to enable type-hinting)
......@@ -65,6 +87,9 @@ def root_pos_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg(
return asset.data.root_pos_w - env.scene.env_origins
@generic_io_descriptor(
units="unit", axes=["W", "X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]
)
def root_quat_w(
env: ManagerBasedEnv, make_quat_unique: bool = False, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")
) -> torch.Tensor:
......@@ -82,6 +107,9 @@ def root_quat_w(
return math_utils.quat_unique(quat) if make_quat_unique else quat
@generic_io_descriptor(
units="m/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]
)
def root_lin_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Asset root linear velocity in the environment frame."""
# extract the used quantities (to enable type-hinting)
......@@ -89,6 +117,9 @@ def root_lin_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntity
return asset.data.root_lin_vel_w
@generic_io_descriptor(
units="rad/s", axes=["X", "Y", "Z"], observation_type="RootState", on_inspect=[record_shape, record_dtype]
)
def root_ang_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Asset root angular velocity in the environment frame."""
# extract the used quantities (to enable type-hinting)
......@@ -101,6 +132,7 @@ Body state
"""
@generic_io_descriptor(observation_type="BodyState", on_inspect=[record_shape, record_dtype, record_body_names])
def body_pose_w(
env: ManagerBasedEnv,
asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"),
......@@ -126,6 +158,7 @@ def body_pose_w(
return pose.reshape(env.num_envs, -1)
@generic_io_descriptor(observation_type="BodyState", on_inspect=[record_shape, record_dtype, record_body_names])
def body_projected_gravity_b(
env: ManagerBasedEnv,
asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"),
......@@ -155,6 +188,9 @@ Joint state.
"""
@generic_io_descriptor(
observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="rad"
)
def joint_pos(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""The joint positions of the asset.
......@@ -165,6 +201,11 @@ def joint_pos(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("
return asset.data.joint_pos[:, asset_cfg.joint_ids]
@generic_io_descriptor(
observation_type="JointState",
on_inspect=[record_joint_names, record_dtype, record_shape, record_joint_pos_offsets],
units="rad",
)
def joint_pos_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""The joint positions of the asset w.r.t. the default joint positions.
......@@ -175,6 +216,7 @@ def joint_pos_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityC
return asset.data.joint_pos[:, asset_cfg.joint_ids] - asset.data.default_joint_pos[:, asset_cfg.joint_ids]
@generic_io_descriptor(observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape])
def joint_pos_limit_normalized(
env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")
) -> torch.Tensor:
......@@ -191,6 +233,9 @@ def joint_pos_limit_normalized(
)
@generic_io_descriptor(
observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="rad/s"
)
def joint_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")):
"""The joint velocities of the asset.
......@@ -201,6 +246,11 @@ def joint_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("
return asset.data.joint_vel[:, asset_cfg.joint_ids]
@generic_io_descriptor(
observation_type="JointState",
on_inspect=[record_joint_names, record_dtype, record_shape, record_joint_vel_offsets],
units="rad/s",
)
def joint_vel_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")):
"""The joint velocities of the asset w.r.t. the default joint velocities.
......@@ -211,6 +261,9 @@ def joint_vel_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityC
return asset.data.joint_vel[:, asset_cfg.joint_ids] - asset.data.default_joint_vel[:, asset_cfg.joint_ids]
@generic_io_descriptor(
observation_type="JointState", on_inspect=[record_joint_names, record_dtype, record_shape], units="N.m"
)
def joint_effort(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""The joint applied effort of the robot.
......@@ -597,6 +650,7 @@ Actions.
"""
@generic_io_descriptor(dtype=torch.float32, observation_type="Action", on_inspect=[record_shape])
def last_action(env: ManagerBasedEnv, action_name: str | None = None) -> torch.Tensor:
"""The last input action to the environment.
......@@ -614,7 +668,8 @@ Commands.
"""
def generated_commands(env: ManagerBasedRLEnv, command_name: str) -> torch.Tensor:
@generic_io_descriptor(dtype=torch.float32, observation_type="Command", on_inspect=[record_shape])
def generated_commands(env: ManagerBasedRLEnv, command_name: str | None = None) -> torch.Tensor:
"""The generated command from command term in the command manager with the given name."""
return env.command_manager.get_command(command_name)
......
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
from collections.abc import Callable
from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar
from isaaclab.utils import configclass
if TYPE_CHECKING:
from isaaclab.envs import ManagerBasedEnv
from isaaclab.assets.articulation import Articulation
import torch
import dataclasses
import functools
import inspect
@configclass
class GenericActionIODescriptor:
"""Generic action IO descriptor.
This descriptor is used to describe the action space of a policy.
It can be extended as needed to add more information about the action term that is being described.
"""
mdp_type: str = "Action"
"""The type of MDP that the action term belongs to."""
name: str = None
"""The name of the action term.
By default, the name of the action term class is used.
"""
full_path: str = None
"""The full path of the action term class.
By default, python's will retrieve the path from the file that the action term class is defined in
and the name of the action term class.
"""
description: str = None
"""The description of the action term.
By default, the docstring of the action term class is used.
"""
shape: tuple[int, ...] = None
"""The shape of the action term.
This should be populated by the user."""
dtype: str = None
"""The dtype of the action term.
This should be populated by the user."""
action_type: str = None
"""The type of the action term.
This attribute is purely informative and should be populated by the user."""
extras: dict[str, Any] = {}
"""Extra information about the action term.
This attribute is purely informative and should be populated by the user."""
export: bool = True
"""Whether to export the action term.
Should be set to False if the class is not meant to be exported.
"""
@configclass
class GenericObservationIODescriptor:
"""Generic observation IO descriptor.
This descriptor is used to describe the observation space of a policy.
It can be extended as needed to add more information about the observation term that is being described.
"""
mdp_type: str = "Observation"
name: str = None
full_path: str = None
description: str = None
shape: tuple[int, ...] = None
dtype: str = None
observation_type: str = None
extras: dict[str, Any] = {}
# These are defined to help with type hinting
P = ParamSpec("P")
R = TypeVar("R")
# Automatically builds a descriptor from the kwargs
def _make_descriptor(**kwargs: Any) -> GenericObservationIODescriptor:
"""Split *kwargs* into (known dataclass fields) and (extras)."""
field_names = {f.name for f in dataclasses.fields(GenericObservationIODescriptor)}
known = {k: v for k, v in kwargs.items() if k in field_names}
extras = {k: v for k, v in kwargs.items() if k not in field_names}
desc = GenericObservationIODescriptor(**known)
# User defined extras are stored in the descriptor under the `extras` field
desc.extras = extras
return desc
# Decorator factory for generic IO descriptors.
def generic_io_descriptor(
_func: Callable[Concatenate[ManagerBasedEnv, P], R] | None = None,
*,
on_inspect: Callable[..., Any] | list[Callable[..., Any]] | None = None,
**descriptor_kwargs: Any,
) -> Callable[[Callable[Concatenate[ManagerBasedEnv, P], R]], Callable[Concatenate[ManagerBasedEnv, P], R]]:
"""
Decorator factory for generic IO descriptors.
This decorator can be used in different ways:
1. The default decorator has all the information I need for my use case:
..code-block:: python
@generic_io_descriptor(GenericIODescriptor(description="..", dtype=".."))
def my_func(env: ManagerBasedEnv, *args, **kwargs):
...
..note:: If description is not set, the function's docstring is used to populate it.
2. I need to add more information to the descriptor:
..code-block:: python
@generic_io_descriptor(description="..", new_var_1="a", new_var_2="b")
def my_func(env: ManagerBasedEnv, *args, **kwargs):
...
3. I need to add a hook to the descriptor:
..code-block:: python
def record_shape(tensor: torch.Tensor, desc: GenericIODescriptor, **kwargs):
desc.shape = (tensor.shape[-1],)
@generic_io_descriptor(description="..", new_var_1="a", new_var_2="b", on_inspect=[record_shape, record_dtype])
def my_func(env: ManagerBasedEnv, *args, **kwargs):
..note:: The hook is called after the function is called, if and only if the `inspect` flag is set when calling the function.
For example:
..code-block:: python
my_func(env, inspect=True)
4. I need to add a hook to the descriptor and this hook will write to a variable that is not part of the base descriptor.
..code-block:: python
def record_joint_names(output: torch.Tensor, descriptor: GenericIODescriptor, **kwargs):
asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name]
joint_ids = kwargs["asset_cfg"].joint_ids
if joint_ids == slice(None, None, None):
joint_ids = list(range(len(asset.joint_names)))
descriptor.joint_names = [asset.joint_names[i] for i in joint_ids]
@generic_io_descriptor(new_var_1="a", new_var_2="b", on_inspect=[record_shape, record_dtype, record_joint_names])
def my_func(env: ManagerBasedEnv, *args, **kwargs):
..note:: The hook can access all the variables in the wrapped function's signature. While it is useful, the user should be careful to
access only existing variables.
Args:
_func: The function to decorate.
**descriptor_kwargs: Keyword arguments to pass to the descriptor.
Returns:
A decorator that can be used to decorate a function.
"""
# If the decorator is used with a descriptor, use it as the descriptor.
if _func is not None and isinstance(_func, GenericObservationIODescriptor):
descriptor = _func
_func = None
else:
descriptor = _make_descriptor(**descriptor_kwargs)
# Ensures the hook is a list
if callable(on_inspect):
inspect_hooks: list[Callable[..., Any]] = [on_inspect]
else:
inspect_hooks: list[Callable[..., Any]] = list(on_inspect or []) # handles None
def _apply(func: Callable[Concatenate[ManagerBasedEnv, P], R]) -> Callable[Concatenate[ManagerBasedEnv, P], R]:
# Capture the signature of the function
sig = inspect.signature(func)
@functools.wraps(func)
def wrapper(env: ManagerBasedEnv, *args: P.args, **kwargs: P.kwargs) -> R:
inspect_flag: bool = kwargs.pop("inspect", False)
out = func(env, *args, **kwargs)
if inspect_flag:
# Injects the function's arguments into the hooks and applies the defaults
bound = sig.bind(env, *args, **kwargs)
bound.apply_defaults()
call_kwargs = {
"output": out,
"descriptor": descriptor,
**bound.arguments,
}
for hook in inspect_hooks:
hook(**call_kwargs)
return out
# --- Descriptor bookkeeping ---
descriptor.name = func.__name__
descriptor.full_path = f"{func.__module__}.{func.__name__}"
descriptor.dtype = str(descriptor.dtype)
# Check if description is set in the descriptor
if descriptor.description is None:
descriptor.description = " ".join(func.__doc__.split())
# Adds the descriptor to the wrapped function as an attribute
wrapper._descriptor = descriptor
wrapper._has_descriptor = True
# Alters the signature of the wrapped function to make it match the original function.
# This allows the wrapped functions to pass the checks in the managers.
wrapper.__signature__ = sig
return wrapper
# If the decorator is used without parentheses, _func will be the function itself.
if callable(_func):
return _apply(_func)
return _apply
def record_shape(output: torch.Tensor, descriptor: GenericObservationIODescriptor, **kwargs) -> None:
"""Record the shape of the output tensor.
Args:
output: The output tensor.
descriptor: The descriptor to record the shape to.
**kwargs: Additional keyword arguments.
"""
descriptor.shape = (output.shape[-1],)
def record_dtype(output: torch.Tensor, descriptor: GenericObservationIODescriptor, **kwargs) -> None:
"""Record the dtype of the output tensor.
Args:
output: The output tensor.
descriptor: The descriptor to record the dtype to.
**kwargs: Additional keyword arguments.
"""
descriptor.dtype = str(output.dtype)
def record_joint_names(output: torch.Tensor, descriptor: GenericObservationIODescriptor, **kwargs) -> None:
"""Record the joint names of the output tensor.
Expects the `asset_cfg` keyword argument to be set.
Args:
output: The output tensor.
descriptor: The descriptor to record the joint names to.
**kwargs: Additional keyword arguments.
"""
asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name]
joint_ids = kwargs["asset_cfg"].joint_ids
if joint_ids == slice(None, None, None):
joint_ids = list(range(len(asset.joint_names)))
descriptor.joint_names = [asset.joint_names[i] for i in joint_ids]
def record_body_names(output: torch.Tensor, descriptor: GenericObservationIODescriptor, **kwargs) -> None:
"""Record the body names of the output tensor.
Expects the `asset_cfg` keyword argument to be set.
Args:
output: The output tensor.
descriptor: The descriptor to record the body names to.
**kwargs: Additional keyword arguments.
"""
asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name]
body_ids = kwargs["asset_cfg"].body_ids
if body_ids == slice(None, None, None):
body_ids = list(range(len(asset.body_names)))
descriptor.body_names = [asset.body_names[i] for i in body_ids]
def record_joint_pos_offsets(output: torch.Tensor, descriptor: GenericObservationIODescriptor, **kwargs):
"""Record the joint position offsets of the output tensor.
Expects the `asset_cfg` keyword argument to be set.
Args:
output: The output tensor.
descriptor: The descriptor to record the joint position offsets to.
**kwargs: Additional keyword arguments.
"""
asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name]
ids = kwargs["asset_cfg"].joint_ids
# Get the offsets of the joints for the first robot in the scene.
# This assumes that all robots have the same joint offsets.
descriptor.joint_pos_offsets = asset.data.default_joint_pos[:, ids][0]
def record_joint_vel_offsets(output: torch.Tensor, descriptor: GenericObservationIODescriptor, **kwargs):
"""Record the joint velocity offsets of the output tensor.
Expects the `asset_cfg` keyword argument to be set.
Args:
output: The output tensor.
descriptor: The descriptor to record the joint velocity offsets to.
**kwargs: Additional keyword arguments.
"""
asset: Articulation = kwargs["env"].scene[kwargs["asset_cfg"].name]
ids = kwargs["asset_cfg"].joint_ids
# Get the offsets of the joints for the first robot in the scene.
# This assumes that all robots have the same joint offsets.
descriptor.joint_vel_offsets = asset.data.default_joint_vel[:, ids][0]
def export_articulations_data(env: ManagerBasedEnv) -> dict[str, dict[str, list[float]]]:
"""Export the articulations data.
Args:
env: The environment.
Returns:
A dictionary containing the articulations data.
"""
# Create a dictionary for all the articulations in the scene.
articulation_joint_data = {}
for articulation_name, articulation in env.scene.articulations.items():
# For each articulation, create a dictionary with the articulation's data.
# Some of the data may be redundant with other information provided by the observation descriptors.
articulation_joint_data[articulation_name] = {}
articulation_joint_data[articulation_name]["joint_names"] = articulation.joint_names
articulation_joint_data[articulation_name]["default_joint_pos"] = (
articulation.data.default_joint_pos[0].detach().cpu().numpy().tolist()
)
articulation_joint_data[articulation_name]["default_joint_vel"] = (
articulation.data.default_joint_vel[0].detach().cpu().numpy().tolist()
)
articulation_joint_data[articulation_name]["default_joint_pos_limits"] = (
articulation.data.default_joint_pos_limits[0].detach().cpu().numpy().tolist()
)
articulation_joint_data[articulation_name]["default_joint_damping"] = (
articulation.data.default_joint_damping[0].detach().cpu().numpy().tolist()
)
articulation_joint_data[articulation_name]["default_joint_stiffness"] = (
articulation.data.default_joint_stiffness[0].detach().cpu().numpy().tolist()
)
articulation_joint_data[articulation_name]["default_joint_friction"] = (
articulation.data.default_joint_friction[0].detach().cpu().numpy().tolist()
)
articulation_joint_data[articulation_name]["default_joint_armature"] = (
articulation.data.default_joint_armature[0].detach().cpu().numpy().tolist()
)
return articulation_joint_data
def export_scene_data(env: ManagerBasedEnv) -> dict[str, Any]:
"""Export the scene data.
Args:
env: The environment.
Returns:
A dictionary containing the scene data.
"""
# Create a dictionary for the scene data.
scene_data = {"physics_dt": env.physics_dt, "dt": env.step_dt, "decimation": env.cfg.decimation}
return scene_data
......@@ -8,16 +8,18 @@
from __future__ import annotations
import inspect
import re
import torch
import weakref
from abc import abstractmethod
from collections.abc import Sequence
from prettytable import PrettyTable
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
import omni.kit.app
from isaaclab.assets import AssetBase
from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor
from .manager_base import ManagerBase, ManagerTermBase
from .manager_term_cfg import ActionTermCfg
......@@ -50,6 +52,8 @@ class ActionTerm(ManagerTermBase):
super().__init__(cfg, env)
# parse config to obtain asset to which the term is applied
self._asset: AssetBase = self._env.scene[self.cfg.asset_name]
self._IO_descriptor = GenericActionIODescriptor()
self._export_IO_descriptor = True
# add handle for debug visualization (this is set to a valid handle inside set_debug_vis)
self._debug_vis_handle = None
......@@ -91,6 +95,20 @@ class ActionTerm(ManagerTermBase):
source_code = inspect.getsource(self._set_debug_vis_impl)
return "NotImplementedError" not in source_code
@property
def IO_descriptor(self) -> GenericActionIODescriptor:
"""The IO descriptor for the action term."""
self._IO_descriptor.name = re.sub(r"([a-z])([A-Z])", r"\1_\2", self.__class__.__name__).lower()
self._IO_descriptor.full_path = f"{self.__class__.__module__}.{self.__class__.__name__}"
self._IO_descriptor.description = " ".join(self.__class__.__doc__.split())
self._IO_descriptor.export = self.export_IO_descriptor
return self._IO_descriptor
@property
def export_IO_descriptor(self) -> bool:
"""Whether to export the IO descriptor for the action term."""
return self._export_IO_descriptor
"""
Operations.
"""
......@@ -259,6 +277,41 @@ class ActionManager(ManagerBase):
has_debug_vis |= term.has_debug_vis_implementation
return has_debug_vis
@property
def get_IO_descriptors(self) -> list[dict[str, Any]]:
"""Get the IO descriptors for the action manager.
Returns:
A dictionary with keys as the term names and values as the IO descriptors.
"""
data = []
for term_name, term in self._terms.items():
try:
data.append(term.IO_descriptor.__dict__.copy())
except Exception as e:
print(f"Error getting IO descriptor for term '{term_name}': {e}")
formatted_data = []
for item in data:
name = item.pop("name")
formatted_item = {"name": name, "extras": item.pop("extras")}
print(item["export"])
if not item.pop("export"):
continue
for k, v in item.items():
# Check if v is a tuple and convert to list
if isinstance(v, tuple):
v = list(v)
if k in ["description", "units"]:
formatted_item["extras"][k] = v
else:
formatted_item[k] = v
formatted_data.append(formatted_item)
return formatted_data
"""
Operations.
"""
......
......@@ -225,6 +225,70 @@ class ObservationManager(ManagerBase):
"""
return self._group_obs_concatenate
@property
def get_IO_descriptors(self, group_names_to_export: list[str] = ["policy"]):
"""Get the IO descriptors for the observation manager.
Returns:
A dictionary with keys as the group names and values as the IO descriptors.
"""
group_data = {}
for group_name in self._group_obs_term_names:
group_data[group_name] = []
# check if group name is valid
if group_name not in self._group_obs_term_names:
raise ValueError(
f"Unable to find the group '{group_name}' in the observation manager."
f" Available groups are: {list(self._group_obs_term_names.keys())}"
)
# iterate over all the terms in each group
group_term_names = self._group_obs_term_names[group_name]
# read attributes for each term
obs_terms = zip(group_term_names, self._group_obs_term_cfgs[group_name])
for term_name, term_cfg in obs_terms:
# Call to the observation function to get the IO descriptor with the inspect flag set to True
try:
term_cfg.func(self._env, **term_cfg.params, inspect=True)
# Copy the descriptor and update with the term's own extra parameters
desc = term_cfg.func._descriptor.__dict__.copy()
# Create a dictionary to store the overloads
overloads = {}
# Iterate over the term's own parameters and add them to the overloads dictionary
for k, v in term_cfg.__dict__.items():
# For now we do not add the noise modifier
if k in ["modifiers", "clip", "scale", "history_length", "flatten_history_dim"]:
overloads[k] = v
desc.update(overloads)
group_data[group_name].append(desc)
except Exception as e:
print(f"Error getting IO descriptor for term '{term_name}' in group '{group_name}': {e}")
# Format the data for YAML export
formatted_data = {}
for group_name, data in group_data.items():
formatted_data[group_name] = []
for item in data:
name = item.pop("name")
formatted_item = {"name": name, "overloads": {}, "extras": item.pop("extras")}
for k, v in item.items():
# Check if v is a tuple and convert to list
if isinstance(v, tuple):
v = list(v)
# Check if v is a tensor and convert to list
if isinstance(v, torch.Tensor):
v = v.detach().cpu().numpy().tolist()
if k in ["scale", "clip", "history_length", "flatten_history_dim"]:
formatted_item["overloads"][k] = v
elif k in ["modifiers", "description", "units"]:
formatted_item["extras"][k] = v
else:
formatted_item[k] = v
formatted_data[group_name].append(formatted_item)
formatted_data = {k: v for k, v in formatted_data.items() if k in group_names_to_export}
return formatted_data
"""
Operations.
"""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment