Commit 5cf3d618 authored by Ashwin Varghese Kuruttukulam's avatar Ashwin Varghese Kuruttukulam Committed by Kelly Guo

Adds Isaac Lab Mimic based on MimicGen data generation for Imitation Learning (#179)

Initial version of Isaac Lab Mimic, based on the MimicGen paper and
code: https://mimicgen.github.io/.
The MimicGen workflow is useful for generating additional trajectories
for Imitation Learning, allowing training from a minimal set of
human-generated trajectories.

- New feature (non-breaking change which adds functionality)
- This change requires a documentation update

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there

---------
Signed-off-by: 's avatarAshwin Varghese Kuruttukulam <123109010+ashwinvkNV@users.noreply.github.com>
Signed-off-by: 's avatarKelly Guo <kellyguo123@hotmail.com>
Co-authored-by: 's avatarKelly Guo <kellyg@nvidia.com>
Co-authored-by: 's avatarPeter Du <peterd@nvidia.com>
Co-authored-by: 's avatarKarsten Patzwaldt <kpatzwaldt@nvidia.com>
Co-authored-by: 's avatarCY Chen <cyc@nvidia.com>
Co-authored-by: 's avatarKelly Guo <kellyguo123@hotmail.com>
parent e899e090
Copyright (c) 2022-2024, The Isaac Lab Project Developers.
All rights reserved.
SPDX-License-Identifier: Apache-2.0
......@@ -61,6 +61,18 @@ repos:
- --license-filepath
- .github/LICENSE_HEADER.txt
- --use-current-year
exclude: "source/extensions/omni.isaac.lab_mimic/|source/standalone/workflows/isaac_lab_mimic/"
# Apache 2.0 license for mimic files
- repo: https://github.com/Lucas-C/pre-commit-hooks
rev: v1.5.1
hooks:
- id: insert-license
files: ^(source/extensions/omni.isaac.lab_mimic|source/standalone/workflows/isaac_lab_mimic)/.*\.py$
args:
# - --remove-header # Remove existing license headers. Useful when updating license.
- --license-filepath
- .github/LICENSE_HEADER_MIMIC.txt
- --use-current-year
- repo: https://github.com/pre-commit/pygrep-hooks
rev: v1.10.0
hooks:
......
......@@ -39,6 +39,7 @@ Guidelines for modifications:
* Andrej Orsula
* Anton Bjørndahl Mortensen
* Arjun Bhardwaj
* Ashwin Varghese Kuruttukulam
* Brayden Zhang
* Cameron Upright
* Calvin Yu
......
This diff is collapsed.
......@@ -11,6 +11,7 @@
[![pre-commit](https://img.shields.io/github/actions/workflow/status/isaac-sim/IsaacLab/pre-commit.yaml?logo=pre-commit&logoColor=white&label=pre-commit&color=brightgreen)](https://github.com/isaac-sim/IsaacLab/actions/workflows/pre-commit.yaml)
[![docs status](https://img.shields.io/github/actions/workflow/status/isaac-sim/IsaacLab/docs.yaml?label=docs&color=brightgreen)](https://github.com/isaac-sim/IsaacLab/actions/workflows/docs.yaml)
[![License](https://img.shields.io/badge/license-BSD--3-yellow.svg)](https://opensource.org/licenses/BSD-3-Clause)
[![License](https://img.shields.io/badge/license-Apache--2.0-yellow.svg)](https://opensource.org/license/apache-2-0)
**Isaac Lab** is a GPU-accelerated, open-source framework designed to unify and simplify robotics research workflows, such as reinforcement learning, imitation learning, and motion planning. Built on [NVIDIA Isaac Sim](https://docs.omniverse.nvidia.com/isaacsim/latest/overview.html), it combines fast and accurate physics and sensor simulation, making it an ideal choice for sim-to-real transfer in robotics.
......@@ -57,7 +58,7 @@ or opening a question on its [forums](https://forums.developer.nvidia.com/c/agx-
## License
The Isaac Lab framework is released under [BSD-3 License](LICENSE). The license files of its dependencies and assets are present in the [`docs/licenses`](docs/licenses) directory.
The Isaac Lab framework is released under [BSD-3 License](LICENSE). The `omni.isaac.lab_mimic` extension and its corresponding standalone scripts are released under [Apache 2.0](LICENSE-mimic). The license files of its dependencies and assets are present in the [`docs/licenses`](docs/licenses) directory.
## Acknowledgement
......
......@@ -16,13 +16,29 @@ To play inverse kinematics (IK) control with a keyboard device:
./isaaclab.sh -p source/standalone/environments/teleoperation/teleop_se3_agent.py --task Isaac-Lift-Cube-Franka-IK-Rel-v0 --num_envs 1 --teleop_device keyboard
For smoother operation and off-axis operation, we recommend using a SpaceMouse as input device. Providing smoother demonstration will make it easier for the policy to clone the behavior. To use a SpaceMouse, simply change the teleop device accordingly:
.. code:: bash
./isaaclab.sh -p source/standalone/environments/teleoperation/teleop_se3_agent.py --task Isaac-Lift-Cube-Franka-IK-Rel-v0 --num_envs 1 --teleop_device spacemouse
.. note::
If the SpaceMouse is not detected, you may need to grant additional user permissions by running ``sudo chmod 666 /dev/hidraw<#>`` where ``<#>`` corresponds to the device index
of the connected SpaceMouse.
To determine the device index, list all ``hidraw`` devices by running ``ls -l /dev/hidraw*``.
Identify the device corresponding to the SpaceMouse by running ``cat /sys/class/hidraw/hidraw<#>/device/uevent`` on each of the devices listed
from the prior step.
The script prints the teleoperation events configured. For keyboard,
these are as follows:
.. code:: text
Keyboard Controller for SE(3): Se3Keyboard
Reset all commands: L
Reset all commands: R
Toggle gripper (open/close): K
Move arm along x-axis: W/S
Move arm along y-axis: A/D
......@@ -35,23 +51,72 @@ Imitation Learning
~~~~~~~~~~~~~~~~~~
Using the teleoperation devices, it is also possible to collect data for
learning from demonstrations (LfD). For this, we support the learning
framework `Robomimic <https://robomimic.github.io/>`__ (Linux only) and allow saving
data in
`HDF5 <https://robomimic.github.io/docs/tutorials/dataset_contents.html#viewing-hdf5-dataset-structure>`__
format.
learning from demonstrations (LfD). For this, we provide scripts to collect data into the open HDF5 format.
.. note::
This tutorial assumes you have a ``datasets`` directory under the ``IsaacLab`` repo. Create this directory by running ``cd IsaacLab`` and ``mkdir datasets``.
1. Collect demonstrations with teleoperation for the environment
``Isaac-Lift-Cube-Franka-IK-Rel-v0``:
``Isaac-Stack-Cube-Franka-IK-Rel-v0``:
.. code:: bash
# step a: collect data with keyboard
./isaaclab.sh -p source/standalone/workflows/robomimic/collect_demonstrations.py --task Isaac-Lift-Cube-Franka-IK-Rel-v0 --num_envs 1 --num_demos 10 --teleop_device keyboard
# step b: inspect the collected dataset
./isaaclab.sh -p source/standalone/workflows/robomimic/tools/inspect_demonstrations.py logs/robomimic/Isaac-Lift-Cube-Franka-IK-Rel-v0/hdf_dataset.hdf5
# step a: collect data with spacemouse
./isaaclab.sh -p source/standalone/tools/record_demos.py --task Isaac-Stack-Cube-Franka-IK-Rel-v0 --teleop_device spacemouse --dataset_file ./datasets/dataset.hdf5 --num_demos 10
# step b: replay the collected dataset
./isaaclab.sh -p source/standalone/tools/replay_demos.py --task Isaac-Stack-Cube-Franka-IK-Rel-v0 --dataset_file ./datasets/dataset.hdf5
.. note::
The order of the stacked cubes should be blue (bottom), red (middle), green (top).
About 10 successful demonstrations are required in order for the following steps to succeed.
Here are some tips to perform demonstrations that lead to successful policy training:
* Keep demonstrations short. Shorter demonstrations mean fewer decisions for the policy, making training easier.
* Take a direct path. Do not follow along arbitrary axis, but move straight toward the goal.
* Do not pause. Perform smooth, continuous motions instead. It is not obvious for a policy why and when to pause, hence continuous motions are easier to learn.
If, while performing a demonstration, a mistake is made, or the current demonstration should not be recorded for some other reason, press the ``R`` key to discard the current demonstration, and reset to a new starting position.
2. Generate additional demonstrations using Isaac Lab Mimic
Isaac Lab Mimic is a feature in Isaac Lab that allows to generate additional demonstrations automatically, allowing a policy to learn successfully even from just a handful of manual demonstrations.
2. Split the dataset into train and validation set:
In order to use Isaac Lab Mimic with the recorded dataset, first annotate the subtasks in the recording:
.. code:: bash
./isaaclab.sh -p source/standalone/workflows/isaac_lab_mimic/annotate_demos.py --input_file ./datasets/dataset.hdf5 --output_file ./datasets/annotated_dataset.hdf5 --task Isaac-Stack-Cube-Franka-IK-Rel-Mimic-v0 --auto
Then, use Isaac Lab Mimic to generate some additional demonstrations:
.. code:: bash
./isaaclab.sh -p source/standalone/workflows/isaac_lab_mimic/generate_dataset.py --input_file ./datasets/annotated_dataset.hdf5 --output_file ./datasets/generated_dataset_small.hdf5 --num_envs 10 --generation_num_trials 10
.. note::
The output_file of the ``annotate_demos.py`` script is the input_file to the ``generate_dataset.py`` script
.. note::
Isaac Lab is designed to work with manipulators with grippers. The gripper commands in the demonstrations are extracted separately and temporally replayed during the generation of additional demonstrations.
Inspect the output of generated data (filename: ``generated_dataset_small.hdf5``), and if satisfactory, generate the full dataset:
./isaaclab.sh -p source/standalone/workflows/isaac_lab_mimic/generate_dataset.py --input_file ./datasets/annotated_dataset.hdf5 --output_file ./datasets/generated_dataset.hdf5 --num_envs 10 --generation_num_trials 1000 --headless
The number of demonstrations can be increased or decreased, 1000 demonstrations have been shown to provide good training results for this task.
Additionally, the number of environments in the ``--num_envs`` parameter can be adjusted to speed up data generation. The suggested number of 10 can be executed even on a laptop GPU. On a more powerful desktop machine, set it to 100 or higher for significant speedup of this step.
3. Setup robomimic for training a policy
As an example, we will train a BC agent implemented in `Robomimic <https://robomimic.github.io/>`__ to train a policy. Any other framework or training method could be used.
.. code:: bash
......@@ -59,18 +124,17 @@ format.
sudo apt install cmake build-essential
# install python module (for robomimic)
./isaaclab.sh -i robomimic
# split data
./isaaclab.sh -p source/standalone/workflows/robomimic/tools/split_train_val.py logs/robomimic/Isaac-Lift-Cube-Franka-IK-Rel-v0/hdf_dataset.hdf5 --ratio 0.2
3. Train a BC agent for ``Isaac-Lift-Cube-Franka-IK-Rel-v0`` with
`Robomimic <https://robomimic.github.io/>`__:
4. Train a BC agent for ``Isaac-Stack-Cube-Franka-IK-Rel-v0`` using the Mimic generated data:
.. code:: bash
./isaaclab.sh -p source/standalone/workflows/robomimic/train.py --task Isaac-Lift-Cube-Franka-IK-Rel-v0 --algo bc --dataset logs/robomimic/Isaac-Lift-Cube-Franka-IK-Rel-v0/hdf_dataset.hdf5
./isaaclab.sh -p source/standalone/workflows/robomimic/train.py --task Isaac-Stack-Cube-Franka-IK-Rel-v0 --algo bc --dataset ./datasets/generated_dataset.hdf5
By default, the training script will save a model checkpoint every 100 epochs. The trained models and logs will be saved to logs/robomimic/Isaac-Stack-Cube-Franka-IK-Rel-v0/bc
4. Play the learned model to visualize results:
5. Play the learned model to visualize results:
.. code:: bash
./isaaclab.sh -p source/standalone/workflows/robomimic/play.py --task Isaac-Lift-Cube-Franka-IK-Rel-v0 --checkpoint /PATH/TO/model.pth
./isaaclab.sh -p source/standalone/workflows/robomimic/play.py --task Isaac-Stack-Cube-Franka-IK-Rel-v0 --checkpoint /PATH/TO/desired_model_checkpoint.pth
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.31.7"
version = "0.32.7"
# Description
title = "Isaac Lab framework for Robot Learning"
......
Changelog
---------
0.31.7 (2025-01-30)
0.32.7 (2025-01-30)
~~~~~~~~~~~~~~~~~~~
Fixed
......@@ -12,7 +12,7 @@ Fixed
to the event being triggered at the wrong time after the reset.
0.31.6 (2025-01-17)
0.32.6 (2025-01-17)
~~~~~~~~~~~~~~~~~~~
Fixed
......@@ -42,7 +42,7 @@ Fixed
the :class:`omni.isaac.lab.assets.RigidObjectCollection` class.
0.31.5 (2025-01-14)
0.32.5 (2025-01-14)
~~~~~~~~~~~~~~~~~~~
Fixed
......@@ -51,7 +51,7 @@ Fixed
* Fixed the respawn of only wrong object samples in :func:`repeated_objects_terrain` of :mod:`omni.isaac.lab.terrains.trimesh` module. Previously, the function was respawning all objects in the scene instead of only the wrong object samples, which in worst case could lead to infinite respawn loop.
0.31.4 (2025-01-08)
0.32.4 (2025-01-08)
~~~~~~~~~~~~~~~~~~~
Fixed
......@@ -61,7 +61,7 @@ Fixed
In body properties sections, the second dimension should be num_bodies but was documented as 1.
0.31.3 (2025-01-02)
0.32.3 (2025-01-02)
~~~~~~~~~~~~~~~~~~~
Added
......@@ -70,7 +70,7 @@ Added
* Added body tracking as an origin type to :class:`omni.isaac.lab.envs.ViewerCfg` and :class:`omni.isaac.lab.envs.ui.ViewportCameraController`.
0.31.2 (2024-12-22)
0.32.2 (2024-12-22)
~~~~~~~~~~~~~~~~~~~
Fixed
......@@ -79,7 +79,7 @@ Fixed
* Fixed populating default_joint_stiffness and default_joint_damping values for ImplicitActuator instances in :class:`omni.isaac.lab.assets.Articulation`
0.31.1 (2024-12-17)
0.32.1 (2024-12-17)
~~~~~~~~~~~~~~~~~~~
Added
......@@ -93,7 +93,7 @@ Added
:class:`omni.isaac.lab.envs.mdp.actions.OperationalSpaceControllerAction` class.
0.31.0 (2024-12-16)
0.32.0 (2024-12-16)
~~~~~~~~~~~~~~~~~~~
Changed
......@@ -102,6 +102,15 @@ Changed
* Previously, physx returns the rigid bodies and articulations velocities in the com of bodies rather than the link frame, while poses are in link frames. We now explicitly provide :attr:`body_link_state` and :attr:`body_com_state` APIs replacing the previous :attr:`body_state` API. Previous APIs are now marked as deprecated. Please update any code using the previous pose and velocity APIs to use the new ``*_link_*`` or ``*_com_*`` APIs in :attr:`omni.isaac_lab.assets.RigidBody`, :attr:`omni.isaac_lab.assets.RigidBodyCollection`, and :attr:`omni.isaac_lab.assets.Articulation`.
0.31.0 (2024-12-16)
~~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added :class:`ManagerBasedRLMimicEnv` and config classes for mimic data generation workflow for imitation learning.
0.30.3 (2024-12-16)
~~~~~~~~~~~~~~~~~~~
......
......@@ -688,7 +688,7 @@ class AppLauncher:
# set the nucleus directory manually to the latest published Nucleus
# note: this is done to ensure prior versions of Isaac Sim still use the latest assets
assets_path = "http://omniverse-content-production.s3-us-west-2.amazonaws.com/Assets/Isaac/4.2"
assets_path = "http://omniverse-content-staging.s3-us-west-2.amazonaws.com/Assets/Isaac/4.5"
carb_settings_iface.set_string("/persistent/isaac/asset_root/default", assets_path)
carb_settings_iface.set_string("/persistent/isaac/asset_root/cloud", assets_path)
carb_settings_iface.set_string("/persistent/isaac/asset_root/nvidia", assets_path)
......
......@@ -119,12 +119,16 @@ class Se3SpaceMouse(DeviceBase):
# implement a timeout for device search
for _ in range(5):
for device in hid.enumerate():
if device["product_string"] == "SpaceMouse Compact":
if (
device["product_string"] == "SpaceMouse Compact"
or device["product_string"] == "SpaceMouse Wireless"
):
# set found flag
found = True
vendor_id = device["vendor_id"]
product_id = device["product_id"]
# connect to the device
self._device.close()
self._device.open(vendor_id, product_id)
# check if device found
if not found:
......@@ -150,7 +154,7 @@ class Se3SpaceMouse(DeviceBase):
elif data[0] == 2 and not self._read_rotation:
self._delta_rot[1] = self.rot_sensitivity * convert_buffer(data[1], data[2])
self._delta_rot[0] = self.rot_sensitivity * convert_buffer(data[3], data[4])
self._delta_rot[2] = self.rot_sensitivity * convert_buffer(data[5], data[6])
self._delta_rot[2] = self.rot_sensitivity * convert_buffer(data[5], data[6]) * -1.0
# readings from the side buttons
elif data[0] == 3:
# press left button
......@@ -159,13 +163,13 @@ class Se3SpaceMouse(DeviceBase):
self._close_gripper = not self._close_gripper
# additional callbacks
if "L" in self._additional_callbacks:
self._additional_callbacks["L"]
self._additional_callbacks["L"]()
# right button is for reset
if data[1] == 2:
# reset layer
self.reset()
# additional callbacks
if "R" in self._additional_callbacks:
self._additional_callbacks["R"]
self._additional_callbacks["R"]()
if data[1] == 3:
self._read_rotation = not self._read_rotation
......@@ -52,4 +52,6 @@ from .manager_based_env import ManagerBasedEnv
from .manager_based_env_cfg import ManagerBasedEnvCfg
from .manager_based_rl_env import ManagerBasedRLEnv
from .manager_based_rl_env_cfg import ManagerBasedRLEnvCfg
from .manager_based_rl_mimic_env import ManagerBasedRLMimicEnv
from .mimic_env_cfg import *
from .utils.marl import multi_agent_to_single_agent, multi_agent_with_one_agent
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from omni.isaac.lab.envs import ManagerBasedRLEnv
class ManagerBasedRLMimicEnv(ManagerBasedRLEnv):
"""The superclass for the Isaac Lab Mimic environments.
This class inherits from :class:`ManagerBasedRLEnv` and provides a template for the functions that
need to be defined to run the Isaac Lab Mimic data generation workflow. The Isaac Lab data generation
pipeline, inspired by the MimicGen system, enables the generation of new datasets based on a few human
collected demonstrations. MimicGen is a novel approach designed to automatically synthesize large-scale,
rich datasets from a sparse set of human demonstrations by adapting them to new contexts. It manages to
replicate the benefits of large datasets while reducing the immense time and effort usually required to
gather extensive human demonstrations.
The MimicGen system works by parsing demonstrations into object-centric segments. It then adapts
these segments to new scenes by transforming each segment according to the new scene’s context, stitching
them into a coherent trajectory for a robotic end-effector to execute. This approach allows learners to train
proficient agents through imitation learning on diverse configurations of scenes, object instances, etc.
Key Features:
- Efficient Dataset Generation: Utilizes a small set of human demos to produce large scale demonstrations.
- Broad Applicability: Capable of supporting tasks that require a range of manipulation skills, such as
pick-and-place and interacting with articulated objects.
- Dataset Versatility: The synthetic data retains a quality that compares favorably with additional human demos.
"""
def get_robot_eef_pose(self, env_ind=0):
"""
Get current robot end effector pose. Should be the same frame as used by the robot end-effector controller.
Returns:
pose (torch.Tensor): 4x4 eef pose matrix
"""
raise NotImplementedError
def target_eef_pose_to_action(self, target_eef_pose, relative=True, env_ind=0):
"""
Takes a target pose for the end effector controller and returns an action
to try and achieve that target pose.
Args:
target_eef_pose (torch.Tensor): 4x4 target eef pose
relative (bool): if True, use relative pose actions, else absolute pose actions
Returns:
action (torch.Tensor): action compatible with env.step (minus gripper actuation)
"""
raise NotImplementedError
def action_to_target_eef_pos(self, action, relative=True, env_ind=0):
"""
Converts action (compatible with env.step) to a target pose for the end effector controller.
Inverse of @target_eef_pose_to_action. Usually used to infer a sequence of target controller poses
from a demonstration trajectory using the recorded actions.
Args:
action (torch.Tensor): environment action
relative (bool): if True, use relative pose actions, else absolute pose actions
Returns:
target_eef_pose (torch.Tensor): 4x4 target eef pose that @action corresponds to
"""
raise NotImplementedError
def action_to_gripper_action(self, action):
"""
Extracts the gripper actuation part of an action (compatible with env.step).
Args:
action (torch.Tensor): environment action
Returns:
gripper_action (torch.Tensor): subset of environment action for gripper actuation
"""
raise NotImplementedError
def get_object_poses(self, env_ind=0):
"""
Gets the pose of each object relevant to Isaac Lab Mimic data generation in the current scene.
Returns:
object_poses (dict): dictionary that maps object name (str) to object pose matrix (4x4 torch.Tensor)
"""
raise NotImplementedError
def get_subtask_term_signals(self, env_ind=0):
"""
Gets a dictionary of binary flags for each subtask in a task. The flag is 1
when the subtask has been completed and 0 otherwise. Isaac Lab Mimic only uses this
when parsing source demonstrations at the start of data generation, and it only
uses the first 0 -> 1 transition in this signal to detect the end of a subtask.
Returns:
subtask_term_signals (dict): dictionary that maps subtask name to termination flag (0 or 1)
"""
raise NotImplementedError
def is_success(self):
"""
Determines whether the task has succeeded based on internally defined success criteria.
This method implements the logic to evaluate the task's success by checking relevant
conditions and constraints derived from the observations in the scene.
Returns:
success (bool): True if the task is considered successful based on the defined criteria. False otherwise.
"""
raise NotImplementedError
def serialize(self):
"""
Save all information needed to re-instantiate this environment in a dictionary.
This is the same as @env_meta - environment metadata stored in hdf5 datasets,
and used in utils/env_utils.py.
"""
return dict(env_name=self.spec.id, type=2, env_kwargs=dict())
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Licensed under the NVIDIA Source Code License [see LICENSE for details].
"""
Base MimicEnvCfg object for Isaac Lab Mimic data generation.
"""
from omni.isaac.lab.utils import configclass
@configclass
class DataGenConfig:
"""Configuration settings for data generation processes within the Isaac Lab Mimic environment."""
name: str = "demo" # The name of the datageneration, default is "demo"
source_dataset_path: str = None # Path to the source dataset for mimic generation
generation_path: str = None # Path where the generated data will be saved
generation_guarantee: bool = False # Whether to guarantee generation of data (e.g., retry until successful)
generation_keep_failed: bool = True # Whether to keep failed generation trials
generation_num_trials: int = 10 # Number of trial to be generated
generation_select_src_per_subtask: bool = False # Whether to select source data per subtask
generation_transform_first_robot_pose: bool = False # Whether to transform the first robot pose during generation
generation_interpolate_from_last_target_pose: bool = True # Whether to interpolate from last target pose
task_name: str = None # Name of the task being configured
max_num_failures: int = 50 # Maximum number of failures allowed before stopping generation
num_demo_to_render: int = 50 # Number of demonstrations to render
num_fail_demo_to_render: int = 50 # Number of failed demonstrations to render
seed: int = 1 # Seed for randomization to ensure reproducibility
@configclass
class SubTaskConfig:
"""Configuration settings specific to the management of individual subtasks."""
object_ref: str = None # Reference to the object involved in this subtask
subtask_term_signal: str = None # Signal for subtask termination
subtask_term_offset_range: tuple = (0, 0) # Range for offsetting subtask termination
selection_strategy: str = None # Strategy for selecting subtask
selection_strategy_kwargs: dict = {} # Keyword arguments for the selection strategy
action_noise: float = 0.03 # Amplitude of action noise applied
num_interpolation_steps: int = 5 # Number of steps for interpolation between waypoints
num_fixed_steps: int = 0 # Number of fixed steps for the subtask
apply_noise_during_interpolation: bool = False # Whether to apply noise during interpolation
@configclass
class MimicEnvCfg:
"""Configuration class for the Mimic environment integration.
This class consolidates various configuration aspects for the Isaac Lab Mimic data generation pipeline.
"""
datagen_config: DataGenConfig = DataGenConfig() # Configuration for the data generation
subtask_configs: list[SubTaskConfig] = [] # List of configurations for each subtask
......@@ -47,6 +47,9 @@ class RecorderManagerBaseCfg:
dataset_export_mode: DatasetExportMode = DatasetExportMode.EXPORT_ALL
"""The mode to handle episode exports."""
export_in_record_pre_reset: bool = True
"""Whether to export episodes in the record_pre_reset call."""
class RecorderTerm(ManagerTermBase):
"""Base class for recorder terms.
......@@ -166,6 +169,9 @@ class RecorderManager(ManagerBase):
os.path.join(cfg.dataset_export_dir_path, f"{cfg.dataset_filename}_failed"), env_name=env_name
)
self._exported_successful_episode_count = {}
self._exported_failed_episode_count = {}
def __str__(self) -> str:
"""Returns: A string representation for recorder manager."""
msg = f"<RecorderManager> contains {len(self._term_names)} active terms.\n"
......@@ -201,6 +207,34 @@ class RecorderManager(ManagerBase):
"""Name of active recorder terms."""
return self._term_names
@property
def exported_successful_episode_count(self, env_id=None) -> int:
"""Number of successful episodes.
Args:
env_id: The environment id. Defaults to None, in which case all environments are considered.
Returns:
The number of successful episodes.
"""
if env_id is not None:
return self._exported_successful_episode_count.get(env_id, 0)
return sum(self._exported_successful_episode_count.values())
@property
def exported_failed_episode_count(self, env_id=None) -> int:
"""Number of failed episodes.
Args:
env_id: The environment id. Defaults to None, in which case all environments are considered.
Returns:
The number of failed episodes.
"""
if env_id is not None:
return self._exported_failed_episode_count.get(env_id, 0)
return sum(self._exported_failed_episode_count.values())
"""
Operations.
"""
......@@ -345,6 +379,7 @@ class RecorderManager(ManagerBase):
success_results |= self._env.termination_manager.get_term("success")[env_ids]
self.set_success_to_episodes(env_ids, success_results)
if self.cfg.export_in_record_pre_reset:
self.export_episodes(env_ids)
def record_post_reset(self, env_ids: Sequence[int] | None) -> None:
......@@ -395,6 +430,13 @@ class RecorderManager(ManagerBase):
if target_dataset_file_handler is not None:
target_dataset_file_handler.write_episode(self._episodes[env_id])
need_to_flush = True
# Update episode count
if episode_succeeded:
self._exported_successful_episode_count[env_id] = (
self._exported_successful_episode_count.get(env_id, 0) + 1
)
else:
self._exported_failed_episode_count[env_id] = self._exported_failed_episode_count.get(env_id, 0) + 1
# Reset the episode buffer for the given environment after export
self._episodes[env_id] = EpisodeData()
......@@ -421,6 +463,7 @@ class RecorderManager(ManagerBase):
"dataset_filename",
"dataset_export_dir_path",
"dataset_export_mode",
"export_in_record_pre_reset",
]:
continue
# check if term config is None
......
......@@ -24,7 +24,7 @@ INSTALL_REQUIRES = [
"prettytable==3.3.0",
"toml",
# devices
"hidapi",
"hidapi==0.14.0.post2",
# reinforcement learning
"gymnasium",
# procedural-generation
......
......@@ -19,12 +19,23 @@ simulation_app = AppLauncher(headless=True).app
"""Rest everything follows."""
import math
import numpy as np
import torch
import torch.utils.benchmark as benchmark
from math import pi as PI
from scipy.spatial.transform import Rotation as R
from scipy.spatial.transform import Slerp
import omni.isaac.lab.utils.math as math_utils
# Number of iterations to run the batched tests
NUM_ITERS = 100
# This value is set to because "float operations are inexact".
# Details: https://github.com/pytorch/pytorch/issues/17678
DECIMAL_PRECISION = 5
ATOL = 10 ** (-DECIMAL_PRECISION)
RTOL = 10 ** (-DECIMAL_PRECISION)
class TestMathUtilities(unittest.TestCase):
"""Test fixture for checking math utilities in Isaac Lab."""
......@@ -435,6 +446,143 @@ class TestMathUtilities(unittest.TestCase):
# Assert that the output is close to the expected result
torch.testing.assert_close(orthogonal_depth, expected_orthogonal_depth)
"""
Tests for math_utils.pose_inv function
This class checks the pose_inv function's output against the np.linalg.inv function's output.
1. test_single_numpy_comparison: Checks if the inverse of a random transformation matrix
matches NumPy's built-in inverse.
2. test_multi_numpy_comparison: Verifies the same for a batch of NUM_ITERS random matrices.
"""
def test_single_numpy_comparison(self):
for _ in range(NUM_ITERS):
test_mat = math_utils.generate_random_transformation_matrix(pos_boundary=10, rot_boundary=(2 * np.pi))
result = np.array(math_utils.pose_inv(test_mat))
expected = np.linalg.inv(np.array(test_mat))
np.testing.assert_array_almost_equal(result, expected, decimal=DECIMAL_PRECISION)
def test_multi_numpy_comparison(self):
# Generate NUM_ITERS random transformation matrices
test_mats = torch.stack([
math_utils.generate_random_transformation_matrix(pos_boundary=10, rot_boundary=(2 * math.pi))
for _ in range(NUM_ITERS)
])
result = np.array(math_utils.pose_inv(test_mats))
expected = np.linalg.inv(np.array(test_mats))
np.testing.assert_array_almost_equal(result, expected, decimal=DECIMAL_PRECISION)
"""
Tests for math_utils.quat_slerp function.
This class checks the quat_slerp function's output against the output from scipy.spatial.transform.Slerp.
1. test_quat_slerp_multi_scipy_comparison: Generates 20x2 random rotation matrices, find the interpolation rotation
and compares it to the output of scipy.spatial.transform.Slerp.
"""
def test_quat_slerp_multi_scipy_comparison(self):
# Generate NUM_ITERS random rotation matrices
random_rotation_matrices_1 = [math_utils.generate_random_rotation() for _ in range(NUM_ITERS)]
random_rotation_matrices_2 = [math_utils.generate_random_rotation() for _ in range(NUM_ITERS)]
tau_values = np.random.rand(10) # Random values in the range [0, 1]
for rmat1, rmat2 in zip(random_rotation_matrices_1, random_rotation_matrices_2):
# Convert the rotation matrices to quaternions
q1 = R.from_matrix(rmat1).as_quat() # (x, y, z, w)
q2 = R.from_matrix(rmat2).as_quat() # (x, y, z, w)
# Compute expected results using scipy's Slerp
key_rots = R.from_quat(np.array([q1, q2]))
key_times = [0, 1]
slerp = Slerp(key_times, key_rots)
for tau in tau_values:
expected = slerp(tau).as_quat() # (x, y, z, w)
result = math_utils.quat_slerp(torch.tensor(q1), torch.tensor(q2), tau)
# Assert that the result is almost equal to the expected quaternion
np.testing.assert_array_almost_equal(result, expected, decimal=DECIMAL_PRECISION)
"""
Tests for math_utils.interpolate_rotations function.
This class checks the interpolate_rotations function's output against the output from scipy.spatial.transform.Slerp.
1. test_interpolate_rotations_multi_scipy_comparison: Generates 20x2 random rotation matrices, finds an array of interpolated rotations
and compares it to the output of scipy.spatial.transform.Slerp.
"""
def test_interpolate_rotations_multi_scipy_comparison(self):
# Generate NUM_ITERS random rotation matrices
random_rotation_matrices_1 = [math_utils.generate_random_rotation() for _ in range(NUM_ITERS)]
random_rotation_matrices_2 = [math_utils.generate_random_rotation() for _ in range(NUM_ITERS)]
for rmat1, rmat2 in zip(random_rotation_matrices_1, random_rotation_matrices_2):
# Compute expected results using scipy's Slerp
key_rots = R.from_matrix(np.array([rmat1, rmat2]))
# Create a Slerp object and interpolate create the interpolated matrices
# Minimum 2 required because Interpolate_rotations returns one extra rotation matrix
num_steps = np.random.randint(2, 51)
key_times = [0, 1]
slerp = Slerp(key_times, key_rots)
interp_times = np.linspace(0, 1, num_steps)
expected = slerp(interp_times).as_matrix()
# Test 1:
# Interpolate_rotations using interpolate_rotations and quat_slerp
# interpolate_rotations returns one extra rotation matrix hence num_steps-1
result_quat = math_utils.interpolate_rotations(rmat1, rmat2, num_steps - 1)
# Assert that the result is almost equal to the expected quaternion
np.testing.assert_array_almost_equal(result_quat, expected, decimal=DECIMAL_PRECISION)
# Test 2:
# Interpolate_rotations using axis_angle and ensure the result is still the same
# interpolate_rotations returns one extra rotation matrix hence num_steps-1
result_axis_angle = math_utils.interpolate_rotations(rmat1, rmat2, num_steps - 1, axis_angle=True)
# Assert that the result is almost equal to the expected quaternion
np.testing.assert_array_almost_equal(result_axis_angle, expected, decimal=DECIMAL_PRECISION)
"""
Tests for math_utils.interpolate_poses function.
This class checks the interpolate_poses function's output against the output from scipy.spatial.transform.Slerp.
1. test_interpolate_poses_multi_scipy_comparison: Generates 20x2 random transformation matrices,
computes an array of interpolated transformations
and compares it to the output of scipy.spatial.transform.Slerp and np.linspace
"""
def test_interpolate_poses_multi_scipy_comparison(self):
# Generate NUM_ITERS random transformation matrices
random_mat_1 = [math_utils.generate_random_transformation_matrix() for _ in range(NUM_ITERS)]
random_mat_2 = [math_utils.generate_random_transformation_matrix() for _ in range(NUM_ITERS)]
for mat1, mat2 in zip(random_mat_1, random_mat_2):
pos_1, rmat1 = math_utils.unmake_pose(mat1)
pos_2, rmat2 = math_utils.unmake_pose(mat2)
# Compute expected results using scipy's Slerp
key_rots = R.from_matrix(np.array([rmat1, rmat2]))
# Create a Slerp object and interpolate create the interpolated rotation matrices
# Minimum 3 required because interpolate_poses returns extra staring and ending pose matrices
num_steps = np.random.randint(3, 51)
key_times = [0, 1]
slerp = Slerp(key_times, key_rots)
interp_times = np.linspace(0, 1, num_steps)
expected_quat = slerp(interp_times).as_matrix()
# Test interpolation against expected result using np.linspace
expected_pos = np.linspace(pos_1, pos_2, num_steps)
# interpolate_poses using interpolate_poses and quat_slerp
# interpolate_poses returns extra staring and ending pose matrices hence num_steps-2
interpolated_poses, _ = math_utils.interpolate_poses(
math_utils.make_pose(pos_1, rmat1), math_utils.make_pose(pos_2, rmat2), num_steps - 2
)
result_pos, result_quat = math_utils.unmake_pose(interpolated_poses)
# Assert that the result is almost equal to the expected quaternion
np.testing.assert_array_almost_equal(result_quat, expected_quat, decimal=DECIMAL_PRECISION)
np.testing.assert_array_almost_equal(result_pos, expected_pos, decimal=DECIMAL_PRECISION)
if __name__ == "__main__":
run_tests()
[package]
# Semantic Versioning is used: https://semver.org/
version = "1.0.0"
# Description
category = "isaaclab"
readme = "README.md"
title = "Isaac Lab Mimic"
author = "Isaac Lab Project Developers"
maintainer = "Isaac Lab Project Developers"
description="Mimic for Isaac Lab"
repository = "https://github.com/isaac-sim/IsaacLab.git"
keywords = ["extension", "template", "isaaclab"]
[dependencies]
"omni.isaac.lab" = {}
"omni.isaac.lab_assets" = {}
"omni.isaac.lab_tasks" = {}
# NOTE: Add additional dependencies here
[[python.module]]
name = "omni.isaac.lab_mimic"
[isaaclab_settings]
Changelog
---------
1.0.0 (2024-12-06)
~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Add initial version of Isaac Lab Mimic
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
__version__ = "1.0.0"
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
"""
Defines structure of information that is needed from an environment for data generation.
"""
import torch
from copy import deepcopy
class DatagenInfo:
"""
Defines the structure of information required from an environment for data generation processes.
The `DatagenInfo` class centralizes all essential data elements needed for data generation in one place,
reducing the overhead and complexity of repeatedly querying the environment whenever this information is needed.
To allow for flexibility,not all information must be present.
Core Elements:
- **eef_pose**: Captures the current 6 dimensional poses of the robot's end-effector.
- **object_poses**: Captures the 6 dimensional poses of relevant objects in the scene.
- **subtask_term_signals**: Captures subtask completions signals.
- **target_eef_pose**: Captures the target 6 dimensional poses for robot's end effector at each time step.
- **gripper_action**: Captures the gripper's state.
"""
def __init__(
self,
eef_pose=None,
object_poses=None,
subtask_term_signals=None,
target_eef_pose=None,
gripper_action=None,
):
"""
Args:
eef_pose (torch.Tensor or None): robot end effector poses of shape [..., 4, 4]
object_poses (dict or None): dictionary mapping object name to object poses
of shape [..., 4, 4]
subtask_term_signals (dict or None): dictionary mapping subtask name to a binary
indicator (0 or 1) on whether subtask has been completed. Each value in the
dictionary could be an int, float, or torch.Tensor of shape [..., 1].
target_eef_pose (torch.Tensor or None): target end effector poses of shape [..., 4, 4]
gripper_action (torch.Tensor or None): gripper actions of shape [..., D] where D
is the dimension of the gripper actuation action for the robot arm
"""
# Type checks using assert
if eef_pose is not None:
assert isinstance(
eef_pose, torch.Tensor
), f"Expected 'eef_pose' to be of type torch.Tensor, but got {type(eef_pose)}"
if object_poses is not None:
assert isinstance(
object_poses, dict
), f"Expected 'object_poses' to be a dictionary, but got {type(object_poses)}"
for k, v in object_poses.items():
assert isinstance(
v, torch.Tensor
), f"Expected 'object_poses[{k}]' to be of type torch.Tensor, but got {type(v)}"
if subtask_term_signals is not None:
assert isinstance(
subtask_term_signals, dict
), f"Expected 'subtask_term_signals' to be a dictionary, but got {type(subtask_term_signals)}"
for k, v in subtask_term_signals.items():
assert isinstance(
v, (torch.Tensor, int, float)
), f"Expected 'subtask_term_signals[{k}]' to be of type torch.Tensor, int, or float, but got {type(v)}"
if target_eef_pose is not None:
assert isinstance(
target_eef_pose, torch.Tensor
), f"Expected 'target_eef_pose' to be of type torch.Tensor, but got {type(target_eef_pose)}"
if gripper_action is not None:
assert isinstance(
gripper_action, torch.Tensor
), f"Expected 'gripper_action' to be of type torch.Tensor, but got {type(gripper_action)}"
self.eef_pose = None
if eef_pose is not None:
self.eef_pose = eef_pose
self.object_poses = None
if object_poses is not None:
self.object_poses = {k: object_poses[k] for k in object_poses}
self.subtask_term_signals = None
if subtask_term_signals is not None:
self.subtask_term_signals = dict()
for k in subtask_term_signals:
if isinstance(subtask_term_signals[k], (float, int)):
self.subtask_term_signals[k] = subtask_term_signals[k]
else:
# only create torch tensor if value is not a single value
self.subtask_term_signals[k] = subtask_term_signals[k]
self.target_eef_pose = None
if target_eef_pose is not None:
self.target_eef_pose = target_eef_pose
self.gripper_action = None
if gripper_action is not None:
self.gripper_action = gripper_action
def to_dict(self):
"""
Convert this instance to a dictionary containing the same information.
"""
ret = dict()
if self.eef_pose is not None:
ret["eef_pose"] = self.eef_pose
if self.object_poses is not None:
ret["object_poses"] = deepcopy(self.object_poses)
if self.subtask_term_signals is not None:
ret["subtask_term_signals"] = deepcopy(self.subtask_term_signals)
if self.target_eef_pose is not None:
ret["target_eef_pose"] = self.target_eef_pose
if self.gripper_action is not None:
ret["gripper_action"] = self.gripper_action
return ret
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
import asyncio
from omni.isaac.lab_mimic.datagen.datagen_info import DatagenInfo
import omni.isaac.lab.utils.math as PoseUtils
from omni.isaac.lab.utils.datasets import EpisodeData, HDF5DatasetFileHandler
class DataGenInfoPool:
"""
Pool of DatagenInfo for data generation.
This class is a container for storing `DatagenInfo` objects that are extracted from episodes.
The pool supports the use of an asyncio lock to safely add new episodes to the pool while
consuming the data, so it can be shared across multiple mimic data generators.
"""
def __init__(self, env, env_cfg, device, asyncio_lock: asyncio.Lock | None = None):
"""
Args:
env_cfg (dict): environment configuration
device (torch.device): device to store the data
asyncio_lock (asyncio.Lock or None): asyncio lock to use for thread safety
"""
self._datagen_infos = []
self._subtask_indices = []
self.env = env
self.env_cfg = env_cfg
self.device = device
self._asyncio_lock = asyncio_lock
self.subtask_term_signals = [subtask_config.subtask_term_signal for subtask_config in env_cfg.subtask_configs]
self.subtask_term_offset_ranges = [
subtask_config.subtask_term_offset_range for subtask_config in env_cfg.subtask_configs
]
@property
def datagen_infos(self):
"""Returns the datagen infos."""
return self._datagen_infos
@property
def subtask_indices(self):
"""Returns the subtask indices."""
return self._subtask_indices
@property
def asyncio_lock(self):
"""Returns the asyncio lock."""
return self._asyncio_lock
@property
def num_datagen_infos(self):
"""Returns the number of datagen infos."""
return len(self._datagen_infos)
async def add_episode(self, episode: EpisodeData):
"""
Add a datagen info from the given episode.
Args:
episode (EpisodeData): episode to add
"""
if self._asyncio_lock is not None:
async with self._asyncio_lock:
self._add_episode(episode)
else:
self._add_episode(episode)
def _add_episode(self, episode: EpisodeData):
"""
Add a datagen info from the given episode.
Args:
episode (EpisodeData): episode to add
"""
ep_grp = episode.data
# extract datagen info
# Extract eef poses
eef_pos = ep_grp["obs"]["eef_pos"]
# format (w, x, y, z)
eef_quat = ep_grp["obs"]["eef_quat"]
eef_rot_matrices = PoseUtils.matrix_from_quat(eef_quat) # shape (N, 3, 3)
# Create pose matrices for all environments
eef_pose = PoseUtils.make_pose(eef_pos, eef_rot_matrices) # shape (N, 4, 4)
object_poses_dict = dict()
# TODO: change object_pose key in the dataset to object_state since it is not just the pose
for object_name, value in ep_grp["obs"]["object_pose"].items():
# object_pose
value = value["root_pose"]
# Root state ``[pos, quat, lin_vel, ang_vel]`` in simulation world frame. Shape is (num_steps, 13).
# Quaternion ordering is wxyz
# Convert to rotation matrices
object_rot_matrices = PoseUtils.matrix_from_quat(value[:, 3:7]) # shape (N, 3, 3)
object_rot_positions = value[:, 0:3] # shape (N, 3)
object_poses_dict[object_name] = PoseUtils.make_pose(object_rot_positions, object_rot_matrices)
# Extract gripper actions
gripper_actions = self.env.action_to_gripper_action(ep_grp["actions"])
ep_datagen_info_obj = DatagenInfo(
eef_pose=eef_pose,
object_poses=object_poses_dict,
subtask_term_signals=ep_grp["obs"]["subtask_term_signals"],
target_eef_pose=ep_grp["obs"]["target_eef_pose"],
gripper_action=gripper_actions,
)
self._datagen_infos.append(ep_datagen_info_obj)
# parse subtask indices using subtask termination signals
ep_subtask_indices = []
prev_subtask_term_ind = 0
for subtask_ind in range(len(self.subtask_term_signals)):
subtask_term_signal = self.subtask_term_signals[subtask_ind]
if subtask_term_signal is None:
# final subtask, finishes at end of demo
subtask_term_ind = ep_grp["actions"].shape[0]
else:
# trick to detect index where first 0 -> 1 transition occurs - this will be the end of the subtask
subtask_indicators = ep_datagen_info_obj.subtask_term_signals[subtask_term_signal].flatten().int()
diffs = subtask_indicators[1:] - subtask_indicators[:-1]
end_ind = int(diffs.nonzero()[0][0]) + 1
subtask_term_ind = end_ind + 1 # increment to support indexing like demo[start:end]
ep_subtask_indices.append([prev_subtask_term_ind, subtask_term_ind])
prev_subtask_term_ind = subtask_term_ind
# run sanity check on subtask_term_offset_range in task spec to make sure we can never
# get an empty subtask in the worst case when sampling subtask bounds:
#
# end index of subtask i + max offset of subtask i < end index of subtask i + 1 + min offset of subtask i + 1
#
assert len(ep_subtask_indices) == len(
self.subtask_term_signals
), "mismatch in length of extracted subtask info and number of subtasks"
for i in range(1, len(ep_subtask_indices)):
prev_max_offset_range = self.subtask_term_offset_ranges[i - 1][1]
assert (
ep_subtask_indices[i - 1][1] + prev_max_offset_range
< ep_subtask_indices[i][1] + self.subtask_term_offset_ranges[i][0]
), (
"subtask sanity check violation in demo with subtask {} end ind {}, subtask {} max offset {},"
" subtask {} end ind {}, and subtask {} min offset {}".format(
i - 1,
ep_subtask_indices[i - 1][1],
i - 1,
prev_max_offset_range,
i,
ep_subtask_indices[i][1],
i,
self.subtask_term_offset_ranges[i][0],
)
)
self._subtask_indices.append(ep_subtask_indices)
def load_from_dataset_file(self, file_path, select_demo_keys: str | None = None):
"""
Load from a dataset file.
Args:
file_path (str): path to the dataset file
select_demo_keys (str or None): keys of the demos to load
"""
dataset_file_handler = HDF5DatasetFileHandler()
dataset_file_handler.open(file_path)
episode_names = dataset_file_handler.get_episode_names()
if len(episode_names) == 0:
return
for episode_name in episode_names:
if select_demo_keys is not None and episode_name not in select_demo_keys:
continue
episode = dataset_file_handler.load_episode(episode_name, self.device)
self._add_episode(episode)
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
import gymnasium as gym
from .franka_stack_ik_rel_mimic_env import FrankaCubeStackIKRelMimicEnv
from .franka_stack_ik_rel_mimic_env_cfg import FrankaCubeStackIKRelMimicEnvCfg
##
# Inverse Kinematics - Relative Pose Control
##
gym.register(
id="Isaac-Stack-Cube-Franka-IK-Rel-Mimic-v0",
entry_point="omni.isaac.lab_mimic.envs:FrankaCubeStackIKRelMimicEnv",
kwargs={
"env_cfg_entry_point": franka_stack_ik_rel_mimic_env_cfg.FrankaCubeStackIKRelMimicEnvCfg,
},
disable_env_checker=True,
)
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
import torch
import omni.isaac.lab.utils.math as PoseUtils
from omni.isaac.lab.envs import ManagerBasedRLMimicEnv
from omni.isaac.lab_tasks.manager_based.manipulation.stack.mdp import cubes_stacked
class FrankaCubeStackIKRelMimicEnv(ManagerBasedRLMimicEnv):
def get_robot_eef_pose(self, env_ind=0):
"""
Get current robot end effector pose. Should be the same frame as used by the robot end-effector controller.
Returns:
pose (torch.Tensor): 4x4 eef pose matrix
"""
# Retrieve end effector pose from the observation buffer
eef_pos = self.obs_buf["policy"]["eef_pos"][env_ind]
eef_quat = self.obs_buf["policy"]["eef_quat"][env_ind]
# Quaternion format is w,x,y,z
return PoseUtils.make_pose(eef_pos, PoseUtils.matrix_from_quat(eef_quat))
def target_eef_pose_to_action(self, target_eef_pose, relative=True, env_ind=0):
"""
Takes a target pose for the end effector controller and returns an action
(usually a normalized delta pose action) to try and achieve that target pose.
Args:
target_eef_pose (torch.Tensor): 4x4 target eef pose
relative (bool): if True, use relative pose actions, else absolute pose actions
Returns:
action (torch.Tensor): action compatible with env.step (minus gripper actuation)
"""
# target position and rotation
target_pos, target_rot = PoseUtils.unmake_pose(target_eef_pose)
# current position and rotation
curr_pose = self.get_robot_eef_pose(env_ind=env_ind)
curr_pos, curr_rot = PoseUtils.unmake_pose(curr_pose)
if relative:
# normalized delta position action
delta_position = target_pos - curr_pos
# delta_position = np.clip(delta_position / max_dpos, -1., 1.)
# normalized delta rotation action
delta_rot_mat = target_rot.matmul(curr_rot.transpose(-1, -2))
delta_quat = PoseUtils.quat_from_matrix(delta_rot_mat)
delta_rotation = PoseUtils.axis_angle_from_quat(delta_quat)
# delta_rotation = np.clip(delta_rotation / max_drot, -1., 1.)
return torch.cat([delta_position, delta_rotation], dim=0)
else:
raise NotImplementedError("Absolute pose actions are not implemented.")
return
def action_to_target_eef_pos(self, action, relative=True, env_ind=0):
"""
Converts action (compatible with env.step) to a target pose for the end effector controller.
Inverse of @target_eef_pose_to_action. Usually used to infer a sequence of target controller poses
from a demonstration trajectory using the recorded actions.
Args:
action (torch.Tensor): environment action
relative (bool): if True, use relative pose actions, else absolute pose actions
Returns:
target_eef_pose (torch.Tensor): 4x4 target eef pose that @action corresponds to
"""
target_poses = []
for env_ind in range(self.scene.num_envs):
delta_position = action[env_ind][:3]
delta_rotation = action[env_ind][3:6]
# current position and rotation
curr_pose = self.get_robot_eef_pose(env_ind=env_ind)
curr_pos, curr_rot = PoseUtils.unmake_pose(curr_pose)
# get pose target
target_pos = curr_pos + delta_position
# Convert delta_rotation to axis angle form
delta_rotation_angle = torch.linalg.norm(delta_rotation, dim=-1, keepdim=True)
# make sure that axis is a unit vector
# Check for invalid division
if torch.isclose(delta_rotation_angle, torch.tensor([0.0], device=delta_rotation_angle.device)):
# Quaternion format is wxyz
delta_quat = torch.tensor([1.0, 0.0, 0.0, 0.0], device=delta_rotation_angle.device)
else:
delta_rotation_axis = delta_rotation / delta_rotation_angle
delta_quat = PoseUtils.quat_from_angle_axis(delta_rotation_angle, delta_rotation_axis).squeeze(0)
delta_rot_mat = PoseUtils.matrix_from_quat(delta_quat)
target_rot = torch.matmul(delta_rot_mat, curr_rot)
target_pose = PoseUtils.make_pose(target_pos, target_rot).clone()
target_poses.append(target_pose)
return target_poses
def action_to_gripper_action(self, action):
"""
Extracts the gripper actuation part of an action (compatible with env.step).
Args:
action (torch.Tensor): environment action of shape N x action_dim. Where N is number of steps in a demo
Returns:
gripper_action (torch.Tensor): subset of environment action for gripper actuation of shape N x gripper_action_dim
"""
# last dimension is gripper action
return action[:, -1:]
def get_object_poses(self, env_ind=0):
"""
Gets the pose of each object relevant to Isaac Lab Mimic data generation in the current scene.
Returns:
object_poses (dict): dictionary that maps object name (str) to object pose matrix (4x4 torch.Tensor)
"""
# three relevant objects - three cubes
# Retrieve end effector pose from the observation buffer
cube_positions = self.obs_buf["policy"]["cube_positions"][env_ind]
cube_orientations = self.obs_buf["policy"]["cube_orientations"][env_ind]
cube_1_pos = cube_positions[:3]
cube_1_rot = cube_orientations[:4]
cube_2_pos = cube_positions[3:6]
cube_2_rot = cube_orientations[4:8]
cube_3_pos = cube_positions[6:]
cube_3_rot = cube_orientations[8:]
# Quaternion format is w,x,y,z
return dict(
cube_1=PoseUtils.make_pose(cube_1_pos, PoseUtils.matrix_from_quat(cube_1_rot)),
cube_2=PoseUtils.make_pose(cube_2_pos, PoseUtils.matrix_from_quat(cube_2_rot)),
cube_3=PoseUtils.make_pose(cube_3_pos, PoseUtils.matrix_from_quat(cube_3_rot)),
)
def get_subtask_term_signals(self, env_ind=0):
"""
Gets a dictionary of binary flags for each subtask in a task. The flag is 1
when the subtask has been completed and 0 otherwise. Isaac Lab Mimic only uses this
when parsing source demonstrations at the start of data generation, and it only
uses the first 0 -> 1 transition in this signal to detect the end of a subtask.
Returns:
subtask_term_signals (dict): dictionary that maps subtask name to termination flag (0 or 1)
"""
signals = dict()
subtask_terms = self.obs_buf["subtask_terms"]
signals["grasp_1"] = subtask_terms["grasp_1"][env_ind]
signals["grasp_2"] = subtask_terms["grasp_2"][env_ind]
signals["stack_1"] = subtask_terms["stack_1"][env_ind]
# final subtask is placing cubeC on cubeA (motion relative to cubeA) - but final subtask signal is not needed
return signals
def is_success(self):
return cubes_stacked(self, atol=0.001, rtol=0.001)
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from omni.isaac.lab.envs.mimic_env_cfg import MimicEnvCfg, SubTaskConfig
from omni.isaac.lab.utils import configclass
from omni.isaac.lab_tasks.manager_based.manipulation.stack.config.franka.stack_ik_rel_env_cfg import (
FrankaCubeStackEnvCfg,
)
@configclass
class FrankaCubeStackIKRelMimicEnvCfg(FrankaCubeStackEnvCfg, MimicEnvCfg):
def __post_init__(self):
# post init of parents
super().__post_init__()
# # TODO: Figure out how we can move this to the MimicEnvCfg class
# # The __post_init__() above only calls the init for FrankaCubeStackEnvCfg and not MimicEnvCfg
# # https://stackoverflow.com/questions/59986413/achieving-multiple-inheritance-using-python-dataclasses
# Override the existing values
self.datagen_config.name = "demo_src_stack_isaac_lab_task_D0"
self.datagen_config.generation_guarantee = True
self.datagen_config.generation_keep_failed = True
self.datagen_config.generation_num_trials = 10
self.datagen_config.generation_select_src_per_subtask = True
self.datagen_config.generation_transform_first_robot_pose = False
self.datagen_config.generation_interpolate_from_last_target_pose = True
self.datagen_config.max_num_failures = 25
self.datagen_config.num_demo_to_render = 10
self.datagen_config.num_fail_demo_to_render = 25
self.datagen_config.seed = 1
# The following are the subtask configurations for the stack task.
self.subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_2",
# This key corresponds to the binary indicator in "datagen_info" that signals
# when this subtask is finished (e.g., on a 0 to 1 edge).
subtask_term_signal="grasp_1",
# Specifies time offsets for data generation when splitting a trajectory into
# subtask segments. Random offsets are added to the termination boundary.
subtask_term_offset_range=(10, 20),
# Selection strategy for the source subtask segment during data generation
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.03,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=5,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
self.subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_1",
# Corresponding key for the binary indicator in "datagen_info" for completion
subtask_term_signal="stack_1",
# Time offsets for data generation when splitting a trajectory
subtask_term_offset_range=(10, 20),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.03,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=5,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
self.subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_3",
# Corresponding key for the binary indicator in "datagen_info" for completion
subtask_term_signal="grasp_2",
# Time offsets for data generation when splitting a trajectory
subtask_term_offset_range=(10, 20),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.03,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=5,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
self.subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_2",
# End of final subtask does not need to be detected
subtask_term_signal=None,
# No time offsets for the final subtask
subtask_term_offset_range=(0, 0),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.03,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=5,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
[build-system]
requires = ["setuptools", "wheel", "toml"]
build-backend = "setuptools.build_meta"
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
"""Installation script for the 'omni.isaac.lab_mimic' python package."""
import os
import toml
from setuptools import setup
# Obtain the extension data from the extension.toml file
EXTENSION_PATH = os.path.dirname(os.path.realpath(__file__))
# Read the extension.toml file
EXTENSION_TOML_DATA = toml.load(os.path.join(EXTENSION_PATH, "config", "extension.toml"))
# Installation operation
setup(
name="omni.isaac.lab_mimic",
packages=["omni.isaac.lab_mimic"],
author=EXTENSION_TOML_DATA["package"]["author"],
maintainer=EXTENSION_TOML_DATA["package"]["maintainer"],
url=EXTENSION_TOML_DATA["package"]["repository"],
version=EXTENSION_TOML_DATA["package"]["version"],
description=EXTENSION_TOML_DATA["package"]["description"],
keywords=EXTENSION_TOML_DATA["package"]["keywords"],
license="MIT",
include_package_data=True,
python_requires=">=3.10",
classifiers=[
"Natural Language :: English",
"Programming Language :: Python :: 3.10",
"Isaac Sim :: 2023.1.1",
"Isaac Sim :: 4.0.0",
],
zip_safe=False,
)
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
"""Test dataset generation for Isaac Lab Mimic workflow."""
from omni.isaac.lab.app import AppLauncher
# launch omniverse app
simulation_app = AppLauncher(headless=True).app
import os
import subprocess
import tempfile
import unittest
from omni.isaac.lab.utils.assets import ISAACLAB_NUCLEUS_DIR, retrieve_file_path
DATASETS_DOWNLOAD_DIR = tempfile.mkdtemp(suffix="_Isaac-Stack-Cube-Franka-IK-Rel-Mimic-v0")
NUCLEUS_DATASET_PATH = os.path.join(ISAACLAB_NUCLEUS_DIR, "Tests", "Mimic", "dataset.hdf5")
class TestGenerateDataset(unittest.TestCase):
"""Test the dataset generation behavior of the Isaac Lab Mimic workflow."""
def setUp(self):
"""Set up the environment for testing."""
# Create the datasets directory if it does not exist
if not os.path.exists(DATASETS_DOWNLOAD_DIR):
print("Creating directory : ", DATASETS_DOWNLOAD_DIR)
os.makedirs(DATASETS_DOWNLOAD_DIR)
# Try to download the dataset from Nucleus
try:
retrieve_file_path(NUCLEUS_DATASET_PATH, DATASETS_DOWNLOAD_DIR)
except Exception as e:
print(e)
print("Could not download dataset from Nucleus")
self.fail(
"The dataset required for this test is currently unavailable. Dataset path: " + NUCLEUS_DATASET_PATH
)
# Set the environment variable PYTHONUNBUFFERED to 1 to get all text outputs in result.stdout
self.pythonunbuffered_env_var_ = os.environ.get("PYTHONUNBUFFERED")
os.environ["PYTHONUNBUFFERED"] = "1"
# Automatically detect the workflow root (backtrack from current file location)
current_dir = os.path.dirname(os.path.abspath(__file__))
workflow_root = os.path.abspath(os.path.join(current_dir, "../../../.."))
# Run the command to generate core configs
config_command = [
workflow_root + "/isaaclab.sh",
"-p",
os.path.join(workflow_root, "source/standalone/workflows/isaac_lab_mimic/annotate_demos.py"),
"--task",
"Isaac-Stack-Cube-Franka-IK-Rel-Mimic-v0",
"--input_file",
DATASETS_DOWNLOAD_DIR + "/dataset.hdf5",
"--output_file",
DATASETS_DOWNLOAD_DIR + "/annotated_dataset.hdf5",
"--signals",
"grasp_1",
"stack_1",
"grasp_2",
"--auto",
"--headless",
]
print(config_command)
# Execute the command and capture the result
result = subprocess.run(config_command, capture_output=True, text=True)
# Print the result for debugging purposes
print("Config generation result:")
print(result.stdout) # Print standard output from the command
print(result.stderr) # Print standard error from the command
# Check if the config generation was successful
self.assertEqual(result.returncode, 0, msg=result.stderr)
def tearDown(self):
"""Clean up after tests."""
if self.pythonunbuffered_env_var_:
os.environ["PYTHONUNBUFFERED"] = self.pythonunbuffered_env_var_
else:
del os.environ["PYTHONUNBUFFERED"]
def test_generate_dataset(self):
"""Test the dataset generation script."""
# Automatically detect the workflow root (backtrack from current file location)
current_dir = os.path.dirname(os.path.abspath(__file__))
workflow_root = os.path.abspath(os.path.join(current_dir, "../../../.."))
# Define the command to run the dataset generation script
command = [
workflow_root + "/isaaclab.sh",
"-p",
os.path.join(workflow_root, "source/standalone/workflows/isaac_lab_mimic/generate_dataset.py"),
"--input_file",
DATASETS_DOWNLOAD_DIR + "/annotated_dataset.hdf5",
"--output_file",
DATASETS_DOWNLOAD_DIR + "/generated_dataset.hdf5",
"--generation_num_trials",
"1",
"--headless",
]
# Call the script and capture output
result = subprocess.run(command, capture_output=True, text=True)
# Print the result for debugging purposes
print("Dataset generation result:")
print(result.stdout) # Print standard output from the command
print(result.stderr) # Print standard error from the command
# Check if the script executed successfully
self.assertEqual(result.returncode, 0, msg=result.stderr)
# Check for specific output
expected_output = "successes/attempts. Exiting"
self.assertIn(expected_output, result.stdout)
if __name__ == "__main__":
unittest.main()
......@@ -95,7 +95,7 @@ def main():
)
elif args_cli.teleop_device.lower() == "spacemouse":
teleop_interface = Se3SpaceMouse(
pos_sensitivity=0.05 * args_cli.sensitivity, rot_sensitivity=0.005 * args_cli.sensitivity
pos_sensitivity=0.05 * args_cli.sensitivity, rot_sensitivity=0.05 * args_cli.sensitivity
)
elif args_cli.teleop_device.lower() == "gamepad":
teleop_interface = Se3Gamepad(
......@@ -108,9 +108,15 @@ def main():
teleop_interface.add_callback("RESET", env.reset)
else:
raise ValueError(f"Invalid device interface '{args_cli.teleop_device}'. Supported: 'keyboard', 'spacemouse'.")
# add teleoperation key for env reset
teleop_interface.add_callback("L", env.reset)
# print helper for keyboard
should_reset_recording_instance = False
def reset_recording_instance():
nonlocal should_reset_recording_instance
should_reset_recording_instance = True
teleop_interface.add_callback("R", reset_recording_instance)
print(teleop_interface)
# reset environment
......@@ -131,6 +137,10 @@ def main():
# apply actions
env.step(actions)
if should_reset_recording_instance:
env.reset()
should_reset_recording_instance = False
# close the simulator
env.close()
......
......@@ -18,6 +18,7 @@ optional arguments:
--teleop_device Device for interacting with environment. (default: keyboard)
--dataset_file File path to export recorded demos. (default: "./datasets/dataset.hdf5")
--step_hz Environment stepping rate in Hz. (default: 30)
--num_demos Number of demonstrations to record. (default: 0)
"""
"""Launch Isaac Sim Simulator first."""
......@@ -35,6 +36,9 @@ parser.add_argument(
"--dataset_file", type=str, default="./datasets/dataset.hdf5", help="File path to export recorded demos."
)
parser.add_argument("--step_hz", type=int, default=30, help="Environment stepping rate in Hz.")
parser.add_argument(
"--num_demos", type=int, default=0, help="Number of demonstrations to record. Set to 0 for infinite."
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
......@@ -164,6 +168,7 @@ def main():
teleop_interface.reset()
# simulate environment -- run everything in inference mode
current_recorded_demo_count = 0
with contextlib.suppress(KeyboardInterrupt) and torch.inference_mode():
while True:
# get keyboard command
......@@ -181,6 +186,18 @@ def main():
env.reset()
should_reset_recording_instance = False
# print out the current demo count if it has changed
if env.unwrapped.recorder_manager.exported_successful_episode_count > current_recorded_demo_count:
current_recorded_demo_count = env.unwrapped.recorder_manager.exported_successful_episode_count
print(f"Recorded {current_recorded_demo_count} demonstrations.")
if (
args_cli.num_demos > 0
and env.unwrapped.recorder_manager.exported_successful_episode_count >= args_cli.num_demos
):
print(f"All {args_cli.num_demos} demonstrations recorded. Exiting the app.")
break
# check that simulation is stopped or not
if env.unwrapped.sim.is_stopped():
break
......
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
"""Script to add mimic annotations to demos to be used as source demos for mimic dataset generation."""
"""Launch Isaac Sim Simulator first."""
import argparse
from omni.isaac.lab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Collect demonstrations for Isaac Lab environments.")
parser.add_argument("--task", type=str, default=None, help="Name of the task.")
parser.add_argument(
"--input_file", type=str, default="./datasets/dataset.hdf5", help="File name of the dataset to be annotated."
)
parser.add_argument(
"--output_file",
type=str,
default="./datasets/dataset_annotated.hdf5",
help="File name of the annotated output dataset file.",
)
parser.add_argument("--auto", action="store_true", default=False, help="Automatically annotate subtasks.")
parser.add_argument(
"--signals",
type=str,
nargs="+",
default=[],
help="Sequence of subtask termination signals for all except last subtask",
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch the simulator
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
import contextlib
import gymnasium as gym
import os
import torch
import omni.isaac.lab_mimic.envs
# Only enables inputs if this script is NOT headless mode
if not args_cli.headless and not os.environ.get("HEADLESS", 0):
from omni.isaac.lab.devices import Se3Keyboard
from omni.isaac.lab.envs.mdp.recorders.recorders_cfg import ActionStateRecorderManagerCfg
from omni.isaac.lab.managers import RecorderTerm, RecorderTermCfg
from omni.isaac.lab.utils import configclass
from omni.isaac.lab.utils.datasets import HDF5DatasetFileHandler
import omni.isaac.lab_tasks # noqa: F401
from omni.isaac.lab_tasks.utils.parse_cfg import parse_env_cfg
is_paused = False
current_action_index = 0
subtask_indices = []
def play_cb():
global is_paused
is_paused = False
def pause_cb():
global is_paused
is_paused = True
def mark_subtask_cb():
global current_action_index, subtask_indices
subtask_indices.append(current_action_index)
print(f"Marked subtask at action index: {current_action_index}")
class PreStepDatagenInfoRecorder(RecorderTerm):
"""Recorder term that records the datagen info data in each step."""
def record_pre_step(self):
datagen_info = {
"object_pose": self._env.scene.get_state(is_relative=True)["rigid_object"],
"target_eef_pose": self._env.action_to_target_eef_pos(self._env.action_manager.action),
}
return "obs", datagen_info
@configclass
class PreStepDatagenInfoRecorderCfg(RecorderTermCfg):
"""Configuration for the datagen info recorder term."""
class_type: type[RecorderTerm] = PreStepDatagenInfoRecorder
class PreStepSubtaskTermsObservationsRecorder(RecorderTerm):
"""Recorder term that records the subtask completion observations in each step."""
def record_pre_step(self):
return "obs/subtask_term_signals", self._env.obs_buf["subtask_terms"]
@configclass
class PreStepSubtaskTermsObservationsRecorderCfg(RecorderTermCfg):
"""Configuration for the step subtask terms observation recorder term."""
class_type: type[RecorderTerm] = PreStepSubtaskTermsObservationsRecorder
@configclass
class MimicRecorderManagerCfg(ActionStateRecorderManagerCfg):
"""Mimic specific recorder terms."""
record_pre_step_datagen_info = PreStepDatagenInfoRecorderCfg()
record_pre_step_subtask_term_signals = PreStepSubtaskTermsObservationsRecorderCfg()
def main():
"""Add Isaac Lab Mimic annotations to the given demo dataset file."""
global is_paused, current_action_index, subtask_indices
if not args_cli.auto and len(args_cli.signals) == 0:
if len(args_cli.signals) == 0:
raise ValueError("Subtask signals should be provided for manual mode.")
# Load input dataset to be annotated
if not os.path.exists(args_cli.input_file):
raise FileNotFoundError(f"The input dataset file {args_cli.input_file} does not exist.")
dataset_file_handler = HDF5DatasetFileHandler()
dataset_file_handler.open(args_cli.input_file)
env_name = dataset_file_handler.get_env_name()
episode_count = dataset_file_handler.get_num_episodes()
if episode_count == 0:
print("No episodes found in the dataset.")
exit()
# get output directory path and file name (without extension) from cli arguments
output_dir = os.path.dirname(args_cli.output_file)
output_file_name = os.path.splitext(os.path.basename(args_cli.output_file))[0]
# create output directory if it does not exist
if not os.path.exists(output_dir):
os.makedirs(output_dir)
if args_cli.task is not None:
env_name = args_cli.task
if env_name is None:
raise ValueError("Task/env name was not specified nor found in the dataset.")
env_cfg = parse_env_cfg(env_name, device=args_cli.device, num_envs=1)
# Disable all termination terms
env_cfg.terminations = {}
# Set up recorder terms for mimic annotations
env_cfg.env_name = args_cli.task
env_cfg.recorders: MimicRecorderManagerCfg = MimicRecorderManagerCfg()
env_cfg.recorders.dataset_export_dir_path = output_dir
env_cfg.recorders.dataset_filename = output_file_name
# create environment from loaded config
env = gym.make(args_cli.task, cfg=env_cfg)
# reset environment
env.reset()
# Only enables inputs if this script is NOT headless mode
if not args_cli.headless and not os.environ.get("HEADLESS", 0):
keyboard_interface = Se3Keyboard(pos_sensitivity=0.1, rot_sensitivity=0.1)
keyboard_interface.add_callback("N", play_cb)
keyboard_interface.add_callback("B", pause_cb)
if not args_cli.auto:
keyboard_interface.add_callback("S", mark_subtask_cb)
keyboard_interface.reset()
# simulate environment -- run everything in inference mode
with contextlib.suppress(KeyboardInterrupt) and torch.inference_mode():
while simulation_app.is_running() and not simulation_app.is_exiting():
# Iterate over the episodes in the loaded dataset file
for episode_index, episode_name in enumerate(dataset_file_handler.get_episode_names()):
subtask_indices = []
print(f"\nAnnotating episode #{episode_index} ({episode_name})")
episode = dataset_file_handler.load_episode(episode_name, env.unwrapped.device)
episode_data = episode.data
# read initial state from the loaded episode
initial_state = episode_data["initial_state"]
env.unwrapped.recorder_manager.reset()
env.unwrapped.reset_to(initial_state, None, is_relative=True)
# replay actions from this episode
actions = episode_data["actions"]
first_action = True
for action_index, action in enumerate(actions):
current_action_index = action_index
if first_action:
first_action = False
else:
while is_paused:
env.unwrapped.sim.render()
continue
action_tensor = torch.Tensor(action).reshape([1, action.shape[0]])
env.step(torch.Tensor(action_tensor))
if not args_cli.auto:
print(f"\tSubtasks marked at action indices: {subtask_indices}")
if len(args_cli.signals) != len(subtask_indices):
raise ValueError(
f"Number of annotated subtask signals {len(subtask_indices)} should be equal "
f" to number of subtasks {len(args_cli.signals)}"
)
annotated_episode = env.unwrapped.recorder_manager.get_episode(0)
del annotated_episode.data["obs"]["subtask_term_signals"]
for subtask_index in range(len(args_cli.signals)):
# subtask termination signal is false until subtask is complete, and true afterwards
subtask_signals = torch.ones(len(actions), dtype=torch.bool)
subtask_signals[: subtask_indices[subtask_index]] = False
annotated_episode.add(
f"obs/subtask_term_signals/{args_cli.signals[subtask_index]}", subtask_signals
)
# set success to the recorded episode data and export to file
env.unwrapped.recorder_manager.set_success_to_episodes(
None, torch.tensor([[True]], dtype=torch.bool, device=env.unwrapped.device)
)
env.unwrapped.recorder_manager.export_episodes()
print("\tExported annotated episode.")
break
# Close environment after annotation is complete
env.close()
if __name__ == "__main__":
# run the main function
main()
# close sim app
simulation_app.close()
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
"""
Main data generation script.
"""
# Launching Isaac Sim Simulator first.
import argparse
from omni.isaac.lab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Record demonstrations for Isaac Lab environments.")
parser.add_argument("--task", type=str, default=None, help="Name of the task.")
parser.add_argument("--generation_num_trials", type=int, help="Number of demos to be generated.", default=None)
parser.add_argument(
"--num_envs", type=int, default=1, help="Number of environments to instantiate for generating datasets."
)
parser.add_argument("--input_file", type=str, default=None, required=True, help="File path to the source dataset file.")
parser.add_argument(
"--output_file",
type=str,
default="./datasets/output_dataset.hdf5",
help="File path to export recorded and generated episodes.",
)
parser.add_argument(
"--pause_subtask",
action="store_true",
help="pause after every subtask during generation for debugging - only useful with render flag",
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch the simulator
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
import asyncio
import contextlib
import gymnasium as gym
import numpy as np
import os
import random
import torch
import omni.isaac.lab_mimic.envs
from omni.isaac.lab_mimic.datagen.data_generator import DataGenerator
from omni.isaac.lab_mimic.datagen.datagen_info_pool import DataGenInfoPool
from omni.isaac.lab.envs.mdp.recorders.recorders_cfg import ActionStateRecorderManagerCfg
from omni.isaac.lab.managers import DatasetExportMode
from omni.isaac.lab.utils.datasets import HDF5DatasetFileHandler
import omni.isaac.lab_tasks # noqa: F401
from omni.isaac.lab_tasks.utils.parse_cfg import parse_env_cfg
# global variable to keep track of the data generation statistics
num_success = 0
num_failures = 0
num_attempts = 0
async def run_data_generator(env, env_id, env_action_queue, data_generator, pause_subtask=False):
"""Run data generator."""
global num_success, num_failures, num_attempts
while True:
results = await data_generator.generate(
env_id=env_id,
env_action_queue=env_action_queue,
select_src_per_subtask=env.unwrapped.cfg.datagen_config.generation_select_src_per_subtask,
transform_first_robot_pose=env.unwrapped.cfg.datagen_config.generation_transform_first_robot_pose,
interpolate_from_last_target_pose=env.unwrapped.cfg.datagen_config.generation_interpolate_from_last_target_pose,
pause_subtask=pause_subtask,
)
if bool(results["success"]):
num_success += 1
else:
num_failures += 1
num_attempts += 1
def env_loop(env, env_action_queue, shared_datagen_info_pool, asyncio_event_loop):
"""Main loop for the environment."""
global num_success, num_failures, num_attempts
prev_num_attempts = 0
# simulate environment -- run everything in inference mode
with contextlib.suppress(KeyboardInterrupt) and torch.inference_mode():
while True:
actions = torch.zeros(env.unwrapped.action_space.shape)
# get actions from all the data generators
for i in range(env.unwrapped.num_envs):
# an async-blocking call to get an action from a data generator
env_id, action = asyncio_event_loop.run_until_complete(env_action_queue.get())
actions[env_id] = action
# perform action on environment
env.step(actions)
# mark done so the data generators can continue with the step results
for i in range(env.unwrapped.num_envs):
env_action_queue.task_done()
if prev_num_attempts != num_attempts:
prev_num_attempts = num_attempts
print("")
print("*" * 50)
print(f"have {num_success} successes out of {num_attempts} trials so far")
print(f"have {num_failures} failures out of {num_attempts} trials so far")
print("*" * 50)
# termination condition is on enough successes if @guarantee_success or enough attempts otherwise
generation_guarantee = env.unwrapped.cfg.datagen_config.generation_guarantee
generation_num_trials = env.unwrapped.cfg.datagen_config.generation_num_trials
check_val = num_success if generation_guarantee else num_attempts
if check_val >= generation_num_trials:
print(f"Reached {generation_num_trials} successes/attempts. Exiting.")
break
# check that simulation is stopped or not
if env.unwrapped.sim.is_stopped():
break
env.close()
def main():
num_envs = args_cli.num_envs
# get directory path and file name (without extension) from cli arguments
output_dir = os.path.dirname(args_cli.output_file)
output_file_name = os.path.splitext(os.path.basename(args_cli.output_file))[0]
# create directory if it does not exist
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# Get env name from the dataset file
if not os.path.exists(args_cli.input_file):
raise FileNotFoundError(f"The dataset file {args_cli.input_file} does not exist.")
dataset_file_handler = HDF5DatasetFileHandler()
dataset_file_handler.open(args_cli.input_file)
env_name = dataset_file_handler.get_env_name()
if args_cli.task is not None:
env_name = args_cli.task
if env_name is None:
raise ValueError("Task/env name was not specified nor found in the dataset.")
# parse configuration
env_cfg = parse_env_cfg(env_name, device=args_cli.device, num_envs=num_envs)
# Override the datagen_config.generation_num_trials with the value from the command line arg
if args_cli.generation_num_trials is not None:
env_cfg.datagen_config.generation_num_trials = args_cli.generation_num_trials
env_cfg.env_name = env_name
# modify configuration such that the environment runs indefinitely
env_cfg.terminations.time_out = None
# data generator is in charge of resetting the environment
env_cfg.terminations.success = None
env_cfg.observations.policy.concatenate_terms = False
env_cfg.recorders = ActionStateRecorderManagerCfg()
env_cfg.recorders.dataset_export_dir_path = output_dir
env_cfg.recorders.dataset_filename = output_file_name
if env_cfg.datagen_config.generation_keep_failed:
env_cfg.recorders.dataset_export_mode = DatasetExportMode.EXPORT_SUCCEEDED_FAILED_IN_SEPARATE_FILES
else:
env_cfg.recorders.dataset_export_mode = DatasetExportMode.EXPORT_SUCCEEDED_ONLY
# create environment
env = gym.make(env_name, cfg=env_cfg)
# set seed for generation
random.seed(env.unwrapped.cfg.datagen_config.seed)
np.random.seed(env.unwrapped.cfg.datagen_config.seed)
torch.manual_seed(env.unwrapped.cfg.datagen_config.seed)
# reset before starting
env.reset()
# Set up asyncio stuff
asyncio_event_loop = asyncio.get_event_loop()
env_action_queue = asyncio.Queue()
shared_datagen_info_pool_lock = asyncio.Lock()
shared_datagen_info_pool = DataGenInfoPool(
env.unwrapped, env.unwrapped.cfg, env.unwrapped.device, asyncio_lock=shared_datagen_info_pool_lock
)
shared_datagen_info_pool.load_from_dataset_file(args_cli.input_file)
print(f"Loaded {shared_datagen_info_pool.num_datagen_infos} to datagen info pool")
# make data generator object
data_generator = DataGenerator(env=env.unwrapped, src_demo_datagen_info_pool=shared_datagen_info_pool)
data_generator_asyncio_tasks = []
for i in range(num_envs):
data_generator_asyncio_tasks.append(
asyncio_event_loop.create_task(
run_data_generator(env, i, env_action_queue, data_generator, pause_subtask=args_cli.pause_subtask)
)
)
try:
asyncio.ensure_future(asyncio.gather(*data_generator_asyncio_tasks))
except asyncio.CancelledError:
print("Tasks were cancelled.")
env_loop(env, env_action_queue, shared_datagen_info_pool, asyncio_event_loop)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nProgram interrupted by user. Exiting...")
# close sim app
simulation_app.close()
......@@ -15,6 +15,7 @@ PER_TEST_TIMEOUTS = {
"test_env_rendering_logic.py": 300,
"test_camera.py": 500,
"test_tiled_camera.py": 300,
"test_generate_dataset.py": 300, # This test runs annotation for 10 demos and generation until one succeeds
"test_rsl_rl_wrapper.py": 200,
"test_sb3_wrapper.py": 200,
"test_skrl_wrapper.py": 200,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment