Unverified Commit bc7e5d7b authored by rebeccazhang0707's avatar rebeccazhang0707 Committed by GitHub

Adds galbot stack cube tasks, with left_arm_gripper and right_arm_suction,...

Adds galbot stack cube tasks, with left_arm_gripper and right_arm_suction, using RMPFlow controller (#3210)

# Description

<!--
Thank you for your interest in sending a pull request. Please make sure
to check the contribution guidelines.

Link:
https://isaac-sim.github.io/IsaacLab/main/source/refs/contributing.html
-->

Adds galbot_stack_cube tasks and mimic tasks, using RMPFlow controller:
- add galbot robot config and .usd asset
- add RMPFlowAction and RMPFlowActionCfg
- add motion_policy_configs and .urdf for both 'galbot_left_arm_gripper'
and 'galbot_right_arm_suction'
- add new task: galbot stack_rmp_rel_env_cfg
- add new mimic task: galbot_stack_rmp_abs/rel_mimic_env
- add mdp.observations/terminations/events for galbot: support
gripper_state checking for both suction_cup and parallel_gripper, get
obs_in_base_frame
- add gripper_configs (gripper_joint_names, gripper_open_val,
gripper_threshold) in galbot/franka tasks: to make mdp functions
universal to varied robots
- fix a bug (eef_name) in franka_stack_ik_rel_mimic_env.py
- add new device_name in se3_spacemouse.py

Notes: This PR relies on PR
(https://github.com/isaac-sim/IsaacLab/pull/3174) for surface gripper
support in manager-based workflow.

You can test the whole gr00t-mimic workflow by:
1. record demos:
`./isaaclab.sh -p scripts/tools/record_demos.py --task
Isaac-Stack-Cube-Galbot-Left-Arm-Gripper-RmpFlow-v0 --teleop_device
spacemouse --num_demos 1 --device cpu --dataset_file
datasets/recorded_demos_galbot_suction_rel.hdf5`
2. replay demos:
`./isaaclab.sh -p scripts/tools/replay_demos.py --task
Isaac-Stack-Cube-Galbot-Left-Arm-Gripper-RmpFlow-v0 --num_envs 1
--device cpu --dataset_file
datasets/recorded_demos_galbot_suction_rel.hdf5`
3. annotate demos:
`./isaaclab.sh -p
scripts/imitation_learning/isaaclab_mimic/annotate_demos.py --task
Isaac-Stack-Cube-Galbot-Left-Arm-Gripper-RmpFlow-Rel-Mimic-v0 --auto
--device cpu --input_file
datasets/recorded_demos_galbot_suction_rel.hdf5 --output_file
datasets/annotated_demos_galbot_suction_rel.hdf5`
4. generate dataset:
`./isaaclab.sh -p
scripts/imitation_learning/isaaclab_mimic/generate_dataset.py --task
Isaac-Stack-Cube-Galbot-Left-Arm-Gripper-RmpFlow-Rel-Mimic-v0 --num_envs
16 --device cpu --generation_num_trials 10 --input_file
datasets/annotated_demos_galbot_suction_rel.hdf5 --output_file
datasets/generated_demos_galbot_suction_rel.hdf5`

<!-- As a practice, it is recommended to open an issue to have
discussions on the proposed pull request.
This makes it easier for the community to keep track of what is being
developed or added, and if a given feature
is demanded by more than one party. -->

## Type of change

<!-- As you go through the list, delete the ones that are not
applicable. -->

- Bug fix (non-breaking change which fixes an issue)
- New feature (non-breaking change which adds functionality)

## Screenshot
<img width="1428" height="281" alt="environments_galbot"
src="https://github.com/user-attachments/assets/b603bc60-7ff3-44e8-bf41-ab8126a753ec"
/>


## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [ ] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there

<!--
As you go through the checklist above, you can mark something as done by
putting an x character in it

For example,
- [x] I have done this task
- [ ] I have not done this task
-->

---------
Signed-off-by: 's avatarrebeccazhang0707 <168459200+rebeccazhang0707@users.noreply.github.com>
Signed-off-by: 's avatarKelly Guo <kellyg@nvidia.com>
Co-authored-by: 's avatarKelly Guo <kellyg@nvidia.com>
parent e4b5681e
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
Available Environments Available Environments
====================== ======================
The following lists comprises of all the RL or IL tasks implementations that are available in Isaac Lab. The following lists comprises of all the RL and IL tasks implementations that are available in Isaac Lab.
While we try to keep this list up-to-date, you can always get the latest list of environments by While we try to keep this list up-to-date, you can always get the latest list of environments by
running the following command: running the following command:
...@@ -142,6 +142,9 @@ for the lift-cube environment: ...@@ -142,6 +142,9 @@ for the lift-cube environment:
| |gr1_pp_waist| | |gr1_pp_waist-link| | Pick up and place an object in a basket with a GR-1 humanoid robot | | |gr1_pp_waist| | |gr1_pp_waist-link| | Pick up and place an object in a basket with a GR-1 humanoid robot |
| | | with waist degrees-of-freedom enables that provides a wider reach space. | | | | with waist degrees-of-freedom enables that provides a wider reach space. |
+----------------------+---------------------------+-----------------------------------------------------------------------------+ +----------------------+---------------------------+-----------------------------------------------------------------------------+
| |galbot_stack| | |galbot_stack-link| | Stack three cubes (bottom to top: blue, red, green) with the left arm of |
| | | a Galbot humanoid robot |
+----------------------+---------------------------+-----------------------------------------------------------------------------+
.. |reach-franka| image:: ../_static/tasks/manipulation/franka_reach.jpg .. |reach-franka| image:: ../_static/tasks/manipulation/franka_reach.jpg
.. |reach-ur10| image:: ../_static/tasks/manipulation/ur10_reach.jpg .. |reach-ur10| image:: ../_static/tasks/manipulation/ur10_reach.jpg
...@@ -154,6 +157,8 @@ for the lift-cube environment: ...@@ -154,6 +157,8 @@ for the lift-cube environment:
.. |gr1_pick_place| image:: ../_static/tasks/manipulation/gr-1_pick_place.jpg .. |gr1_pick_place| image:: ../_static/tasks/manipulation/gr-1_pick_place.jpg
.. |surface-gripper| image:: ../_static/tasks/manipulation/ur10_stack_surface_gripper.jpg .. |surface-gripper| image:: ../_static/tasks/manipulation/ur10_stack_surface_gripper.jpg
.. |gr1_pp_waist| image:: ../_static/tasks/manipulation/gr-1_pick_place_waist.jpg .. |gr1_pp_waist| image:: ../_static/tasks/manipulation/gr-1_pick_place_waist.jpg
.. |surface-gripper| image:: ../_static/tasks/manipulation/ur10_stack_surface_gripper.jpg
.. |galbot_stack| image:: ../_static/tasks/manipulation/galbot_stack_cube.jpg
.. |reach-franka-link| replace:: `Isaac-Reach-Franka-v0 <https://github.com/isaac-sim/IsaacLab/blob/main/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/reach/config/franka/joint_pos_env_cfg.py>`__ .. |reach-franka-link| replace:: `Isaac-Reach-Franka-v0 <https://github.com/isaac-sim/IsaacLab/blob/main/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/reach/config/franka/joint_pos_env_cfg.py>`__
.. |reach-ur10-link| replace:: `Isaac-Reach-UR10-v0 <https://github.com/isaac-sim/IsaacLab/blob/main/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/reach/config/ur_10/joint_pos_env_cfg.py>`__ .. |reach-ur10-link| replace:: `Isaac-Reach-UR10-v0 <https://github.com/isaac-sim/IsaacLab/blob/main/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/reach/config/ur_10/joint_pos_env_cfg.py>`__
...@@ -171,6 +176,9 @@ for the lift-cube environment: ...@@ -171,6 +176,9 @@ for the lift-cube environment:
.. |long-suction-link| replace:: `Isaac-Stack-Cube-UR10-Long-Suction-IK-Rel-v0 <https://github.com/isaac-sim/IsaacLab/blob/main/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/stack/config/ur10_gripper/stack_ik_rel_env_cfg.py>`__ .. |long-suction-link| replace:: `Isaac-Stack-Cube-UR10-Long-Suction-IK-Rel-v0 <https://github.com/isaac-sim/IsaacLab/blob/main/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/stack/config/ur10_gripper/stack_ik_rel_env_cfg.py>`__
.. |short-suction-link| replace:: `Isaac-Stack-Cube-UR10-Short-Suction-IK-Rel-v0 <https://github.com/isaac-sim/IsaacLab/blob/main/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/stack/config/ur10_gripper/stack_ik_rel_env_cfg.py>`__ .. |short-suction-link| replace:: `Isaac-Stack-Cube-UR10-Short-Suction-IK-Rel-v0 <https://github.com/isaac-sim/IsaacLab/blob/main/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/stack/config/ur10_gripper/stack_ik_rel_env_cfg.py>`__
.. |gr1_pp_waist-link| replace:: `Isaac-PickPlace-GR1T2-WaistEnabled-Abs-v0 <https://github.com/isaac-sim/IsaacLab/blob/main/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/pick_place/pickplace_gr1t2_waist_enabled_env_cfg.py>`__ .. |gr1_pp_waist-link| replace:: `Isaac-PickPlace-GR1T2-WaistEnabled-Abs-v0 <https://github.com/isaac-sim/IsaacLab/blob/main/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/pick_place/pickplace_gr1t2_waist_enabled_env_cfg.py>`__
.. |galbot_stack-link| replace:: `Isaac-Stack-Cube-Galbot-Left-Arm-Gripper-RmpFlow-v0 <https://github.com/isaac-sim/IsaacLab/blob/main/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/stack/config/galbot/stack_rmp_rel_env_cfg.py>`__
.. |long-suction-link| replace:: `Isaac-Stack-Cube-UR10-Long-Suction-IK-Rel-v0 <https://github.com/isaac-sim/IsaacLab/blob/main/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/stack/config/ur10_gripper/stack_ik_rel_env_cfg.py>`__
.. |short-suction-link| replace:: `Isaac-Stack-Cube-UR10-Short-Suction-IK-Rel-v0 <https://github.com/isaac-sim/IsaacLab/blob/main/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/stack/config/ur10_gripper/stack_ik_rel_env_cfg.py>`__
.. |cube-shadow-link| replace:: `Isaac-Repose-Cube-Shadow-Direct-v0 <https://github.com/isaac-sim/IsaacLab/blob/main/source/isaaclab_tasks/isaaclab_tasks/direct/shadow_hand/shadow_hand_env_cfg.py>`__ .. |cube-shadow-link| replace:: `Isaac-Repose-Cube-Shadow-Direct-v0 <https://github.com/isaac-sim/IsaacLab/blob/main/source/isaaclab_tasks/isaaclab_tasks/direct/shadow_hand/shadow_hand_env_cfg.py>`__
.. |cube-shadow-ff-link| replace:: `Isaac-Repose-Cube-Shadow-OpenAI-FF-Direct-v0 <https://github.com/isaac-sim/IsaacLab/blob/main/source/isaaclab_tasks/isaaclab_tasks/direct/shadow_hand/shadow_hand_env_cfg.py>`__ .. |cube-shadow-ff-link| replace:: `Isaac-Repose-Cube-Shadow-OpenAI-FF-Direct-v0 <https://github.com/isaac-sim/IsaacLab/blob/main/source/isaaclab_tasks/isaaclab_tasks/direct/shadow_hand/shadow_hand_env_cfg.py>`__
...@@ -954,6 +962,18 @@ inferencing, including reading from an already trained checkpoint and disabling ...@@ -954,6 +962,18 @@ inferencing, including reading from an already trained checkpoint and disabling
- -
- Manager Based - Manager Based
- -
* - Isaac-Stack-Cube-Galbot-Left-Arm-Gripper-RmpFlow-v0
-
- Manager Based
-
* - Isaac-Stack-Cube-Galbot-Right-Arm-Suction-RmpFlow-v0
-
- Manager Based
-
* - Isaac-Stack-Cube-Galbot-Left-Arm-Gripper-Visuomotor-v0
- Isaac-Stack-Cube-Galbot-Left-Arm-Gripper-Visuomotor-Play-v0
- Manager Based
-
* - Isaac-Velocity-Flat-Anymal-B-v0 * - Isaac-Velocity-Flat-Anymal-B-v0
- Isaac-Velocity-Flat-Anymal-B-Play-v0 - Isaac-Velocity-Flat-Anymal-B-Play-v0
- Manager Based - Manager Based
......
[package] [package]
# Note: Semantic Versioning is used: https://semver.org/ # Note: Semantic Versioning is used: https://semver.org/
version = "0.45.14" version = "0.45.15"
# Description # Description
title = "Isaac Lab framework for Robot Learning" title = "Isaac Lab framework for Robot Learning"
......
Changelog Changelog
--------- ---------
0.45.15 (2025-09-05)
~~~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added action terms for using RMPFlow in Manager-Based environments.
0.45.14 (2025-09-08) 0.45.14 (2025-09-08)
~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~
Added Added
^^^^^ ^^^^^
* * Added :class:`~isaaclab.ui.xr_widgets.TeleopVisualizationManager` and :class:`~isaaclab.ui.xr_widgets.XRVisualization` * Added :class:`~isaaclab.ui.xr_widgets.TeleopVisualizationManager` and :class:`~isaaclab.ui.xr_widgets.XRVisualization`
classes to provide real-time visualization of teleoperation and inverse kinematics status in XR environments. classes to provide real-time visualization of teleoperation and inverse kinematics status in XR environments.
0.45.13 (2025-09-08) 0.45.13 (2025-09-08)
~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~
......
...@@ -8,6 +8,9 @@ import os ...@@ -8,6 +8,9 @@ import os
from isaacsim.core.utils.extensions import get_extension_path_from_name from isaacsim.core.utils.extensions import get_extension_path_from_name
from isaaclab.controllers.rmp_flow import RmpFlowControllerCfg from isaaclab.controllers.rmp_flow import RmpFlowControllerCfg
from isaaclab.utils.assets import ISAACLAB_NUCLEUS_DIR
ISAACLAB_NUCLEUS_RMPFLOW_DIR = os.path.join(ISAACLAB_NUCLEUS_DIR, "Controllers", "RmpFlowAssets")
# Note: RMP-Flow config files for supported robots are stored in the motion_generation extension # Note: RMP-Flow config files for supported robots are stored in the motion_generation extension
_RMP_CONFIG_DIR = os.path.join( _RMP_CONFIG_DIR = os.path.join(
...@@ -35,3 +38,37 @@ UR10_RMPFLOW_CFG = RmpFlowControllerCfg( ...@@ -35,3 +38,37 @@ UR10_RMPFLOW_CFG = RmpFlowControllerCfg(
evaluations_per_frame=5, evaluations_per_frame=5,
) )
"""Configuration of RMPFlow for UR10 arm (default from `isaacsim.robot_motion.motion_generation`).""" """Configuration of RMPFlow for UR10 arm (default from `isaacsim.robot_motion.motion_generation`)."""
GALBOT_LEFT_ARM_RMPFLOW_CFG = RmpFlowControllerCfg(
config_file=os.path.join(
ISAACLAB_NUCLEUS_RMPFLOW_DIR,
"galbot_one_charlie",
"rmpflow",
"galbot_one_charlie_left_arm_rmpflow_config.yaml",
),
urdf_file=os.path.join(ISAACLAB_NUCLEUS_RMPFLOW_DIR, "galbot_one_charlie", "galbot_one_charlie.urdf"),
collision_file=os.path.join(
ISAACLAB_NUCLEUS_RMPFLOW_DIR, "galbot_one_charlie", "rmpflow", "galbot_one_charlie_left_arm_gripper.yaml"
),
frame_name="left_gripper_tcp_link",
evaluations_per_frame=5,
ignore_robot_state_updates=True,
)
GALBOT_RIGHT_ARM_RMPFLOW_CFG = RmpFlowControllerCfg(
config_file=os.path.join(
ISAACLAB_NUCLEUS_RMPFLOW_DIR,
"galbot_one_charlie",
"rmpflow",
"galbot_one_charlie_right_arm_rmpflow_config.yaml",
),
urdf_file=os.path.join(ISAACLAB_NUCLEUS_RMPFLOW_DIR, "galbot_one_charlie", "galbot_one_charlie.urdf"),
collision_file=os.path.join(
ISAACLAB_NUCLEUS_RMPFLOW_DIR, "galbot_one_charlie", "rmpflow", "galbot_one_charlie_right_arm_suction.yaml"
),
frame_name="right_suction_cup_tcp_link",
evaluations_per_frame=5,
ignore_robot_state_updates=True,
)
"""Configuration of RMPFlow for Galbot humanoid."""
...@@ -20,6 +20,7 @@ from isaacsim.robot_motion.motion_generation import ArticulationMotionPolicy ...@@ -20,6 +20,7 @@ from isaacsim.robot_motion.motion_generation import ArticulationMotionPolicy
from isaacsim.robot_motion.motion_generation.lula.motion_policies import RmpFlow, RmpFlowSmoothed from isaacsim.robot_motion.motion_generation.lula.motion_policies import RmpFlow, RmpFlowSmoothed
from isaaclab.utils import configclass from isaaclab.utils import configclass
from isaaclab.utils.assets import retrieve_file_path
@configclass @configclass
...@@ -95,11 +96,17 @@ class RmpFlowController: ...@@ -95,11 +96,17 @@ class RmpFlowController:
# add robot reference # add robot reference
robot = SingleArticulation(prim_path) robot = SingleArticulation(prim_path)
robot.initialize() robot.initialize()
# download files if they are not local
local_urdf_file = retrieve_file_path(self.cfg.urdf_file, force_download=True)
local_collision_file = retrieve_file_path(self.cfg.collision_file, force_download=True)
local_config_file = retrieve_file_path(self.cfg.config_file, force_download=True)
# add controller # add controller
rmpflow = controller_cls( rmpflow = controller_cls(
robot_description_path=self.cfg.collision_file, robot_description_path=local_collision_file,
urdf_path=self.cfg.urdf_file, urdf_path=local_urdf_file,
rmpflow_config_path=self.cfg.config_file, rmpflow_config_path=local_config_file,
end_effector_frame_name=self.cfg.frame_name, end_effector_frame_name=self.cfg.frame_name,
maximum_substep_size=physics_dt / self.cfg.evaluations_per_frame, maximum_substep_size=physics_dt / self.cfg.evaluations_per_frame,
ignore_robot_state_updates=self.cfg.ignore_robot_state_updates, ignore_robot_state_updates=self.cfg.ignore_robot_state_updates,
......
...@@ -144,6 +144,7 @@ class Se3SpaceMouse(DeviceBase): ...@@ -144,6 +144,7 @@ class Se3SpaceMouse(DeviceBase):
if ( if (
device["product_string"] == "SpaceMouse Compact" device["product_string"] == "SpaceMouse Compact"
or device["product_string"] == "SpaceMouse Wireless" or device["product_string"] == "SpaceMouse Wireless"
or device["product_string"] == "3Dconnexion Universal Receiver"
): ):
# set found flag # set found flag
found = True found = True
...@@ -152,6 +153,7 @@ class Se3SpaceMouse(DeviceBase): ...@@ -152,6 +153,7 @@ class Se3SpaceMouse(DeviceBase):
# connect to the device # connect to the device
self._device.close() self._device.close()
self._device.open(vendor_id, product_id) self._device.open(vendor_id, product_id)
self._device_name = device["product_string"]
# check if device found # check if device found
if not found: if not found:
time.sleep(1.0) time.sleep(1.0)
...@@ -166,19 +168,32 @@ class Se3SpaceMouse(DeviceBase): ...@@ -166,19 +168,32 @@ class Se3SpaceMouse(DeviceBase):
# keep running # keep running
while True: while True:
# read the device data # read the device data
data = self._device.read(7) if self._device_name == "3Dconnexion Universal Receiver":
data = self._device.read(7 + 6)
else:
data = self._device.read(7)
if data is not None: if data is not None:
# readings from 6-DoF sensor # readings from 6-DoF sensor
if data[0] == 1: if self._device_name == "3Dconnexion Universal Receiver":
self._delta_pos[1] = self.pos_sensitivity * convert_buffer(data[1], data[2]) if data[0] == 1:
self._delta_pos[0] = self.pos_sensitivity * convert_buffer(data[3], data[4]) self._delta_pos[1] = self.pos_sensitivity * convert_buffer(data[1], data[2])
self._delta_pos[2] = self.pos_sensitivity * convert_buffer(data[5], data[6]) * -1.0 self._delta_pos[0] = self.pos_sensitivity * convert_buffer(data[3], data[4])
elif data[0] == 2 and not self._read_rotation: self._delta_pos[2] = self.pos_sensitivity * convert_buffer(data[5], data[6]) * -1.0
self._delta_rot[1] = self.rot_sensitivity * convert_buffer(data[1], data[2])
self._delta_rot[0] = self.rot_sensitivity * convert_buffer(data[3], data[4]) self._delta_rot[1] = self.rot_sensitivity * convert_buffer(data[1 + 6], data[2 + 6])
self._delta_rot[2] = self.rot_sensitivity * convert_buffer(data[5], data[6]) * -1.0 self._delta_rot[0] = self.rot_sensitivity * convert_buffer(data[3 + 6], data[4 + 6])
self._delta_rot[2] = self.rot_sensitivity * convert_buffer(data[5 + 6], data[6 + 6]) * -1.0
else:
if data[0] == 1:
self._delta_pos[1] = self.pos_sensitivity * convert_buffer(data[1], data[2])
self._delta_pos[0] = self.pos_sensitivity * convert_buffer(data[3], data[4])
self._delta_pos[2] = self.pos_sensitivity * convert_buffer(data[5], data[6]) * -1.0
elif data[0] == 2 and not self._read_rotation:
self._delta_rot[1] = self.rot_sensitivity * convert_buffer(data[1], data[2])
self._delta_rot[0] = self.rot_sensitivity * convert_buffer(data[3], data[4])
self._delta_rot[2] = self.rot_sensitivity * convert_buffer(data[5], data[6]) * -1.0
# readings from the side buttons # readings from the side buttons
elif data[0] == 3: if data[0] == 3:
# press left button # press left button
if data[1] == 1: if data[1] == 1:
# close gripper # close gripper
......
...@@ -10,3 +10,4 @@ from .binary_joint_actions import * ...@@ -10,3 +10,4 @@ from .binary_joint_actions import *
from .joint_actions import * from .joint_actions import *
from .joint_actions_to_limits import * from .joint_actions_to_limits import *
from .non_holonomic_actions import * from .non_holonomic_actions import *
from .surface_gripper_actions import *
...@@ -188,6 +188,41 @@ class BinaryJointVelocityActionCfg(BinaryJointActionCfg): ...@@ -188,6 +188,41 @@ class BinaryJointVelocityActionCfg(BinaryJointActionCfg):
class_type: type[ActionTerm] = binary_joint_actions.BinaryJointVelocityAction class_type: type[ActionTerm] = binary_joint_actions.BinaryJointVelocityAction
@configclass
class AbsBinaryJointPositionActionCfg(ActionTermCfg):
"""Configuration for the absolute binary joint position action term.
This action term is used for robust grasping by converting continuous gripper joint position actions
into binary open/close commands. Unlike directly applying continuous gripper joint position actions, this class
applies a threshold-based decision mechanism to determine whether to
open or close the gripper.
The action works by:
1. Taking a continuous input action value
2. Comparing it against a configurable threshold
3. Mapping the result to either open or close commands based on the threshold comparison
4. Applying the corresponding gripper open/close commands
This approach provides more predictable and stable grasping behavior compared to directly applying
continuous gripper joint position actions.
See :class:`AbsBinaryJointPositionAction` for more details.
"""
joint_names: list[str] = MISSING
"""List of joint names or regex expressions that the action will be mapped to."""
open_command_expr: dict[str, float] = MISSING
"""The joint command to move to *open* configuration."""
close_command_expr: dict[str, float] = MISSING
"""The joint command to move to *close* configuration."""
threshold: float = 0.5
"""The threshold for the binary action. Defaults to 0.5."""
positive_threshold: bool = True
"""Whether to use positive (Open actions > Close actions) threshold. Defaults to True."""
class_type: type[ActionTerm] = binary_joint_actions.AbsBinaryJointPositionAction
## ##
# Non-holonomic actions. # Non-holonomic actions.
## ##
......
...@@ -164,3 +164,47 @@ class BinaryJointVelocityAction(BinaryJointAction): ...@@ -164,3 +164,47 @@ class BinaryJointVelocityAction(BinaryJointAction):
def apply_actions(self): def apply_actions(self):
self._asset.set_joint_velocity_target(self._processed_actions, joint_ids=self._joint_ids) self._asset.set_joint_velocity_target(self._processed_actions, joint_ids=self._joint_ids)
class AbsBinaryJointPositionAction(BinaryJointAction):
"""Absolute Binary joint action that sets the binary action into joint position targets.
This class extends BinaryJointAction to accept absolute position control
for gripper joints. It converts continuous input actions into binary open/close commands
using a configurable threshold mechanism.
The key difference from the base BinaryJointAction is that this class:
- Receives absolute joint position actions for gripper control
- Implements a threshold-based decision system to determine open/close state
The action processing works by:
1. Taking a continuous input action value
2. Comparing it against the configured threshold value
3. Based on the threshold comparison and positive_threshold flag, determining
whether to open or close the gripper
4. Setting the target joint positions to either the open or close configuration
"""
cfg: actions_cfg.AbsBinaryJointPositionActionCfg
"""The configuration of the action term."""
def process_actions(self, actions: torch.Tensor):
# store the raw actions
self._raw_actions[:] = actions
# compute the binary mask
if self.cfg.positive_threshold:
# true: open 0.785, false: close 0.0
binary_mask = actions > self.cfg.threshold
else:
# true: close 0.0, false: open 0.785
binary_mask = actions < self.cfg.threshold
# compute the command
self._processed_actions = torch.where(binary_mask, self._open_command, self._close_command)
if self.cfg.clip is not None:
self._processed_actions = torch.clamp(
self._processed_actions, min=self._clip[:, :, 0], max=self._clip[:, :, 1]
)
def apply_actions(self):
self._asset.set_joint_position_target(self._processed_actions, joint_ids=self._joint_ids)
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from dataclasses import MISSING
from isaaclab.controllers.rmp_flow import RmpFlowControllerCfg
from isaaclab.managers.action_manager import ActionTerm, ActionTermCfg
from isaaclab.utils import configclass
from . import rmpflow_task_space_actions
@configclass
class RMPFlowActionCfg(ActionTermCfg):
@configclass
class OffsetCfg:
"""The offset pose from parent frame to child frame.
On many robots, end-effector frames are fictitious frames that do not have a corresponding
rigid body. In such cases, it is easier to define this transform w.r.t. their parent rigid body.
For instance, for the Franka Emika arm, the end-effector is defined at an offset to the the
"panda_hand" frame.
"""
pos: tuple[float, float, float] = (0.0, 0.0, 0.0)
"""Translation w.r.t. the parent frame. Defaults to (0.0, 0.0, 0.0)."""
rot: tuple[float, float, float, float] = (1.0, 0.0, 0.0, 0.0)
class_type: type[ActionTerm] = rmpflow_task_space_actions.RMPFlowAction
joint_names: list[str] = MISSING
"""List of joint names or regex expressions that the action will be mapped to."""
body_name: str = MISSING
"""Name of the body or frame for which IK is performed."""
body_offset: OffsetCfg | None = None
"""Offset of target frame w.r.t. to the body frame. Defaults to None, in which case no offset is applied."""
scale: float | tuple[float, ...] = 1.0
controller: RmpFlowControllerCfg = MISSING
articulation_prim_expr: str = MISSING # The expression to find the articulation prim paths.
"""The configuration for the RMPFlow controller."""
use_relative_mode: bool = False
"""
Defaults to False.
If True, then the controller treats the input command as a delta change in the position/pose.
Otherwise, the controller treats the input command as the absolute position/pose.
"""
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import torch
from collections.abc import Sequence
from typing import TYPE_CHECKING
import omni.log
import isaaclab.utils.math as math_utils
import isaaclab.utils.string as string_utils
from isaaclab.assets.articulation import Articulation
from isaaclab.controllers.rmp_flow import RmpFlowController
from isaaclab.managers.action_manager import ActionTerm
if TYPE_CHECKING:
from isaaclab.envs import ManagerBasedEnv
from . import rmpflow_actions_cfg
class RMPFlowAction(ActionTerm):
cfg: rmpflow_actions_cfg.RMPFlowActionCfg
"""The configuration of the action term."""
_asset: Articulation
"""The articulation asset on which the action term is applied."""
_scale: torch.Tensor
"""The scaling factor applied to the input action. Shape is (1, action_dim)."""
_clip: torch.Tensor
"""The clip applied to the input action."""
def __init__(self, cfg: rmpflow_actions_cfg.RMPFlowActionCfg, env: ManagerBasedEnv):
# initialize the action term
super().__init__(cfg, env)
# resolve the joints over which the action term is applied
self._joint_ids, self._joint_names = self._asset.find_joints(self.cfg.joint_names)
self._num_joints = len(self._joint_ids)
# parse the body index
body_ids, body_names = self._asset.find_bodies(self.cfg.body_name)
if len(body_ids) != 1:
raise ValueError(
f"Expected one match for the body name: {self.cfg.body_name}. Found {len(body_ids)}: {body_names}."
)
# save only the first body index
self._body_idx = body_ids[0]
self._body_name = body_names[0]
# check if articulation is fixed-base
# if fixed-base then the jacobian for the base is not computed
# this means that number of bodies is one less than the articulation's number of bodies
if self._asset.is_fixed_base:
self._jacobi_body_idx = self._body_idx - 1
self._jacobi_joint_ids = self._joint_ids
else:
self._jacobi_body_idx = self._body_idx
self._jacobi_joint_ids = [i + 6 for i in self._joint_ids]
# log info for debugging
omni.log.info(
f"Resolved joint names for the action term {self.__class__.__name__}:"
f" {self._joint_names} [{self._joint_ids}]"
)
omni.log.info(
f"Resolved body name for the action term {self.__class__.__name__}: {self._body_name} [{self._body_idx}]"
)
# Avoid indexing across all joints for efficiency
if self._num_joints == self._asset.num_joints:
self._joint_ids = slice(None)
# create the differential IK controller
self._rmpflow_controller = RmpFlowController(cfg=self.cfg.controller, device=self.device)
# create tensors for raw and processed actions
self._raw_actions = torch.zeros(self.num_envs, self.action_dim, device=self.device)
self._processed_actions = torch.zeros_like(self.raw_actions)
# save the scale as tensors
self._scale = torch.zeros((self.num_envs, self.action_dim), device=self.device)
self._scale[:] = torch.tensor(self.cfg.scale, device=self.device)
# convert the fixed offsets to torch tensors of batched shape
if self.cfg.body_offset is not None:
self._offset_pos = torch.tensor(self.cfg.body_offset.pos, device=self.device).repeat(self.num_envs, 1)
self._offset_rot = torch.tensor(self.cfg.body_offset.rot, device=self.device).repeat(self.num_envs, 1)
else:
self._offset_pos, self._offset_rot = None, None
# parse clip
if self.cfg.clip is not None:
if isinstance(cfg.clip, dict):
self._clip = torch.tensor([[-float("inf"), float("inf")]], device=self.device).repeat(
self.num_envs, self.action_dim, 1
)
index_list, _, value_list = string_utils.resolve_matching_names_values(self.cfg.clip, self._joint_names)
self._clip[:, index_list] = torch.tensor(value_list, device=self.device)
else:
raise ValueError(f"Unsupported clip type: {type(cfg.clip)}. Supported types are dict.")
"""
Properties.
"""
@property
def action_dim(self) -> int:
if self.cfg.use_relative_mode:
return 6 # delta_eef_xyz, delta_eef_rpy
else:
return 7 # absolute_eef_xyz, absolute_eef_quat
# self._rmpflow_controller.num_actions = 7 since it use quaternions (w,x,y,z) as command
@property
def raw_actions(self) -> torch.Tensor:
return self._raw_actions
@property
def processed_actions(self) -> torch.Tensor:
return self._processed_actions
@property
def jacobian_w(self) -> torch.Tensor:
return self._asset.root_physx_view.get_jacobians()[:, self._jacobi_body_idx, :, self._jacobi_joint_ids]
@property
def jacobian_b(self) -> torch.Tensor:
jacobian = self.jacobian_w
base_rot = self._asset.data.root_quat_w
base_rot_matrix = math_utils.matrix_from_quat(math_utils.quat_inv(base_rot))
jacobian[:, :3, :] = torch.bmm(base_rot_matrix, jacobian[:, :3, :])
jacobian[:, 3:, :] = torch.bmm(base_rot_matrix, jacobian[:, 3:, :])
return jacobian
"""
Operations.
"""
# This is called each env.step()
def process_actions(self, actions: torch.Tensor):
# store the raw actions
self._raw_actions[:] = actions
self._processed_actions[:] = self.raw_actions * self._scale
if self.cfg.clip is not None:
self._processed_actions = torch.clamp(
self._processed_actions, min=self._clip[:, :, 0], max=self._clip[:, :, 1]
)
# If use_relative_mode is True, then the controller will apply delta change to the current ee_pose.
if self.cfg.use_relative_mode:
# obtain quantities from simulation
ee_pos_curr, ee_quat_curr = self._compute_frame_pose()
# compute ee_pose_targets use_relative_actions
if ee_pos_curr is None or ee_quat_curr is None:
raise ValueError(
"Neither end-effector position nor orientation can be None for `pose_rel` command type!"
)
self.ee_pos_des, self.ee_quat_des = math_utils.apply_delta_pose(
ee_pos_curr, ee_quat_curr, self._processed_actions
)
else: # If use_relative_mode is False, then the controller will apply absolute ee_pose.
self.ee_pos_des = self._processed_actions[:, 0:3]
self.ee_quat_des = self._processed_actions[:, 3:7]
self.ee_pose_des = torch.cat([self.ee_pos_des, self.ee_quat_des], dim=1) # shape: [n, 7]
# set command into controller
self._rmpflow_controller.set_command(self.ee_pose_des)
# This is called each simulationcontext.step(), step *decimation* times when env.step() update actions
def apply_actions(self):
# obtain quantities from simulation
ee_pos_curr, ee_quat_curr = self._compute_frame_pose()
joint_pos = self._asset.data.joint_pos[:, self._joint_ids]
# compute the delta in joint-space
if ee_quat_curr.norm() != 0:
joint_pos_des, joint_vel_des = self._rmpflow_controller.compute()
else:
joint_pos_des = joint_pos.clone()
# set the joint position command
self._asset.set_joint_position_target(joint_pos_des, self._joint_ids)
self._asset.set_joint_velocity_target(joint_vel_des, self._joint_ids)
def reset(self, env_ids: Sequence[int] | None = None) -> None:
self._raw_actions[env_ids] = 0.0
self._rmpflow_controller.initialize(self.cfg.articulation_prim_expr)
"""
Helper functions.
"""
def _compute_frame_pose(self) -> tuple[torch.Tensor, torch.Tensor]:
"""Computes the pose of the target frame in the root frame.
Returns:
A tuple of the body's position and orientation in the root frame.
"""
# obtain quantities from simulation
ee_pos_w = self._asset.data.body_pos_w[:, self._body_idx]
ee_quat_w = self._asset.data.body_quat_w[:, self._body_idx]
root_pos_w = self._asset.data.root_pos_w
root_quat_w = self._asset.data.root_quat_w
# compute the pose of the body in the root frame
ee_pose_b, ee_quat_b = math_utils.subtract_frame_transforms(root_pos_w, root_quat_w, ee_pos_w, ee_quat_w)
# account for the offset
if self.cfg.body_offset is not None:
ee_pose_b, ee_quat_b = math_utils.combine_frame_transforms(
ee_pose_b, ee_quat_b, self._offset_pos, self._offset_rot
)
return ee_pose_b, ee_quat_b
...@@ -14,6 +14,7 @@ from .cart_double_pendulum import * ...@@ -14,6 +14,7 @@ from .cart_double_pendulum import *
from .cartpole import * from .cartpole import *
from .fourier import * from .fourier import *
from .franka import * from .franka import *
from .galbot import *
from .humanoid import * from .humanoid import *
from .humanoid_28 import * from .humanoid_28 import *
from .kinova import * from .kinova import *
......
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Configuration for the Galbot humanoid robot.
The following configuration parameters are available:
* :obj:`GALBOT_ONE_CHARLIE_CFG`: The galbot_one_charlie humanoid robot.
"""
import isaaclab.sim as sim_utils
from isaaclab.actuators import ImplicitActuatorCfg
from isaaclab.assets.articulation import ArticulationCfg
from isaaclab.utils.assets import ISAACLAB_NUCLEUS_DIR
##
# Configuration
##
GALBOT_ONE_CHARLIE_CFG = ArticulationCfg(
spawn=sim_utils.UsdFileCfg(
usd_path=f"{ISAACLAB_NUCLEUS_DIR}/Robots/Galbot/galbot_one_charlie/galbot_one_charlie.usd",
variants={"Physics": "PhysX"},
rigid_props=sim_utils.RigidBodyPropertiesCfg(
disable_gravity=True,
max_depenetration_velocity=5.0,
),
activate_contact_sensors=True,
),
init_state=ArticulationCfg.InitialStateCfg(
joint_pos={
"leg_joint1": 0.8,
"leg_joint2": 2.3,
"leg_joint3": 1.55,
"leg_joint4": 0.0,
"head_joint1": 0.0,
"head_joint2": 0.36,
"left_arm_joint1": -0.5480,
"left_arm_joint2": -0.6551,
"left_arm_joint3": 2.407,
"left_arm_joint4": 1.3641,
"left_arm_joint5": -0.4416,
"left_arm_joint6": 0.1168,
"left_arm_joint7": 1.2308,
"left_gripper_left_joint": 0.035,
"left_gripper_right_joint": 0.035,
"right_arm_joint1": 0.1535,
"right_arm_joint2": 1.0087,
"right_arm_joint3": 0.0895,
"right_arm_joint4": 1.5743,
"right_arm_joint5": -0.2422,
"right_arm_joint6": -0.0009,
"right_arm_joint7": -0.9143,
"right_suction_cup_joint1": 0.0,
},
pos=(-0.6, 0.0, -0.8),
),
# PD parameters are read from USD file with Gain Tuner
actuators={
"head": ImplicitActuatorCfg(
joint_names_expr=["head_joint.*"],
velocity_limit_sim=None,
effort_limit_sim=None,
stiffness=None,
damping=None,
),
"leg": ImplicitActuatorCfg(
joint_names_expr=["leg_joint.*"],
velocity_limit_sim=None,
effort_limit_sim=None,
stiffness=None,
damping=None,
),
"left_arm": ImplicitActuatorCfg(
joint_names_expr=["left_arm_joint.*"],
velocity_limit_sim=None,
effort_limit_sim=None,
stiffness=None,
damping=None,
),
"right_arm": ImplicitActuatorCfg(
joint_names_expr=["right_arm_joint.*", "right_suction_cup_joint1"],
velocity_limit_sim=None,
effort_limit_sim=None,
stiffness=None,
damping=None,
),
"left_gripper": ImplicitActuatorCfg(
joint_names_expr=["left_gripper_.*_joint"],
velocity_limit_sim=1.0,
effort_limit_sim=None,
stiffness=None,
damping=None,
),
},
)
"""Configuration of Galbot_one_charlie humanoid using implicit actuator models."""
...@@ -14,6 +14,13 @@ from .franka_stack_ik_rel_mimic_env import FrankaCubeStackIKRelMimicEnv ...@@ -14,6 +14,13 @@ from .franka_stack_ik_rel_mimic_env import FrankaCubeStackIKRelMimicEnv
from .franka_stack_ik_rel_mimic_env_cfg import FrankaCubeStackIKRelMimicEnvCfg from .franka_stack_ik_rel_mimic_env_cfg import FrankaCubeStackIKRelMimicEnvCfg
from .franka_stack_ik_rel_visuomotor_cosmos_mimic_env_cfg import FrankaCubeStackIKRelVisuomotorCosmosMimicEnvCfg from .franka_stack_ik_rel_visuomotor_cosmos_mimic_env_cfg import FrankaCubeStackIKRelVisuomotorCosmosMimicEnvCfg
from .franka_stack_ik_rel_visuomotor_mimic_env_cfg import FrankaCubeStackIKRelVisuomotorMimicEnvCfg from .franka_stack_ik_rel_visuomotor_mimic_env_cfg import FrankaCubeStackIKRelVisuomotorMimicEnvCfg
from .galbot_stack_rmp_abs_mimic_env import RmpFlowGalbotCubeStackAbsMimicEnv
from .galbot_stack_rmp_abs_mimic_env_cfg import (
RmpFlowGalbotLeftArmGripperCubeStackAbsMimicEnvCfg,
RmpFlowGalbotRightArmSuctionCubeStackAbsMimicEnvCfg,
)
from .galbot_stack_rmp_rel_mimic_env import RmpFlowGalbotCubeStackRelMimicEnv
from .galbot_stack_rmp_rel_mimic_env_cfg import RmpFlowGalbotLeftArmGripperCubeStackRelMimicEnvCfg
## ##
# Inverse Kinematics - Relative Pose Control # Inverse Kinematics - Relative Pose Control
...@@ -65,3 +72,47 @@ gym.register( ...@@ -65,3 +72,47 @@ gym.register(
}, },
disable_env_checker=True, disable_env_checker=True,
) )
##
# Galbot Stack Cube with RmpFlow - Relative Pose Control
##
gym.register(
id="Isaac-Stack-Cube-Galbot-Left-Arm-Gripper-RmpFlow-Rel-Mimic-v0",
entry_point="isaaclab_mimic.envs:RmpFlowGalbotCubeStackRelMimicEnv",
kwargs={
"env_cfg_entry_point": galbot_stack_rmp_rel_mimic_env_cfg.RmpFlowGalbotLeftArmGripperCubeStackRelMimicEnvCfg,
},
disable_env_checker=True,
)
gym.register(
id="Isaac-Stack-Cube-Galbot-Right-Arm-Suction-RmpFlow-Rel-Mimic-v0",
entry_point="isaaclab_mimic.envs:RmpFlowGalbotCubeStackRelMimicEnv",
kwargs={
"env_cfg_entry_point": galbot_stack_rmp_rel_mimic_env_cfg.RmpFlowGalbotRightArmSuctionCubeStackRelMimicEnvCfg,
},
disable_env_checker=True,
)
##
# Galbot Stack Cube with RmpFlow - Absolute Pose Control
##
gym.register(
id="Isaac-Stack-Cube-Galbot-Left-Arm-Gripper-RmpFlow-Abs-Mimic-v0",
entry_point="isaaclab_mimic.envs:RmpFlowGalbotCubeStackAbsMimicEnv",
kwargs={
"env_cfg_entry_point": galbot_stack_rmp_abs_mimic_env_cfg.RmpFlowGalbotLeftArmGripperCubeStackAbsMimicEnvCfg,
},
disable_env_checker=True,
)
gym.register(
id="Isaac-Stack-Cube-Galbot-Right-Arm-Suction-RmpFlow-Abs-Mimic-v0",
entry_point="isaaclab_mimic.envs:RmpFlowGalbotCubeStackAbsMimicEnv",
kwargs={
"env_cfg_entry_point": galbot_stack_rmp_abs_mimic_env_cfg.RmpFlowGalbotRightArmSuctionCubeStackAbsMimicEnvCfg,
},
disable_env_checker=True,
)
...@@ -80,7 +80,7 @@ class FrankaCubeStackIKRelMimicEnv(ManagerBasedRLMimicEnv): ...@@ -80,7 +80,7 @@ class FrankaCubeStackIKRelMimicEnv(ManagerBasedRLMimicEnv):
# add noise to action # add noise to action
pose_action = torch.cat([delta_position, delta_rotation], dim=0) pose_action = torch.cat([delta_position, delta_rotation], dim=0)
if action_noise_dict is not None: if action_noise_dict is not None:
noise = action_noise_dict["franka"] * torch.randn_like(pose_action) noise = action_noise_dict[eef_name] * torch.randn_like(pose_action)
pose_action += noise pose_action += noise
pose_action = torch.clamp(pose_action, -1.0, 1.0) pose_action = torch.clamp(pose_action, -1.0, 1.0)
......
# Copyright (c) 2024-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from collections.abc import Sequence
import isaaclab.utils.math as PoseUtils
from .franka_stack_ik_abs_mimic_env import FrankaCubeStackIKAbsMimicEnv
class RmpFlowGalbotCubeStackAbsMimicEnv(FrankaCubeStackIKAbsMimicEnv):
"""
Isaac Lab Mimic environment wrapper class for Galbot Cube Stack RmpFlow Absolute env.
"""
def get_object_poses(self, env_ids: Sequence[int] | None = None):
"""
Rewrite this function to get the pose of each object in robot base frame,
relevant to Isaac Lab Mimic data generation in the current scene.
Args:
env_ids: Environment indices to get the pose for. If None, all envs are considered.
Returns:
A dictionary that maps object names to object pose matrix in base frame of robot (4x4 torch.Tensor)
"""
if env_ids is None:
env_ids = slice(None)
rigid_object_states = self.scene.get_state(is_relative=True)["rigid_object"]
robot_states = self.scene.get_state(is_relative=True)["articulation"]["robot"]
root_pose = robot_states["root_pose"]
root_pos = root_pose[env_ids, :3]
root_quat = root_pose[env_ids, 3:7]
object_pose_matrix = dict()
for obj_name, obj_state in rigid_object_states.items():
pos_obj_base, quat_obj_base = PoseUtils.subtract_frame_transforms(
root_pos, root_quat, obj_state["root_pose"][env_ids, :3], obj_state["root_pose"][env_ids, 3:7]
)
rot_obj_base = PoseUtils.matrix_from_quat(quat_obj_base)
object_pose_matrix[obj_name] = PoseUtils.make_pose(pos_obj_base, rot_obj_base)
return object_pose_matrix
# Copyright (c) 2024-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from isaaclab.envs.mimic_env_cfg import MimicEnvCfg, SubTaskConfig
from isaaclab.utils import configclass
from isaaclab_tasks.manager_based.manipulation.stack.config.galbot.stack_rmp_rel_env_cfg import (
RmpFlowGalbotLeftArmCubeStackEnvCfg,
RmpFlowGalbotRightArmCubeStackEnvCfg,
)
@configclass
class RmpFlowGalbotLeftArmGripperCubeStackAbsMimicEnvCfg(RmpFlowGalbotLeftArmCubeStackEnvCfg, MimicEnvCfg):
"""
Isaac Lab Mimic environment config class for Galbot Gripper Cube Stack IK Rel env.
"""
def __post_init__(self):
# post init of parents
super().__post_init__()
# Override the existing values
self.datagen_config.name = "demo_src_stack_isaac_lab_task_D0"
self.datagen_config.generation_guarantee = True
self.datagen_config.generation_keep_failed = True
self.datagen_config.generation_num_trials = 10
self.datagen_config.generation_select_src_per_subtask = True
self.datagen_config.generation_transform_first_robot_pose = False
self.datagen_config.generation_interpolate_from_last_target_pose = True
self.datagen_config.max_num_failures = 25
self.datagen_config.seed = 1
# The following are the subtask configurations for the stack task.
subtask_configs = []
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_2",
# This key corresponds to the binary indicator in "datagen_info" that signals
# when this subtask is finished (e.g., on a 0 to 1 edge).
subtask_term_signal="grasp_1",
# Specifies time offsets for data generation when splitting a trajectory into
# subtask segments. Random offsets are added to the termination boundary.
subtask_term_offset_range=(
18,
25,
),
# Selection strategy for the source subtask segment during data generation
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.01,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=15,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_1",
# Corresponding key for the binary indicator in "datagen_info" for completion
subtask_term_signal="stack_1",
# Time offsets for data generation when splitting a trajectory
subtask_term_offset_range=(
18,
25,
),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.01,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=15,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_3",
# Corresponding key for the binary indicator in "datagen_info" for completion
subtask_term_signal="grasp_2",
# Time offsets for data generation when splitting a trajectory
subtask_term_offset_range=(
25,
30,
), # this should be larger than the other subtasks, because the gripper should be lifted higher than 2 blocks
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.01,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=15,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_2",
# End of final subtask does not need to be detected
subtask_term_signal=None,
# No time offsets for the final subtask
subtask_term_offset_range=(0, 0),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.01,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=15,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
self.subtask_configs["galbot"] = subtask_configs
@configclass
class RmpFlowGalbotRightArmSuctionCubeStackAbsMimicEnvCfg(RmpFlowGalbotRightArmCubeStackEnvCfg, MimicEnvCfg):
"""
Isaac Lab Mimic environment config class for Galbot Suction Gripper Cube Stack RmpFlow Abs env.
"""
def __post_init__(self):
# post init of parents
super().__post_init__()
# Override the existing values
self.datagen_config.name = "demo_src_stack_isaac_lab_task_D0"
self.datagen_config.generation_guarantee = True
self.datagen_config.generation_keep_failed = True
self.datagen_config.generation_num_trials = 10
self.datagen_config.generation_select_src_per_subtask = True
self.datagen_config.generation_transform_first_robot_pose = False
self.datagen_config.generation_interpolate_from_last_target_pose = True
self.datagen_config.max_num_failures = 25
self.datagen_config.seed = 1
# The following are the subtask configurations for the stack task.
subtask_configs = []
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_2",
# This key corresponds to the binary indicator in "datagen_info" that signals
# when this subtask is finished (e.g., on a 0 to 1 edge).
subtask_term_signal="grasp_1",
# Specifies time offsets for data generation when splitting a trajectory into
# subtask segments. Random offsets are added to the termination boundary.
subtask_term_offset_range=(5, 10),
# Selection strategy for the source subtask segment during data generation
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.01,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=15,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_1",
# Corresponding key for the binary indicator in "datagen_info" for completion
subtask_term_signal="stack_1",
# Time offsets for data generation when splitting a trajectory
subtask_term_offset_range=(2, 10),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.01,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=15,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_3",
# Corresponding key for the binary indicator in "datagen_info" for completion
subtask_term_signal="grasp_2",
# Time offsets for data generation when splitting a trajectory
subtask_term_offset_range=(5, 10),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.01,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=15,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_2",
# End of final subtask does not need to be detected
subtask_term_signal=None,
# No time offsets for the final subtask
subtask_term_offset_range=(0, 0),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.01,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=15,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
self.subtask_configs["galbot"] = subtask_configs
# Copyright (c) 2024-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from collections.abc import Sequence
import isaaclab.utils.math as PoseUtils
from .franka_stack_ik_rel_mimic_env import FrankaCubeStackIKRelMimicEnv
class RmpFlowGalbotCubeStackRelMimicEnv(FrankaCubeStackIKRelMimicEnv):
"""
Isaac Lab Mimic environment wrapper class for Galbot Cube Stack RmpFlow Rel env.
"""
def get_object_poses(self, env_ids: Sequence[int] | None = None):
"""
Rewrite this function to get the pose of each object in robot base frame,
relevant to Isaac Lab Mimic data generation in the current scene.
Args:
env_ids: Environment indices to get the pose for. If None, all envs are considered.
Returns:
A dictionary that maps object names to object pose matrix in base frame of robot (4x4 torch.Tensor)
"""
if env_ids is None:
env_ids = slice(None)
rigid_object_states = self.scene.get_state(is_relative=True)["rigid_object"]
robot_states = self.scene.get_state(is_relative=True)["articulation"]["robot"]
root_pose = robot_states["root_pose"]
root_pos = root_pose[env_ids, :3]
root_quat = root_pose[env_ids, 3:7]
object_pose_matrix = dict()
for obj_name, obj_state in rigid_object_states.items():
pos_obj_base, quat_obj_base = PoseUtils.subtract_frame_transforms(
root_pos, root_quat, obj_state["root_pose"][env_ids, :3], obj_state["root_pose"][env_ids, 3:7]
)
rot_obj_base = PoseUtils.matrix_from_quat(quat_obj_base)
object_pose_matrix[obj_name] = PoseUtils.make_pose(pos_obj_base, rot_obj_base)
return object_pose_matrix
# Copyright (c) 2024-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from isaaclab.envs.mimic_env_cfg import MimicEnvCfg, SubTaskConfig
from isaaclab.utils import configclass
from isaaclab_tasks.manager_based.manipulation.stack.config.galbot.stack_rmp_rel_env_cfg import (
RmpFlowGalbotLeftArmCubeStackEnvCfg,
RmpFlowGalbotRightArmCubeStackEnvCfg,
)
@configclass
class RmpFlowGalbotLeftArmGripperCubeStackRelMimicEnvCfg(RmpFlowGalbotLeftArmCubeStackEnvCfg, MimicEnvCfg):
"""
Isaac Lab Mimic environment config class for Galbot Gripper Cube Stack IK Rel env.
"""
def __post_init__(self):
# post init of parents
super().__post_init__()
# Override the existing values
self.datagen_config.name = "demo_src_stack_isaac_lab_task_D0"
self.datagen_config.generation_guarantee = True
self.datagen_config.generation_keep_failed = True
self.datagen_config.generation_num_trials = 10
self.datagen_config.generation_select_src_per_subtask = True
self.datagen_config.generation_transform_first_robot_pose = False
self.datagen_config.generation_interpolate_from_last_target_pose = True
self.datagen_config.max_num_failures = 25
self.datagen_config.seed = 1
# The following are the subtask configurations for the stack task.
subtask_configs = []
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_2",
# This key corresponds to the binary indicator in "datagen_info" that signals
# when this subtask is finished (e.g., on a 0 to 1 edge).
subtask_term_signal="grasp_1",
# Specifies time offsets for data generation when splitting a trajectory into
# subtask segments. Random offsets are added to the termination boundary.
subtask_term_offset_range=(
18,
25,
),
# Selection strategy for the source subtask segment during data generation
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.01,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=15,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_1",
# Corresponding key for the binary indicator in "datagen_info" for completion
subtask_term_signal="stack_1",
# Time offsets for data generation when splitting a trajectory
subtask_term_offset_range=(
18,
25,
),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.01,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=15,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_3",
# Corresponding key for the binary indicator in "datagen_info" for completion
subtask_term_signal="grasp_2",
# Time offsets for data generation when splitting a trajectory
subtask_term_offset_range=(
25,
30,
), # this should be larger than the other subtasks, because the gripper should be lifted higher than 2 blocks
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.01,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=15,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_2",
# End of final subtask does not need to be detected
subtask_term_signal=None,
# No time offsets for the final subtask
subtask_term_offset_range=(0, 0),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.01,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=15,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
self.subtask_configs["galbot"] = subtask_configs
@configclass
class RmpFlowGalbotRightArmSuctionCubeStackRelMimicEnvCfg(RmpFlowGalbotRightArmCubeStackEnvCfg, MimicEnvCfg):
"""
Isaac Lab Mimic environment config class for Galbot Suction Gripper Cube Stack RmpFlow Rel env.
"""
def __post_init__(self):
# post init of parents
super().__post_init__()
# Override the existing values
self.datagen_config.name = "demo_src_stack_isaac_lab_task_D0"
self.datagen_config.generation_guarantee = True
self.datagen_config.generation_keep_failed = True
self.datagen_config.generation_num_trials = 10
self.datagen_config.generation_select_src_per_subtask = True
self.datagen_config.generation_transform_first_robot_pose = False
self.datagen_config.generation_interpolate_from_last_target_pose = True
self.datagen_config.max_num_failures = 25
self.datagen_config.seed = 1
# The following are the subtask configurations for the stack task.
subtask_configs = []
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_2",
# This key corresponds to the binary indicator in "datagen_info" that signals
# when this subtask is finished (e.g., on a 0 to 1 edge).
subtask_term_signal="grasp_1",
# Specifies time offsets for data generation when splitting a trajectory into
# subtask segments. Random offsets are added to the termination boundary.
subtask_term_offset_range=(5, 10),
# Selection strategy for the source subtask segment during data generation
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.01,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=15,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_1",
# Corresponding key for the binary indicator in "datagen_info" for completion
subtask_term_signal="stack_1",
# Time offsets for data generation when splitting a trajectory
subtask_term_offset_range=(2, 10),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.01,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=15,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_3",
# Corresponding key for the binary indicator in "datagen_info" for completion
subtask_term_signal="grasp_2",
# Time offsets for data generation when splitting a trajectory
subtask_term_offset_range=(5, 10),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.01,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=15,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_2",
# End of final subtask does not need to be detected
subtask_term_signal=None,
# No time offsets for the final subtask
subtask_term_offset_range=(0, 0),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.01,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=15,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
self.subtask_configs["galbot"] = subtask_configs
[package] [package]
# Note: Semantic Versioning is used: https://semver.org/ # Note: Semantic Versioning is used: https://semver.org/
version = "0.10.49" version = "0.10.50"
# Description # Description
title = "Isaac Lab Environments" title = "Isaac Lab Environments"
......
Changelog Changelog
--------- ---------
0.10.50 (2025-09-05)
~~~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added stacking environments for Galbot with suction grippers.
0.10.49 (2025-09-05) 0.10.49 (2025-09-05)
~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~
......
...@@ -86,6 +86,9 @@ class FrankaCubeStackEnvCfg(StackEnvCfg): ...@@ -86,6 +86,9 @@ class FrankaCubeStackEnvCfg(StackEnvCfg):
open_command_expr={"panda_finger_.*": 0.04}, open_command_expr={"panda_finger_.*": 0.04},
close_command_expr={"panda_finger_.*": 0.0}, close_command_expr={"panda_finger_.*": 0.0},
) )
self.gripper_joint_names = ["panda_finger_.*"]
self.gripper_open_val = 0.04
self.gripper_threshold = 0.005
# Rigid body properties of each cube # Rigid body properties of each cube
cube_properties = RigidBodyPropertiesCfg( cube_properties = RigidBodyPropertiesCfg(
......
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import gymnasium as gym
import os
from . import stack_rmp_rel_env_cfg
##
# Register Gym environments.
##
##
# RMPFlow (with Joint Limit Constraint and Obstacle Avoidance) for Galbot Single Arm Cube Stack Task
# you can use for both absolute and relative mode, by given the USE_RELATIVE_MODE environment variable
##
gym.register(
id="Isaac-Stack-Cube-Galbot-Left-Arm-Gripper-RmpFlow-v0",
entry_point="isaaclab.envs:ManagerBasedRLEnv",
kwargs={
"env_cfg_entry_point": stack_rmp_rel_env_cfg.RmpFlowGalbotLeftArmCubeStackEnvCfg,
},
disable_env_checker=True,
)
gym.register(
id="Isaac-Stack-Cube-Galbot-Right-Arm-Suction-RmpFlow-v0",
entry_point="isaaclab.envs:ManagerBasedRLEnv",
kwargs={
"env_cfg_entry_point": stack_rmp_rel_env_cfg.RmpFlowGalbotRightArmCubeStackEnvCfg,
},
disable_env_checker=True,
)
##
# Visuomotor Task for Galbot Left ArmCube Stack Task
##
gym.register(
id="Isaac-Stack-Cube-Galbot-Left-Arm-Gripper-Visuomotor-v0",
entry_point="isaaclab.envs:ManagerBasedRLEnv",
kwargs={
"env_cfg_entry_point": stack_rmp_rel_env_cfg.RmpFlowGalbotLeftArmCubeStackVisuomotorEnvCfg,
},
disable_env_checker=True,
)
##
# Policy Close-loop Evaluation Task for Galbot Left Arm Cube Stack Task (in Joint Space)
##
gym.register(
id="Isaac-Stack-Cube-Galbot-Left-Arm-Gripper-Visuomotor-Joint-Position-Play-v0",
entry_point="isaaclab.envs:ManagerBasedRLEnv",
kwargs={
"env_cfg_entry_point": stack_rmp_rel_env_cfg.GalbotLeftArmJointPositionCubeStackVisuomotorEnvCfg_PLAY,
},
disable_env_checker=True,
)
##
# Policy Close-loop Evaluation Task for Galbot Left Arm Cube Stack Task (in Task Space)
##
gym.register(
id="Isaac-Stack-Cube-Galbot-Left-Arm-Gripper-Visuomotor-RmpFlow-Play-v0",
entry_point="isaaclab.envs:ManagerBasedRLEnv",
kwargs={
"env_cfg_entry_point": stack_rmp_rel_env_cfg.GalbotLeftArmRmpFlowCubeStackVisuomotorEnvCfg_PLAY,
},
disable_env_checker=True,
)
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from isaaclab.assets import RigidObjectCfg, SurfaceGripperCfg
from isaaclab.envs.mdp.actions.actions_cfg import SurfaceGripperBinaryActionCfg
from isaaclab.managers import EventTermCfg as EventTerm
from isaaclab.managers import ObservationGroupCfg as ObsGroup
from isaaclab.managers import ObservationTermCfg as ObsTerm
from isaaclab.managers import SceneEntityCfg
from isaaclab.sensors import FrameTransformerCfg
from isaaclab.sensors.frame_transformer.frame_transformer_cfg import OffsetCfg
from isaaclab.sim.schemas.schemas_cfg import RigidBodyPropertiesCfg
from isaaclab.sim.spawners.from_files.from_files_cfg import UsdFileCfg
from isaaclab.utils import configclass
from isaaclab.utils.assets import ISAAC_NUCLEUS_DIR
from isaaclab_tasks.manager_based.manipulation.stack import mdp
from isaaclab_tasks.manager_based.manipulation.stack.mdp import franka_stack_events
from isaaclab_tasks.manager_based.manipulation.stack.stack_env_cfg import ObservationsCfg, StackEnvCfg
##
# Pre-defined configs
##
from isaaclab.markers.config import FRAME_MARKER_CFG # isort: skip
from isaaclab_assets.robots.galbot import GALBOT_ONE_CHARLIE_CFG # isort: skip
@configclass
class EventCfg:
"""Configuration for events."""
reset_all = EventTerm(func=mdp.reset_scene_to_default, mode="reset", params={"reset_joint_targets": True})
randomize_cube_positions = EventTerm(
func=franka_stack_events.randomize_object_pose,
mode="reset",
params={
"pose_range": {
"x": (-0.2, 0.0),
"y": (0.20, 0.40),
"z": (0.0203, 0.0203),
"yaw": (-1.0, 1.0, 0.0),
},
"min_separation": 0.1,
"asset_cfgs": [SceneEntityCfg("cube_1"), SceneEntityCfg("cube_2"), SceneEntityCfg("cube_3")],
},
)
@configclass
class ObservationGalbotLeftArmGripperCfg:
@configclass
class PolicyCfg(ObsGroup):
"""Observations for policy group with state values."""
actions = ObsTerm(func=mdp.last_action)
joint_pos = ObsTerm(func=mdp.joint_pos_rel)
joint_vel = ObsTerm(func=mdp.joint_vel_rel)
object = ObsTerm(
func=mdp.object_abs_obs_in_base_frame,
params={
"robot_cfg": SceneEntityCfg("robot"),
},
)
cube_positions = ObsTerm(
func=mdp.cube_poses_in_base_frame, params={"robot_cfg": SceneEntityCfg("robot"), "return_key": "pos"}
)
cube_orientations = ObsTerm(
func=mdp.cube_poses_in_base_frame, params={"robot_cfg": SceneEntityCfg("robot"), "return_key": "quat"}
)
eef_pos = ObsTerm(
func=mdp.ee_frame_pose_in_base_frame,
params={
"robot_cfg": SceneEntityCfg("robot"),
"ee_frame_cfg": SceneEntityCfg("ee_frame"),
"return_key": "pos",
},
)
eef_quat = ObsTerm(
func=mdp.ee_frame_pose_in_base_frame,
params={
"robot_cfg": SceneEntityCfg("robot"),
"ee_frame_cfg": SceneEntityCfg("ee_frame"),
"return_key": "quat",
},
)
gripper_pos = ObsTerm(
func=mdp.gripper_pos,
)
def __post_init__(self):
self.enable_corruption = False
self.concatenate_terms = False
@configclass
class SubtaskCfg(ObservationsCfg.SubtaskCfg):
"""Observations for subtask group."""
grasp_1 = ObsTerm(
func=mdp.object_grasped,
params={
"robot_cfg": SceneEntityCfg("robot"),
"ee_frame_cfg": SceneEntityCfg("ee_frame"),
"object_cfg": SceneEntityCfg("cube_2"),
},
)
stack_1 = ObsTerm(
func=mdp.object_stacked,
params={
"robot_cfg": SceneEntityCfg("robot"),
"upper_object_cfg": SceneEntityCfg("cube_2"),
"lower_object_cfg": SceneEntityCfg("cube_1"),
},
)
grasp_2 = ObsTerm(
func=mdp.object_grasped,
params={
"robot_cfg": SceneEntityCfg("robot"),
"ee_frame_cfg": SceneEntityCfg("ee_frame"),
"object_cfg": SceneEntityCfg("cube_3"),
},
)
def __post_init__(self):
super().__post_init__()
@configclass
class RGBCameraPolicyCfg(ObsGroup):
"""Observations for policy group with RGB images."""
table_cam = ObsTerm(
func=mdp.image, params={"sensor_cfg": SceneEntityCfg("table_cam"), "data_type": "rgb", "normalize": False}
)
wrist_cam = ObsTerm(
func=mdp.image, params={"sensor_cfg": SceneEntityCfg("wrist_cam"), "data_type": "rgb", "normalize": False}
)
def __post_init__(self):
self.enable_corruption = False
self.concatenate_terms = False
subtask_terms: SubtaskCfg = SubtaskCfg()
policy: PolicyCfg = PolicyCfg()
rgb_camera: RGBCameraPolicyCfg = RGBCameraPolicyCfg()
@configclass
class GalbotLeftArmCubeStackEnvCfg(StackEnvCfg):
def __post_init__(self):
# post init of parent
super().__post_init__()
# MDP settings
# Set events
self.events = EventCfg()
self.observations.policy = ObservationGalbotLeftArmGripperCfg().PolicyCfg()
self.observations.subtask_terms = ObservationGalbotLeftArmGripperCfg().SubtaskCfg()
# Set galbot as robot
self.scene.robot = GALBOT_ONE_CHARLIE_CFG.replace(prim_path="{ENV_REGEX_NS}/Robot")
# Set actions for the specific robot type (galbot)
self.actions.arm_action = mdp.JointPositionActionCfg(
asset_name="robot", joint_names=["left_arm_joint.*"], scale=0.5, use_default_offset=True
)
# Enable Parallel Gripper
self.actions.gripper_action = mdp.BinaryJointPositionActionCfg(
asset_name="robot",
joint_names=["left_gripper_.*_joint"],
open_command_expr={"left_gripper_.*_joint": 0.035},
close_command_expr={"left_gripper_.*_joint": 0.0},
)
self.gripper_joint_names = ["left_gripper_.*_joint"]
self.gripper_open_val = 0.035
self.gripper_threshold = 0.010
# Rigid body properties of each cube
cube_properties = RigidBodyPropertiesCfg(
solver_position_iteration_count=16,
solver_velocity_iteration_count=1,
max_angular_velocity=1000.0,
max_linear_velocity=1000.0,
max_depenetration_velocity=5.0,
disable_gravity=False,
)
# Set each stacking cube deterministically
self.scene.cube_1 = RigidObjectCfg(
prim_path="{ENV_REGEX_NS}/Cube_1",
init_state=RigidObjectCfg.InitialStateCfg(pos=[0.4, 0.0, 0.0203], rot=[1, 0, 0, 0]),
spawn=UsdFileCfg(
usd_path=f"{ISAAC_NUCLEUS_DIR}/Props/Blocks/blue_block.usd",
scale=(1.0, 1.0, 1.0),
rigid_props=cube_properties,
),
)
self.scene.cube_2 = RigidObjectCfg(
prim_path="{ENV_REGEX_NS}/Cube_2",
init_state=RigidObjectCfg.InitialStateCfg(pos=[0.55, 0.05, 0.0203], rot=[1, 0, 0, 0]),
spawn=UsdFileCfg(
usd_path=f"{ISAAC_NUCLEUS_DIR}/Props/Blocks/red_block.usd",
scale=(1.0, 1.0, 1.0),
rigid_props=cube_properties,
),
)
self.scene.cube_3 = RigidObjectCfg(
prim_path="{ENV_REGEX_NS}/Cube_3",
init_state=RigidObjectCfg.InitialStateCfg(pos=[0.60, -0.1, 0.0203], rot=[1, 0, 0, 0]),
spawn=UsdFileCfg(
usd_path=f"{ISAAC_NUCLEUS_DIR}/Props/Blocks/green_block.usd",
scale=(1.0, 1.0, 1.0),
rigid_props=cube_properties,
),
)
# Listens to the required transforms
self.marker_cfg = FRAME_MARKER_CFG.copy()
self.marker_cfg.markers["frame"].scale = (0.1, 0.1, 0.1)
self.marker_cfg.prim_path = "/Visuals/FrameTransformer"
self.scene.ee_frame = FrameTransformerCfg(
prim_path="{ENV_REGEX_NS}/Robot/base_link",
debug_vis=False,
visualizer_cfg=self.marker_cfg,
target_frames=[
FrameTransformerCfg.FrameCfg(
prim_path="{ENV_REGEX_NS}/Robot/left_gripper_tcp_link",
name="end_effector",
offset=OffsetCfg(
pos=[0.0, 0.0, 0.0],
),
),
],
)
@configclass
class GalbotRightArmCubeStackEnvCfg(GalbotLeftArmCubeStackEnvCfg):
def __post_init__(self):
# post init of parent
super().__post_init__()
l, r = self.events.randomize_cube_positions.params["pose_range"]["y"]
self.events.randomize_cube_positions.params["pose_range"]["y"] = (
-r,
-l,
) # move to area below right hand
# Set actions for the specific robot type (galbot)
self.actions.arm_action = mdp.JointPositionActionCfg(
asset_name="robot", joint_names=["right_arm_joint.*"], scale=0.5, use_default_offset=True
)
# Set surface gripper: Ensure the SurfaceGripper prim has the required attributes
self.scene.surface_gripper = SurfaceGripperCfg(
prim_path="{ENV_REGEX_NS}/Robot/right_suction_cup_tcp_link/SurfaceGripper",
max_grip_distance=0.02,
shear_force_limit=5000.0,
coaxial_force_limit=5000.0,
retry_interval=0.05,
)
# Set surface gripper action
self.actions.gripper_action = SurfaceGripperBinaryActionCfg(
asset_name="surface_gripper",
open_command=-1.0,
close_command=1.0,
)
self.scene.ee_frame.target_frames[0].prim_path = "{ENV_REGEX_NS}/Robot/right_suction_cup_tcp_link"
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import os
import isaaclab.sim as sim_utils
from isaaclab.devices.device_base import DevicesCfg
from isaaclab.devices.keyboard import Se3KeyboardCfg
from isaaclab.devices.spacemouse import Se3SpaceMouseCfg
from isaaclab.envs.mdp.actions.rmpflow_actions_cfg import RMPFlowActionCfg
from isaaclab.sensors import CameraCfg, FrameTransformerCfg
from isaaclab.sensors.frame_transformer.frame_transformer_cfg import OffsetCfg
from isaaclab.utils import configclass
from isaaclab_tasks.manager_based.manipulation.stack import mdp
from . import stack_joint_pos_env_cfg
##
# Pre-defined configs
##
from isaaclab.controllers.config.rmp_flow import ( # isort: skip
GALBOT_LEFT_ARM_RMPFLOW_CFG,
GALBOT_RIGHT_ARM_RMPFLOW_CFG,
)
from isaaclab.markers.config import FRAME_MARKER_CFG # isort: skip
##
# RmpFlow Controller for Galbot Left Arm Cube Stack Task (with Parallel Gripper)
##
@configclass
class RmpFlowGalbotLeftArmCubeStackEnvCfg(stack_joint_pos_env_cfg.GalbotLeftArmCubeStackEnvCfg):
def __post_init__(self):
# post init of parent
super().__post_init__()
# read use_relative_mode from environment variable
# True for record_demos, and False for replay_demos, annotate_demos and generate_demos
use_relative_mode_env = os.getenv("USE_RELATIVE_MODE", "True")
self.use_relative_mode = use_relative_mode_env.lower() in ["true", "1", "t"]
# Set actions for the specific robot type (Galbot)
self.actions.arm_action = RMPFlowActionCfg(
asset_name="robot",
joint_names=["left_arm_joint.*"],
body_name="left_gripper_tcp_link",
controller=GALBOT_LEFT_ARM_RMPFLOW_CFG,
scale=1.0,
body_offset=RMPFlowActionCfg.OffsetCfg(pos=[0.0, 0.0, 0.0]),
articulation_prim_expr="/World/envs/env_.*/Robot",
use_relative_mode=self.use_relative_mode,
)
# Set the simulation parameters
self.sim.dt = 1 / 60
self.sim.render_interval = 6
self.decimation = 3
self.episode_length_s = 30.0
self.teleop_devices = DevicesCfg(
devices={
"keyboard": Se3KeyboardCfg(
pos_sensitivity=0.05,
rot_sensitivity=0.05,
sim_device=self.sim.device,
),
"spacemouse": Se3SpaceMouseCfg(
pos_sensitivity=0.05,
rot_sensitivity=0.05,
sim_device=self.sim.device,
),
}
)
##
# RmpFlow Controller for Galbot Right Arm Cube Stack Task (with Surface Gripper)
##
@configclass
class RmpFlowGalbotRightArmCubeStackEnvCfg(stack_joint_pos_env_cfg.GalbotRightArmCubeStackEnvCfg):
def __post_init__(self):
# post init of parent
super().__post_init__()
# read use_relative_mode from environment variable
# True for record_demos, and False for replay_demos, annotate_demos and generate_demos
use_relative_mode_env = os.getenv("USE_RELATIVE_MODE", "True")
self.use_relative_mode = use_relative_mode_env.lower() in ["true", "1", "t"]
# Set actions for the specific robot type (Galbot)
self.actions.arm_action = RMPFlowActionCfg(
asset_name="robot",
joint_names=["right_arm_joint.*"],
body_name="right_suction_cup_tcp_link",
controller=GALBOT_RIGHT_ARM_RMPFLOW_CFG,
scale=1.0,
body_offset=RMPFlowActionCfg.OffsetCfg(pos=[0.0, 0.0, 0.0]),
articulation_prim_expr="/World/envs/env_.*/Robot",
use_relative_mode=self.use_relative_mode,
)
# Set the simulation parameters
self.sim.dt = 1 / 60
self.sim.render_interval = 1
self.decimation = 3
self.episode_length_s = 30.0
self.teleop_devices = DevicesCfg(
devices={
"keyboard": Se3KeyboardCfg(
pos_sensitivity=0.05,
rot_sensitivity=0.05,
sim_device=self.sim.device,
),
"spacemouse": Se3SpaceMouseCfg(
pos_sensitivity=0.05,
rot_sensitivity=0.05,
sim_device=self.sim.device,
),
}
)
##
# Visuomotor Env for Record, Generate and Replay (in Task Space)
##
@configclass
class RmpFlowGalbotLeftArmCubeStackVisuomotorEnvCfg(RmpFlowGalbotLeftArmCubeStackEnvCfg):
def __post_init__(self):
# post init of parent
super().__post_init__()
# Set left and right wrist cameras for VLA policy training
self.scene.right_wrist_cam = CameraCfg(
prim_path="{ENV_REGEX_NS}/Robot/right_arm_camera_sim_view_frame/right_camera",
update_period=0.0333,
height=256,
width=256,
data_types=["rgb", "distance_to_image_plane"],
spawn=sim_utils.PinholeCameraCfg(
focal_length=18.0, focus_distance=400.0, horizontal_aperture=20.955, clipping_range=(0.1, 1.0e5)
),
offset=CameraCfg.OffsetCfg(pos=(0.0, 0.0, 0.0), rot=(0.5, -0.5, 0.5, -0.5), convention="ros"),
)
self.scene.left_wrist_cam = CameraCfg(
prim_path="{ENV_REGEX_NS}/Robot/left_arm_camera_sim_view_frame/left_camera",
update_period=0.0333,
height=256,
width=256,
data_types=["rgb", "distance_to_image_plane"],
spawn=sim_utils.PinholeCameraCfg(
focal_length=18.0, focus_distance=400.0, horizontal_aperture=20.955, clipping_range=(0.1, 1.0e5)
),
offset=CameraCfg.OffsetCfg(pos=(0.0, 0.0, 0.0), rot=(0.5, -0.5, 0.5, -0.5), convention="ros"),
)
# Set ego view camera
self.scene.ego_cam = CameraCfg(
prim_path="{ENV_REGEX_NS}/Robot/head_camera_sim_view_frame/head_camera",
update_period=0.0333,
height=256,
width=256,
data_types=["rgb", "distance_to_image_plane"],
spawn=sim_utils.PinholeCameraCfg(
focal_length=18.0, focus_distance=400.0, horizontal_aperture=20.955, clipping_range=(0.1, 1.0e5)
),
offset=CameraCfg.OffsetCfg(pos=(0.0, 0.0, 0.0), rot=(0.5, -0.5, 0.5, -0.5), convention="ros"),
)
# Set front view camera
self.scene.front_cam = CameraCfg(
prim_path="{ENV_REGEX_NS}/front_camera",
update_period=0.0333,
height=256,
width=256,
data_types=["rgb", "distance_to_image_plane"],
spawn=sim_utils.PinholeCameraCfg(
focal_length=24.0, focus_distance=400.0, horizontal_aperture=20.955, clipping_range=(0.1, 1.0e5)
),
offset=CameraCfg.OffsetCfg(pos=(1.0, 0.0, 0.6), rot=(-0.3799, 0.5963, 0.5963, -0.3799), convention="ros"),
)
marker_right_camera_cfg = FRAME_MARKER_CFG.copy()
marker_right_camera_cfg.markers["frame"].scale = (0.1, 0.1, 0.1)
marker_right_camera_cfg.prim_path = "/Visuals/FrameTransformerRightCamera"
self.scene.right_arm_camera_frame = FrameTransformerCfg(
prim_path="{ENV_REGEX_NS}/Robot/base_link",
debug_vis=False,
visualizer_cfg=marker_right_camera_cfg,
target_frames=[
FrameTransformerCfg.FrameCfg(
prim_path="{ENV_REGEX_NS}/Robot/right_arm_camera_sim_view_frame",
name="right_camera",
offset=OffsetCfg(
pos=[0.0, 0.0, 0.0],
rot=(0.5, -0.5, 0.5, -0.5),
),
),
],
)
marker_left_camera_cfg = FRAME_MARKER_CFG.copy()
marker_left_camera_cfg.markers["frame"].scale = (0.1, 0.1, 0.1)
marker_left_camera_cfg.prim_path = "/Visuals/FrameTransformerLeftCamera"
self.scene.left_arm_camera_frame = FrameTransformerCfg(
prim_path="{ENV_REGEX_NS}/Robot/base_link",
debug_vis=False,
visualizer_cfg=marker_left_camera_cfg,
target_frames=[
FrameTransformerCfg.FrameCfg(
prim_path="{ENV_REGEX_NS}/Robot/left_arm_camera_sim_view_frame",
name="left_camera",
offset=OffsetCfg(
pos=[0.0, 0.0, 0.0],
rot=(0.5, -0.5, 0.5, -0.5),
),
),
],
)
# Set settings for camera rendering
self.rerender_on_reset = True
self.sim.render.antialiasing_mode = "OFF" # disable dlss
# List of image observations in policy observations
self.image_obs_list = ["ego_cam", "left_wrist_cam", "right_wrist_cam"]
##
# Task Env for VLA Policy Close-loop Evaluation (in Joint Space)
##
@configclass
class GalbotLeftArmJointPositionCubeStackVisuomotorEnvCfg_PLAY(RmpFlowGalbotLeftArmCubeStackVisuomotorEnvCfg):
def __post_init__(self):
# post init of parent
super().__post_init__()
self.actions.arm_action = mdp.JointPositionActionCfg(
asset_name="robot", joint_names=["left_arm_joint.*"], scale=1.0, use_default_offset=False
)
# Enable Parallel Gripper with AbsBinaryJointPosition Control
self.actions.gripper_action = mdp.AbsBinaryJointPositionActionCfg(
asset_name="robot",
threshold=0.030,
joint_names=["left_gripper_.*_joint"],
open_command_expr={"left_gripper_.*_joint": 0.035},
close_command_expr={"left_gripper_.*_joint": 0.023},
# real gripper close data is 0.0235, close to it to meet data distribution, but smaller to ensure robust grasping.
# during VLA inference, we set the close command to '0.023' since the VLA has never seen the gripper fully closed.
)
##
# Task Envs for VLA Policy Close-loop Evaluation (in Task Space)
##
@configclass
class GalbotLeftArmRmpFlowCubeStackVisuomotorEnvCfg_PLAY(RmpFlowGalbotLeftArmCubeStackVisuomotorEnvCfg):
def __post_init__(self):
# post init of parent
super().__post_init__()
# Enable Parallel Gripper with AbsBinaryJointPosition Control
self.actions.gripper_action = mdp.AbsBinaryJointPositionActionCfg(
asset_name="robot",
threshold=0.030,
joint_names=["left_gripper_.*_joint"],
open_command_expr={"left_gripper_.*_joint": 0.035},
close_command_expr={"left_gripper_.*_joint": 0.023},
)
...@@ -6,8 +6,9 @@ ...@@ -6,8 +6,9 @@
from __future__ import annotations from __future__ import annotations
import torch import torch
from typing import TYPE_CHECKING from typing import TYPE_CHECKING, Literal
import isaaclab.utils.math as math_utils
from isaaclab.assets import Articulation, RigidObject, RigidObjectCollection from isaaclab.assets import Articulation, RigidObject, RigidObjectCollection
from isaaclab.managers import SceneEntityCfg from isaaclab.managers import SceneEntityCfg
from isaaclab.sensors import FrameTransformer from isaaclab.sensors import FrameTransformer
...@@ -256,12 +257,35 @@ def ee_frame_quat(env: ManagerBasedRLEnv, ee_frame_cfg: SceneEntityCfg = SceneEn ...@@ -256,12 +257,35 @@ def ee_frame_quat(env: ManagerBasedRLEnv, ee_frame_cfg: SceneEntityCfg = SceneEn
return ee_frame_quat return ee_frame_quat
def gripper_pos(env: ManagerBasedRLEnv, robot_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor: def gripper_pos(
env: ManagerBasedRLEnv,
robot_cfg: SceneEntityCfg = SceneEntityCfg("robot"),
) -> torch.Tensor:
"""
Obtain the versatile gripper position of both Gripper and Suction Cup.
"""
robot: Articulation = env.scene[robot_cfg.name] robot: Articulation = env.scene[robot_cfg.name]
finger_joint_1 = robot.data.joint_pos[:, -1].clone().unsqueeze(1)
finger_joint_2 = -1 * robot.data.joint_pos[:, -2].clone().unsqueeze(1)
return torch.cat((finger_joint_1, finger_joint_2), dim=1) if hasattr(env.scene, "surface_grippers") and len(env.scene.surface_grippers) > 0:
# Handle multiple surface grippers by concatenating their states
gripper_states = []
for gripper_name, surface_gripper in env.scene.surface_grippers.items():
gripper_states.append(surface_gripper.state.view(-1, 1))
if len(gripper_states) == 1:
return gripper_states[0]
else:
return torch.cat(gripper_states, dim=1)
else:
if hasattr(env.cfg, "gripper_joint_names"):
gripper_joint_ids, _ = robot.find_joints(env.cfg.gripper_joint_names)
assert len(gripper_joint_ids) == 2, "Observation gripper_pos only support parallel gripper for now"
finger_joint_1 = robot.data.joint_pos[:, gripper_joint_ids[0]].clone().unsqueeze(1)
finger_joint_2 = -1 * robot.data.joint_pos[:, gripper_joint_ids[1]].clone().unsqueeze(1)
return torch.cat((finger_joint_1, finger_joint_2), dim=1)
else:
raise NotImplementedError("[Error] Cannot find gripper_joint_names in the environment config")
def object_grasped( def object_grasped(
...@@ -270,8 +294,6 @@ def object_grasped( ...@@ -270,8 +294,6 @@ def object_grasped(
ee_frame_cfg: SceneEntityCfg, ee_frame_cfg: SceneEntityCfg,
object_cfg: SceneEntityCfg, object_cfg: SceneEntityCfg,
diff_threshold: float = 0.06, diff_threshold: float = 0.06,
gripper_open_val: torch.tensor = torch.tensor([0.04]),
gripper_threshold: float = 0.005,
) -> torch.Tensor: ) -> torch.Tensor:
"""Check if an object is grasped by the specified robot.""" """Check if an object is grasped by the specified robot."""
...@@ -283,13 +305,33 @@ def object_grasped( ...@@ -283,13 +305,33 @@ def object_grasped(
end_effector_pos = ee_frame.data.target_pos_w[:, 0, :] end_effector_pos = ee_frame.data.target_pos_w[:, 0, :]
pose_diff = torch.linalg.vector_norm(object_pos - end_effector_pos, dim=1) pose_diff = torch.linalg.vector_norm(object_pos - end_effector_pos, dim=1)
grasped = torch.logical_and( if hasattr(env.scene, "surface_grippers") and len(env.scene.surface_grippers) > 0:
pose_diff < diff_threshold, surface_gripper = env.scene.surface_grippers["surface_gripper"]
torch.abs(robot.data.joint_pos[:, -1] - gripper_open_val.to(env.device)) > gripper_threshold, suction_cup_status = surface_gripper.state.view(-1, 1) # 1: closed, 0: closing, -1: open
) suction_cup_is_closed = (suction_cup_status == 1).to(torch.float32)
grasped = torch.logical_and( grasped = torch.logical_and(suction_cup_is_closed, pose_diff < diff_threshold)
grasped, torch.abs(robot.data.joint_pos[:, -2] - gripper_open_val.to(env.device)) > gripper_threshold
) else:
if hasattr(env.cfg, "gripper_joint_names"):
gripper_joint_ids, _ = robot.find_joints(env.cfg.gripper_joint_names)
assert len(gripper_joint_ids) == 2, "Observations only support parallel gripper for now"
grasped = torch.logical_and(
pose_diff < diff_threshold,
torch.abs(
robot.data.joint_pos[:, gripper_joint_ids[0]]
- torch.tensor(env.cfg.gripper_open_val, dtype=torch.float32).to(env.device)
)
> env.cfg.gripper_threshold,
)
grasped = torch.logical_and(
grasped,
torch.abs(
robot.data.joint_pos[:, gripper_joint_ids[1]]
- torch.tensor(env.cfg.gripper_open_val, dtype=torch.float32).to(env.device)
)
> env.cfg.gripper_threshold,
)
return grasped return grasped
...@@ -302,7 +344,6 @@ def object_stacked( ...@@ -302,7 +344,6 @@ def object_stacked(
xy_threshold: float = 0.05, xy_threshold: float = 0.05,
height_threshold: float = 0.005, height_threshold: float = 0.005,
height_diff: float = 0.0468, height_diff: float = 0.0468,
gripper_open_val: torch.tensor = torch.tensor([0.04]),
) -> torch.Tensor: ) -> torch.Tensor:
"""Check if an object is stacked by the specified robot.""" """Check if an object is stacked by the specified robot."""
...@@ -316,11 +357,176 @@ def object_stacked( ...@@ -316,11 +357,176 @@ def object_stacked(
stacked = torch.logical_and(xy_dist < xy_threshold, (height_dist - height_diff) < height_threshold) stacked = torch.logical_and(xy_dist < xy_threshold, (height_dist - height_diff) < height_threshold)
stacked = torch.logical_and( if hasattr(env.scene, "surface_grippers") and len(env.scene.surface_grippers) > 0:
torch.isclose(robot.data.joint_pos[:, -1], gripper_open_val.to(env.device), atol=1e-4, rtol=1e-4), stacked surface_gripper = env.scene.surface_grippers["surface_gripper"]
suction_cup_status = surface_gripper.state.view(-1, 1) # 1: closed, 0: closing, -1: open
suction_cup_is_open = (suction_cup_status == -1).to(torch.float32)
stacked = torch.logical_and(suction_cup_is_open, stacked)
else:
if hasattr(env.cfg, "gripper_joint_names"):
gripper_joint_ids, _ = robot.find_joints(env.cfg.gripper_joint_names)
assert len(gripper_joint_ids) == 2, "Observations only support parallel gripper for now"
stacked = torch.logical_and(
torch.isclose(
robot.data.joint_pos[:, gripper_joint_ids[0]],
torch.tensor(env.cfg.gripper_open_val, dtype=torch.float32).to(env.device),
atol=1e-4,
rtol=1e-4,
),
stacked,
)
stacked = torch.logical_and(
torch.isclose(
robot.data.joint_pos[:, gripper_joint_ids[1]],
torch.tensor(env.cfg.gripper_open_val, dtype=torch.float32).to(env.device),
atol=1e-4,
rtol=1e-4,
),
stacked,
)
else:
raise ValueError("No gripper_joint_names found in environment config")
return stacked
def cube_poses_in_base_frame(
env: ManagerBasedRLEnv,
cube_1_cfg: SceneEntityCfg = SceneEntityCfg("cube_1"),
cube_2_cfg: SceneEntityCfg = SceneEntityCfg("cube_2"),
cube_3_cfg: SceneEntityCfg = SceneEntityCfg("cube_3"),
robot_cfg: SceneEntityCfg = SceneEntityCfg("robot"),
return_key: Literal["pos", "quat", None] = None,
) -> torch.Tensor:
"""The position and orientation of the cubes in the robot base frame."""
cube_1: RigidObject = env.scene[cube_1_cfg.name]
cube_2: RigidObject = env.scene[cube_2_cfg.name]
cube_3: RigidObject = env.scene[cube_3_cfg.name]
pos_cube_1_world = cube_1.data.root_pos_w
pos_cube_2_world = cube_2.data.root_pos_w
pos_cube_3_world = cube_3.data.root_pos_w
quat_cube_1_world = cube_1.data.root_quat_w
quat_cube_2_world = cube_2.data.root_quat_w
quat_cube_3_world = cube_3.data.root_quat_w
robot: Articulation = env.scene[robot_cfg.name]
root_pos_w = robot.data.root_pos_w
root_quat_w = robot.data.root_quat_w
pos_cube_1_base, quat_cube_1_base = math_utils.subtract_frame_transforms(
root_pos_w, root_quat_w, pos_cube_1_world, quat_cube_1_world
)
pos_cube_2_base, quat_cube_2_base = math_utils.subtract_frame_transforms(
root_pos_w, root_quat_w, pos_cube_2_world, quat_cube_2_world
) )
stacked = torch.logical_and( pos_cube_3_base, quat_cube_3_base = math_utils.subtract_frame_transforms(
torch.isclose(robot.data.joint_pos[:, -2], gripper_open_val.to(env.device), atol=1e-4, rtol=1e-4), stacked root_pos_w, root_quat_w, pos_cube_3_world, quat_cube_3_world
) )
return stacked pos_cubes_base = torch.cat((pos_cube_1_base, pos_cube_2_base, pos_cube_3_base), dim=1)
quat_cubes_base = torch.cat((quat_cube_1_base, quat_cube_2_base, quat_cube_3_base), dim=1)
if return_key == "pos":
return pos_cubes_base
elif return_key == "quat":
return quat_cubes_base
elif return_key is None:
return torch.cat((pos_cubes_base, quat_cubes_base), dim=1)
def object_abs_obs_in_base_frame(
env: ManagerBasedRLEnv,
cube_1_cfg: SceneEntityCfg = SceneEntityCfg("cube_1"),
cube_2_cfg: SceneEntityCfg = SceneEntityCfg("cube_2"),
cube_3_cfg: SceneEntityCfg = SceneEntityCfg("cube_3"),
ee_frame_cfg: SceneEntityCfg = SceneEntityCfg("ee_frame"),
robot_cfg: SceneEntityCfg = SceneEntityCfg("robot"),
):
"""
Object Abs observations (in base frame): remove the relative observations, and add abs gripper pos and quat in robot base frame
cube_1 pos,
cube_1 quat,
cube_2 pos,
cube_2 quat,
cube_3 pos,
cube_3 quat,
gripper pos,
gripper quat,
"""
cube_1: RigidObject = env.scene[cube_1_cfg.name]
cube_2: RigidObject = env.scene[cube_2_cfg.name]
cube_3: RigidObject = env.scene[cube_3_cfg.name]
ee_frame: FrameTransformer = env.scene[ee_frame_cfg.name]
robot: Articulation = env.scene[robot_cfg.name]
root_pos_w = robot.data.root_pos_w
root_quat_w = robot.data.root_quat_w
cube_1_pos_w = cube_1.data.root_pos_w
cube_1_quat_w = cube_1.data.root_quat_w
cube_2_pos_w = cube_2.data.root_pos_w
cube_2_quat_w = cube_2.data.root_quat_w
cube_3_pos_w = cube_3.data.root_pos_w
cube_3_quat_w = cube_3.data.root_quat_w
pos_cube_1_base, quat_cube_1_base = math_utils.subtract_frame_transforms(
root_pos_w, root_quat_w, cube_1_pos_w, cube_1_quat_w
)
pos_cube_2_base, quat_cube_2_base = math_utils.subtract_frame_transforms(
root_pos_w, root_quat_w, cube_2_pos_w, cube_2_quat_w
)
pos_cube_3_base, quat_cube_3_base = math_utils.subtract_frame_transforms(
root_pos_w, root_quat_w, cube_3_pos_w, cube_3_quat_w
)
ee_pos_w = ee_frame.data.target_pos_w[:, 0, :]
ee_quat_w = ee_frame.data.target_quat_w[:, 0, :]
ee_pos_base, ee_quat_base = math_utils.subtract_frame_transforms(root_pos_w, root_quat_w, ee_pos_w, ee_quat_w)
return torch.cat(
(
pos_cube_1_base,
quat_cube_1_base,
pos_cube_2_base,
quat_cube_2_base,
pos_cube_3_base,
quat_cube_3_base,
ee_pos_base,
ee_quat_base,
),
dim=1,
)
def ee_frame_pose_in_base_frame(
env: ManagerBasedRLEnv,
ee_frame_cfg: SceneEntityCfg = SceneEntityCfg("ee_frame"),
robot_cfg: SceneEntityCfg = SceneEntityCfg("robot"),
return_key: Literal["pos", "quat", None] = None,
) -> torch.Tensor:
"""
The end effector pose in the robot base frame.
"""
ee_frame: FrameTransformer = env.scene[ee_frame_cfg.name]
ee_frame_pos_w = ee_frame.data.target_pos_w[:, 0, :]
ee_frame_quat_w = ee_frame.data.target_quat_w[:, 0, :]
robot: Articulation = env.scene[robot_cfg.name]
root_pos_w = robot.data.root_pos_w
root_quat_w = robot.data.root_quat_w
ee_pos_in_base, ee_quat_in_base = math_utils.subtract_frame_transforms(
root_pos_w, root_quat_w, ee_frame_pos_w, ee_frame_quat_w
)
if return_key == "pos":
return ee_pos_in_base
elif return_key == "quat":
return ee_quat_in_base
elif return_key is None:
return torch.cat((ee_pos_in_base, ee_quat_in_base), dim=1)
...@@ -30,7 +30,6 @@ def cubes_stacked( ...@@ -30,7 +30,6 @@ def cubes_stacked(
xy_threshold: float = 0.04, xy_threshold: float = 0.04,
height_threshold: float = 0.005, height_threshold: float = 0.005,
height_diff: float = 0.0468, height_diff: float = 0.0468,
gripper_open_val: torch.tensor = torch.tensor([0.04]),
atol=0.0001, atol=0.0001,
rtol=0.0001, rtol=0.0001,
): ):
...@@ -58,11 +57,36 @@ def cubes_stacked( ...@@ -58,11 +57,36 @@ def cubes_stacked(
stacked = torch.logical_and(pos_diff_c23[:, 2] < 0.0, stacked) stacked = torch.logical_and(pos_diff_c23[:, 2] < 0.0, stacked)
# Check gripper positions # Check gripper positions
stacked = torch.logical_and( if hasattr(env.scene, "surface_grippers") and len(env.scene.surface_grippers) > 0:
torch.isclose(robot.data.joint_pos[:, -1], gripper_open_val.to(env.device), atol=atol, rtol=rtol), stacked surface_gripper = env.scene.surface_grippers["surface_gripper"]
) suction_cup_status = surface_gripper.state.view(-1, 1) # 1: closed, 0: closing, -1: open
stacked = torch.logical_and( suction_cup_is_open = (suction_cup_status == -1).to(torch.float32)
torch.isclose(robot.data.joint_pos[:, -2], gripper_open_val.to(env.device), atol=atol, rtol=rtol), stacked stacked = torch.logical_and(suction_cup_is_open, stacked)
)
else:
if hasattr(env.cfg, "gripper_joint_names"):
gripper_joint_ids, _ = robot.find_joints(env.cfg.gripper_joint_names)
assert len(gripper_joint_ids) == 2, "Terminations only support parallel gripper for now"
stacked = torch.logical_and(
torch.isclose(
robot.data.joint_pos[:, gripper_joint_ids[0]],
torch.tensor(env.cfg.gripper_open_val, dtype=torch.float32).to(env.device),
atol=atol,
rtol=rtol,
),
stacked,
)
stacked = torch.logical_and(
torch.isclose(
robot.data.joint_pos[:, gripper_joint_ids[1]],
torch.tensor(env.cfg.gripper_open_val, dtype=torch.float32).to(env.device),
atol=atol,
rtol=rtol,
),
stacked,
)
else:
raise ValueError("No gripper_joint_names found in environment config")
return stacked return stacked
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment