Unverified Commit 27cec08c authored by chengronglai's avatar chengronglai Committed by GitHub

Adds blueprint environment for Franka stacking mimic (#1944)

# Description

This pull request refactors the core functionality of
generate_dataset.py into separate helper modules. The goal is to make
the associated Jupyter notebook leaner.

* Move functions in generate_dataset to
isaaclab_mimic/datagen/generation.py
* Add utils.py for utility function for the notebook
* Add dependency for ipywidgets

## Type of change

- New feature (non-breaking change which adds functionality)

## Checklist

- [ x] I have run the [`pre-commit` checks](https://pre-commit.com/)
with `./isaaclab.sh --format`
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [ ] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there

<!--
As you go through the checklist above, you can mark something as done by
putting an x character in it

For example,
- [x] I have done this task
- [ ] I have not done this task
-->

---------
Signed-off-by: 's avatarKelly Guo <kellyguo123@hotmail.com>
Co-authored-by: 's avatarJean-Francois-Lafleche <57650687+Jean-Francois-Lafleche@users.noreply.github.com>
Co-authored-by: 's avatarPeter Du <peterd@nvidia.com>
Co-authored-by: 's avatarKelly Guo <kellyguo123@hotmail.com>
Co-authored-by: 's avatarKelly Guo <kellyg@nvidia.com>
parent c8a7e47a
......@@ -129,6 +129,7 @@ exclude_patterns = ["_build", "_redirect", "_templates", "Thumbs.db", ".DS_Store
# Mock out modules that are not available on RTD
autodoc_mock_imports = [
"torch",
"torchvision",
"numpy",
"matplotlib",
"scipy",
......
......@@ -44,149 +44,34 @@ simulation_app = app_launcher.app
"""Rest everything follows."""
import asyncio
import contextlib
import gymnasium as gym
import numpy as np
import os
import random
import torch
from isaaclab.envs.mdp.recorders.recorders_cfg import ActionStateRecorderManagerCfg
from isaaclab.managers import DatasetExportMode
from isaaclab.utils.datasets import HDF5DatasetFileHandler
import isaaclab_mimic.envs # noqa: F401
from isaaclab_mimic.datagen.data_generator import DataGenerator
from isaaclab_mimic.datagen.datagen_info_pool import DataGenInfoPool
from isaaclab_mimic.datagen.generation import env_loop, setup_async_generation, setup_env_config
from isaaclab_mimic.datagen.utils import get_env_name_from_dataset, setup_output_paths
import isaaclab_tasks # noqa: F401
from isaaclab_tasks.utils.parse_cfg import parse_env_cfg
# global variable to keep track of the data generation statistics
num_success = 0
num_failures = 0
num_attempts = 0
async def run_data_generator(env, env_id, env_action_queue, data_generator, success_term, pause_subtask=False):
"""Run data generator."""
global num_success, num_failures, num_attempts
while True:
results = await data_generator.generate(
env_id=env_id,
success_term=success_term,
env_action_queue=env_action_queue,
select_src_per_subtask=env.unwrapped.cfg.datagen_config.generation_select_src_per_subtask,
transform_first_robot_pose=env.unwrapped.cfg.datagen_config.generation_transform_first_robot_pose,
interpolate_from_last_target_pose=env.unwrapped.cfg.datagen_config.generation_interpolate_from_last_target_pose,
pause_subtask=pause_subtask,
)
if bool(results["success"]):
num_success += 1
else:
num_failures += 1
num_attempts += 1
def env_loop(env, env_action_queue, shared_datagen_info_pool, asyncio_event_loop):
"""Main loop for the environment."""
global num_success, num_failures, num_attempts
prev_num_attempts = 0
# simulate environment -- run everything in inference mode
with contextlib.suppress(KeyboardInterrupt) and torch.inference_mode():
while True:
actions = torch.zeros(env.unwrapped.action_space.shape)
# get actions from all the data generators
for i in range(env.unwrapped.num_envs):
# an async-blocking call to get an action from a data generator
env_id, action = asyncio_event_loop.run_until_complete(env_action_queue.get())
actions[env_id] = action
# perform action on environment
env.step(actions)
# mark done so the data generators can continue with the step results
for i in range(env.unwrapped.num_envs):
env_action_queue.task_done()
if prev_num_attempts != num_attempts:
prev_num_attempts = num_attempts
print("")
print("*" * 50)
print(f"have {num_success} successes out of {num_attempts} trials so far")
print(f"have {num_failures} failures out of {num_attempts} trials so far")
print("*" * 50)
# termination condition is on enough successes if @guarantee_success or enough attempts otherwise
generation_guarantee = env.unwrapped.cfg.datagen_config.generation_guarantee
generation_num_trials = env.unwrapped.cfg.datagen_config.generation_num_trials
check_val = num_success if generation_guarantee else num_attempts
if check_val >= generation_num_trials:
print(f"Reached {generation_num_trials} successes/attempts. Exiting.")
break
# check that simulation is stopped or not
if env.unwrapped.sim.is_stopped():
break
env.close()
def main():
num_envs = args_cli.num_envs
# get directory path and file name (without extension) from cli arguments
output_dir = os.path.dirname(args_cli.output_file)
output_file_name = os.path.splitext(os.path.basename(args_cli.output_file))[0]
# create directory if it does not exist
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# Get env name from the dataset file
if not os.path.exists(args_cli.input_file):
raise FileNotFoundError(f"The dataset file {args_cli.input_file} does not exist.")
dataset_file_handler = HDF5DatasetFileHandler()
dataset_file_handler.open(args_cli.input_file)
env_name = dataset_file_handler.get_env_name()
if args_cli.task is not None:
env_name = args_cli.task
if env_name is None:
raise ValueError("Task/env name was not specified nor found in the dataset.")
# parse configuration
env_cfg = parse_env_cfg(env_name, device=args_cli.device, num_envs=num_envs)
# Override the datagen_config.generation_num_trials with the value from the command line arg
if args_cli.generation_num_trials is not None:
env_cfg.datagen_config.generation_num_trials = args_cli.generation_num_trials
env_cfg.env_name = env_name
# extract success checking function to invoke manually
success_term = None
if hasattr(env_cfg.terminations, "success"):
success_term = env_cfg.terminations.success
env_cfg.terminations.success = None
else:
raise NotImplementedError("No success termination term was found in the environment.")
# data generator is in charge of resetting the environment
env_cfg.terminations = None
env_cfg.observations.policy.concatenate_terms = False
env_cfg.recorders = ActionStateRecorderManagerCfg()
env_cfg.recorders.dataset_export_dir_path = output_dir
env_cfg.recorders.dataset_filename = output_file_name
if env_cfg.datagen_config.generation_keep_failed:
env_cfg.recorders.dataset_export_mode = DatasetExportMode.EXPORT_SUCCEEDED_FAILED_IN_SEPARATE_FILES
else:
env_cfg.recorders.dataset_export_mode = DatasetExportMode.EXPORT_SUCCEEDED_ONLY
# Setup output paths and get env name
output_dir, output_file_name = setup_output_paths(args_cli.output_file)
env_name = args_cli.task or get_env_name_from_dataset(args_cli.input_file)
# Configure environment
env_cfg, success_term = setup_env_config(
env_name=env_name,
output_dir=output_dir,
output_file_name=output_file_name,
num_envs=num_envs,
device=args_cli.device,
generation_num_trials=args_cli.generation_num_trials,
)
# create environment
env = gym.make(env_name, cfg=env_cfg)
......@@ -199,36 +84,21 @@ def main():
# reset before starting
env.reset()
# Set up asyncio stuff
asyncio_event_loop = asyncio.get_event_loop()
env_action_queue = asyncio.Queue()
shared_datagen_info_pool_lock = asyncio.Lock()
shared_datagen_info_pool = DataGenInfoPool(
env.unwrapped, env.unwrapped.cfg, env.unwrapped.device, asyncio_lock=shared_datagen_info_pool_lock
# Setup and run async data generation
async_components = setup_async_generation(
env=env,
num_envs=args_cli.num_envs,
input_file=args_cli.input_file,
success_term=success_term,
pause_subtask=args_cli.pause_subtask,
)
shared_datagen_info_pool.load_from_dataset_file(args_cli.input_file)
print(f"Loaded {shared_datagen_info_pool.num_datagen_infos} to datagen info pool")
# make data generator object
data_generator = DataGenerator(env=env.unwrapped, src_demo_datagen_info_pool=shared_datagen_info_pool)
data_generator_asyncio_tasks = []
for i in range(num_envs):
data_generator_asyncio_tasks.append(
asyncio_event_loop.create_task(
run_data_generator(
env, i, env_action_queue, data_generator, success_term, pause_subtask=args_cli.pause_subtask
)
)
)
try:
asyncio.ensure_future(asyncio.gather(*data_generator_asyncio_tasks))
asyncio.ensure_future(asyncio.gather(*async_components["tasks"]))
env_loop(env, async_components["action_queue"], async_components["info_pool"], async_components["event_loop"])
except asyncio.CancelledError:
print("Tasks were cancelled.")
env_loop(env, env_action_queue, shared_datagen_info_pool, asyncio_event_loop)
if __name__ == "__main__":
try:
......
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.36.0"
version = "0.36.1"
# Description
title = "Isaac Lab framework for Robot Learning"
......
Changelog
---------
0.36.1 (2025-03-10)
~~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added :attr:`semantic_segmentation_mapping` for camera configs to allow specifying colors for semantics.
0.36.0 (2025-03-07)
~~~~~~~~~~~~~~~~~~~
......
......@@ -72,14 +72,15 @@ class ManagerBasedRLEnv(ManagerBasedEnv, gym.Env):
render_mode: The render mode for the environment. Defaults to None, which
is similar to ``"human"``.
"""
# -- counter for curriculum
self.common_step_counter = 0
# initialize the base class to setup the scene.
super().__init__(cfg=cfg)
# store the render mode
self.render_mode = render_mode
# initialize data and constants
# -- counter for curriculum
self.common_step_counter = 0
# -- init buffers
self.episode_length_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.long)
# -- set the framerate of the gym video recorder wrapper so that the playback speed of the produced video matches the simulation
......
......@@ -5,6 +5,7 @@
from __future__ import annotations
import json
import numpy as np
import re
import torch
......@@ -455,7 +456,10 @@ class Camera(SensorBase):
# if colorize is true, the data is mapped to colors and a uint8 4 channel image is returned.
# if colorize is false, the data is returned as a uint32 image with ids as values.
if name == "semantic_segmentation":
init_params = {"colorize": self.cfg.colorize_semantic_segmentation}
init_params = {
"colorize": self.cfg.colorize_semantic_segmentation,
"mapping": json.dumps(self.cfg.semantic_segmentation_mapping),
}
elif name == "instance_segmentation_fast":
init_params = {"colorize": self.cfg.colorize_instance_segmentation}
elif name == "instance_id_segmentation_fast":
......
......@@ -114,3 +114,19 @@ class CameraCfg(SensorBaseCfg):
If True, instance segmentation is converted to an image where instance IDs are mapped to colors.
and returned as a ``uint8`` 4-channel array. If False, the output is returned as a ``int32`` array.
"""
semantic_segmentation_mapping: dict = {}
"""Dictionary mapping semantics to specific colours
Eg.
```
{
"class:cube_1": (255, 36, 66, 255),
"class:cube_2": (255, 184, 48, 255),
"class:cube_3": (55, 255, 139, 255),
"class:table": (255, 237, 218, 255),
"class:ground": (100, 100, 100, 255),
"class:robot": (61, 178, 255, 255),
}
```
"""
......@@ -5,6 +5,7 @@
from __future__ import annotations
import json
import math
import numpy as np
import torch
......@@ -224,7 +225,10 @@ class TiledCamera(Camera):
else:
init_params = None
if annotator_type == "semantic_segmentation":
init_params = {"colorize": self.cfg.colorize_semantic_segmentation}
init_params = {
"colorize": self.cfg.colorize_semantic_segmentation,
"mapping": json.dumps(self.cfg.semantic_segmentation_mapping),
}
elif annotator_type == "instance_segmentation_fast":
init_params = {"colorize": self.cfg.colorize_instance_segmentation}
elif annotator_type == "instance_id_segmentation_fast":
......
[package]
# Semantic Versioning is used: https://semver.org/
version = "1.0.2"
version = "1.0.3"
# Description
category = "isaaclab"
......
Changelog
---------
1.0.3 (2025-03-10)
~~~~~~~~~~~~~~~~~~
Changed
^^^^^^^
* Refactored dataset generation code into leaner modules to prepare for Jupyter notebook.
Added
^^^^^
* Added ``Isaac-Stack-Cube-Franka-IK-Rel-Blueprint-Mimic-v0`` environment for blueprint vision stacking.
1.0.2 (2025-01-10)
~~~~~~~~~~~~~~~~~~
Fixed
^^^^^^^
^^^^^
* Fixed test_selection_strategy.py test case by starting omniverse app to import needed dependencies.
......
......@@ -8,5 +8,7 @@
from .data_generator import *
from .datagen_info import *
from .datagen_info_pool import *
from .generation import *
from .selection_strategy import *
from .utils import *
from .waypoint import *
# Copyright (c) 2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
import asyncio
import contextlib
import torch
from typing import Any
from isaaclab.envs import ManagerBasedEnv
from isaaclab.envs.mdp.recorders.recorders_cfg import ActionStateRecorderManagerCfg
from isaaclab.managers import DatasetExportMode
from isaaclab_mimic.datagen.data_generator import DataGenerator
from isaaclab_mimic.datagen.datagen_info_pool import DataGenInfoPool
from isaaclab_tasks.utils.parse_cfg import parse_env_cfg
# global variable to keep track of the data generation statistics
num_success = 0
num_failures = 0
num_attempts = 0
async def run_data_generator(
env: ManagerBasedEnv,
env_id: int,
env_action_queue: asyncio.Queue,
data_generator: DataGenerator,
success_term: Any,
pause_subtask: bool = False,
):
"""Run data generator."""
global num_success, num_failures, num_attempts
while True:
results = await data_generator.generate(
env_id=env_id,
success_term=success_term,
env_action_queue=env_action_queue,
select_src_per_subtask=env.unwrapped.cfg.datagen_config.generation_select_src_per_subtask,
transform_first_robot_pose=env.unwrapped.cfg.datagen_config.generation_transform_first_robot_pose,
interpolate_from_last_target_pose=env.unwrapped.cfg.datagen_config.generation_interpolate_from_last_target_pose,
pause_subtask=pause_subtask,
)
if bool(results["success"]):
num_success += 1
else:
num_failures += 1
num_attempts += 1
def env_loop(
env: ManagerBasedEnv,
env_action_queue: asyncio.Queue,
shared_datagen_info_pool: DataGenInfoPool,
asyncio_event_loop: asyncio.AbstractEventLoop,
) -> None:
"""Main loop for the environment."""
global num_success, num_failures, num_attempts
prev_num_attempts = 0
# simulate environment -- run everything in inference mode
with contextlib.suppress(KeyboardInterrupt) and torch.inference_mode():
while True:
actions = torch.zeros(env.unwrapped.action_space.shape)
# get actions from all the data generators
for i in range(env.unwrapped.num_envs):
# an async-blocking call to get an action from a data generator
env_id, action = asyncio_event_loop.run_until_complete(env_action_queue.get())
actions[env_id] = action
# perform action on environment
env.step(actions)
# mark done so the data generators can continue with the step results
for i in range(env.unwrapped.num_envs):
env_action_queue.task_done()
if prev_num_attempts != num_attempts:
prev_num_attempts = num_attempts
print("")
print("*" * 50)
print(f"have {num_success} successes out of {num_attempts} trials so far")
print(f"have {num_failures} failures out of {num_attempts} trials so far")
print("*" * 50)
# termination condition is on enough successes if @guarantee_success or enough attempts otherwise
generation_guarantee = env.unwrapped.cfg.datagen_config.generation_guarantee
generation_num_trials = env.unwrapped.cfg.datagen_config.generation_num_trials
check_val = num_success if generation_guarantee else num_attempts
if check_val >= generation_num_trials:
print(f"Reached {generation_num_trials} successes/attempts. Exiting.")
break
# check that simulation is stopped or not
if env.unwrapped.sim.is_stopped():
break
env.close()
def setup_env_config(
env_name: str,
output_dir: str,
output_file_name: str,
num_envs: int,
device: str,
generation_num_trials: int | None = None,
) -> tuple[Any, Any]:
"""Configure the environment for data generation.
Args:
env_name: Name of the environment
output_dir: Directory to save output
output_file_name: Name of output file
num_envs: Number of environments to run
device: Device to run on
generation_num_trials: Optional override for number of trials
Returns:
tuple containing:
- env_cfg: The environment configuration
- success_term: The success termination condition
Raises:
NotImplementedError: If no success termination term found
"""
env_cfg = parse_env_cfg(env_name, device=device, num_envs=num_envs)
if generation_num_trials is not None:
env_cfg.datagen_config.generation_num_trials = generation_num_trials
env_cfg.env_name = env_name
# Extract success checking function
success_term = None
if hasattr(env_cfg.terminations, "success"):
success_term = env_cfg.terminations.success
env_cfg.terminations.success = None
else:
raise NotImplementedError("No success termination term was found in the environment.")
# Configure for data generation
env_cfg.terminations = None
env_cfg.observations.policy.concatenate_terms = False
# Setup recorders
env_cfg.recorders = ActionStateRecorderManagerCfg()
env_cfg.recorders.dataset_export_dir_path = output_dir
env_cfg.recorders.dataset_filename = output_file_name
if env_cfg.datagen_config.generation_keep_failed:
env_cfg.recorders.dataset_export_mode = DatasetExportMode.EXPORT_SUCCEEDED_FAILED_IN_SEPARATE_FILES
else:
env_cfg.recorders.dataset_export_mode = DatasetExportMode.EXPORT_SUCCEEDED_ONLY
return env_cfg, success_term
def setup_async_generation(
env: Any, num_envs: int, input_file: str, success_term: Any, pause_subtask: bool = False
) -> dict[str, Any]:
"""Setup async data generation tasks.
Args:
env: The environment instance
num_envs: Number of environments to run
input_file: Path to input dataset file
success_term: Success termination condition
pause_subtask: Whether to pause after subtasks
Returns:
List of asyncio tasks for data generation
"""
asyncio_event_loop = asyncio.get_event_loop()
env_action_queue = asyncio.Queue()
shared_datagen_info_pool_lock = asyncio.Lock()
shared_datagen_info_pool = DataGenInfoPool(
env.unwrapped, env.unwrapped.cfg, env.unwrapped.device, asyncio_lock=shared_datagen_info_pool_lock
)
shared_datagen_info_pool.load_from_dataset_file(input_file)
print(f"Loaded {shared_datagen_info_pool.num_datagen_infos} to datagen info pool")
# Create and schedule data generator tasks
data_generator = DataGenerator(env=env.unwrapped, src_demo_datagen_info_pool=shared_datagen_info_pool)
data_generator_asyncio_tasks = []
for i in range(num_envs):
task = asyncio_event_loop.create_task(
run_data_generator(env, i, env_action_queue, data_generator, success_term, pause_subtask=pause_subtask)
)
data_generator_asyncio_tasks.append(task)
return {
"tasks": data_generator_asyncio_tasks,
"event_loop": asyncio_event_loop,
"action_queue": env_action_queue,
"info_pool": shared_datagen_info_pool,
}
# Copyright (c) 2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
import os
from collections.abc import Callable
from typing import Any
from IPython.display import display
from ipywidgets import widgets
from isaaclab.envs import ManagerBasedEnv
from isaaclab.managers import EventTermCfg
from isaaclab.utils.datasets import HDF5DatasetFileHandler
def get_nested_value(d: dict[str, Any], keys: list[str]) -> Any:
"""Retrieve a nested value from dictionary d using list of keys."""
for k in keys:
d = d[k]
return d
def update_nested_value(d: dict[str, Any], keys: list[str], value: Any) -> None:
"""Update a nested value in dictionary d using list of keys."""
for k in keys[:-1]:
d = d.setdefault(k, {})
d[keys[-1]] = value
def reset_env(env: ManagerBasedEnv, steps: int = 1) -> None:
"""Reset environment and step simulation to stabilize state."""
# Get sim and scene from unwrapped environment
sim = env.unwrapped.sim
scene = env.unwrapped.scene
# Reset environment
env.reset()
# Step simulation multiple times to stabilize
for _ in range(steps):
# Write data to sim
scene.write_data_to_sim()
# Perform step
sim.step()
# Update buffers
scene.update(dt=env.physics_dt)
def get_parameter_input(
param_name: str,
current_val: float | tuple[float, float] | list[float],
allowed_range: tuple[float, float, float | None],
update_fn: Callable[[float | tuple[float, float]], None],
env: ManagerBasedEnv | None = None,
event_term_name: str | None = None,
) -> widgets.FloatSlider | widgets.FloatRangeSlider:
"""Get parameter input using ipywidgets with immediate value updates."""
if isinstance(current_val, (tuple, list)):
step_size = allowed_range[2] if len(allowed_range) > 2 else 0.01
full_param_name = f"{event_term_name}.{param_name}" if event_term_name else param_name
# Create container with label and range slider
container = widgets.HBox([
widgets.Label(full_param_name, layout=widgets.Layout(width="auto")),
widgets.FloatRangeSlider(
value=[current_val[0], current_val[1]],
min=allowed_range[0],
max=allowed_range[1],
step=step_size,
layout=widgets.Layout(width="300px"),
readout=True,
readout_format=".3f",
),
])
def on_value_change(change):
new_tuple = (change["new"][0], change["new"][1])
update_fn(new_tuple)
if env is not None:
reset_env(env, steps=50)
container.children[1].observe(on_value_change, names="value")
# Create help text showing the allowed range
help_text = widgets.HTML(value=f'<p style="color:gray">Allowed range: {allowed_range[:2]}</p>')
display(container)
display(help_text)
return container.children[1]
else:
step_size = allowed_range[2] if len(allowed_range) > 2 else 0.01
full_param_name = f"{event_term_name}.{param_name}" if event_term_name else param_name
# Create container with label and slider
container = widgets.HBox([
widgets.Label(full_param_name, layout=widgets.Layout(width="auto")),
widgets.FloatSlider(
value=current_val,
min=allowed_range[0],
max=allowed_range[1],
step=step_size,
layout=widgets.Layout(width="300px"),
readout=True,
readout_format=".3f",
),
])
def on_value_change(change):
update_fn(change["new"])
if env is not None:
reset_env(env, steps=50)
container.children[1].observe(on_value_change, names="value")
# Create help text showing the allowed range
help_text = widgets.HTML(value=f'<p style="color:gray">Allowed range: {allowed_range[:2]}</p>')
display(container)
display(help_text)
return container.children[1]
def interactive_update_randomizable_params(
event_term: EventTermCfg,
event_term_name: str,
param_config: dict[str, dict | tuple[float, float, float | None]],
param_path: str = "",
env: ManagerBasedEnv | None = None,
) -> list[tuple[list[str], widgets.FloatSlider | widgets.FloatRangeSlider]]:
"""Interactive parameter updates using ipywidgets."""
inputs = []
for key, allowed_range in param_config.items():
current_path = f"{param_path}.{key}" if param_path else key
keys = current_path.split(".")
if isinstance(allowed_range, dict):
interactive_update_randomizable_params(event_term, event_term_name, allowed_range, current_path, env)
else:
try:
current_val = get_nested_value(event_term.params, keys)
def make_update_fn(k, full_path):
def update_fn(new_val):
update_nested_value(event_term.params, k, new_val)
print(f"Updated '{full_path}' to {new_val}.")
return update_fn
input_widget = get_parameter_input(
current_path,
current_val,
allowed_range,
make_update_fn(keys, current_path),
env=env,
event_term_name=event_term_name,
)
inputs.append((keys, input_widget))
except KeyError:
print(f"Key '{current_path}' not found in event_term.params; skipping.")
continue
return inputs
def setup_output_paths(output_file_path: str) -> tuple[str, str]:
"""Set up output directory and get file name for dataset generation.
Args:
output_file_path: Full path to the desired output file
Returns:
tuple containing:
- output_dir: Path to the output directory
- output_file_name: Name of the output file without extension
"""
output_dir = os.path.dirname(output_file_path)
output_file_name = os.path.splitext(os.path.basename(output_file_path))[0]
# create directory if it does not exist
if not os.path.exists(output_dir):
os.makedirs(output_dir)
return output_dir, output_file_name
def get_env_name_from_dataset(input_file_path: str) -> str:
"""Get environment name from an input dataset file.
Args:
input_file_path: Path to the input dataset file
Returns:
env_name: Name of the environment from the dataset
Raises:
FileNotFoundError: If the input file does not exist
"""
if not os.path.exists(input_file_path):
raise FileNotFoundError(f"The dataset file {input_file_path} does not exist.")
dataset_file_handler = HDF5DatasetFileHandler()
dataset_file_handler.open(input_file_path)
env_name = dataset_file_handler.get_env_name()
if env_name is None:
raise ValueError("Environment name not found in dataset")
return env_name
......@@ -7,6 +7,7 @@
import gymnasium as gym
from .franka_stack_ik_rel_blueprint_mimic_env_cfg import FrankaCubeStackIKRelBlueprintMimicEnvCfg
from .franka_stack_ik_rel_mimic_env import FrankaCubeStackIKRelMimicEnv
from .franka_stack_ik_rel_mimic_env_cfg import FrankaCubeStackIKRelMimicEnvCfg
......@@ -22,3 +23,12 @@ gym.register(
},
disable_env_checker=True,
)
gym.register(
id="Isaac-Stack-Cube-Franka-IK-Rel-Blueprint-Mimic-v0",
entry_point="isaaclab_mimic.envs:FrankaCubeStackIKRelMimicEnv",
kwargs={
"env_cfg_entry_point": franka_stack_ik_rel_blueprint_mimic_env_cfg.FrankaCubeStackIKRelBlueprintMimicEnvCfg,
},
disable_env_checker=True,
)
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from isaaclab.envs.mimic_env_cfg import MimicEnvCfg, SubTaskConfig
from isaaclab.utils import configclass
from isaaclab_tasks.manager_based.manipulation.stack.config.franka.stack_ik_rel_blueprint_env_cfg import (
FrankaCubeStackBlueprintEnvCfg,
)
@configclass
class FrankaCubeStackIKRelBlueprintMimicEnvCfg(FrankaCubeStackBlueprintEnvCfg, MimicEnvCfg):
"""
Isaac Lab Mimic environment config class for Franka Cube Stack IK Rel env.
"""
def __post_init__(self):
# post init of parents
super().__post_init__()
# Override the existing values
self.datagen_config.name = "isaac_lab_franka_stack_ik_rel_blueprint_D0"
self.datagen_config.generation_guarantee = True
self.datagen_config.generation_keep_failed = True
self.datagen_config.generation_num_trials = 10
self.datagen_config.generation_select_src_per_subtask = True
self.datagen_config.generation_transform_first_robot_pose = False
self.datagen_config.generation_interpolate_from_last_target_pose = True
self.datagen_config.max_num_failures = 25
self.datagen_config.seed = 1
# The following are the subtask configurations for the stack task.
subtask_configs = []
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_2",
# This key corresponds to the binary indicator in "datagen_info" that signals
# when this subtask is finished (e.g., on a 0 to 1 edge).
subtask_term_signal="grasp_1",
# Specifies time offsets for data generation when splitting a trajectory into
# subtask segments. Random offsets are added to the termination boundary.
subtask_term_offset_range=(10, 20),
# Selection strategy for the source subtask segment during data generation
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.03,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=5,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_1",
# Corresponding key for the binary indicator in "datagen_info" for completion
subtask_term_signal="stack_1",
# Time offsets for data generation when splitting a trajectory
subtask_term_offset_range=(10, 20),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.03,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=5,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_3",
# Corresponding key for the binary indicator in "datagen_info" for completion
subtask_term_signal="grasp_2",
# Time offsets for data generation when splitting a trajectory
subtask_term_offset_range=(10, 20),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.03,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=5,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_2",
# End of final subtask does not need to be detected
subtask_term_signal=None,
# No time offsets for the final subtask
subtask_term_offset_range=(0, 0),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.03,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=5,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
self.subtask_configs["franka"] = subtask_configs
......@@ -20,6 +20,8 @@ EXTENSION_TOML_DATA = toml.load(os.path.join(EXTENSION_PATH, "config", "extensio
# Minimum dependencies required prior to installation
INSTALL_REQUIRES = [
"tomli",
# jupyter notebook
"ipywidgets==8.1.5",
]
# Extra dependencies for IL agents
......
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.10.24"
version = "0.10.25"
# Description
title = "Isaac Lab Environments"
......
Changelog
---------
0.10.25 (2025-03-10)
~~~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added ``Isaac-Stack-Cube-Franka-IK-Rel-Blueprint-v0`` stacking environment with camera inputs.
0.10.24 (2025-02-13)
~~~~~~~~~~~~~~~~~~~~
......
......@@ -7,6 +7,7 @@ import os
from . import (
agents,
stack_ik_rel_blueprint_env_cfg,
stack_ik_rel_env_cfg,
stack_ik_rel_instance_randomize_env_cfg,
stack_joint_pos_env_cfg,
......@@ -62,3 +63,12 @@ gym.register(
},
disable_env_checker=True,
)
gym.register(
id="Isaac-Stack-Cube-Franka-IK-Rel-Blueprint-v0",
entry_point="isaaclab.envs:ManagerBasedRLEnv",
kwargs={
"env_cfg_entry_point": stack_ik_rel_blueprint_env_cfg.FrankaCubeStackBlueprintEnvCfg,
},
disable_env_checker=True,
)
......@@ -68,6 +68,13 @@ class FrankaCubeStackEnvCfg(StackEnvCfg):
# Set Franka as robot
self.scene.robot = FRANKA_PANDA_CFG.replace(prim_path="{ENV_REGEX_NS}/Robot")
self.scene.robot.spawn.semantic_tags = [("class", "robot")]
# Add semantics to table
self.scene.table.spawn.semantic_tags = [("class", "table")]
# Add semantics to ground
self.scene.plane.semantic_tags = [("class", "ground")]
# Set actions for the specific robot type (franka)
self.actions.arm_action = mdp.JointPositionActionCfg(
......@@ -98,6 +105,7 @@ class FrankaCubeStackEnvCfg(StackEnvCfg):
usd_path=f"{ISAAC_NUCLEUS_DIR}/Props/Blocks/blue_block.usd",
scale=(1.0, 1.0, 1.0),
rigid_props=cube_properties,
semantic_tags=[("class", "cube_1")],
),
)
self.scene.cube_2 = RigidObjectCfg(
......@@ -107,6 +115,7 @@ class FrankaCubeStackEnvCfg(StackEnvCfg):
usd_path=f"{ISAAC_NUCLEUS_DIR}/Props/Blocks/red_block.usd",
scale=(1.0, 1.0, 1.0),
rigid_props=cube_properties,
semantic_tags=[("class", "cube_2")],
),
)
self.scene.cube_3 = RigidObjectCfg(
......@@ -116,6 +125,7 @@ class FrankaCubeStackEnvCfg(StackEnvCfg):
usd_path=f"{ISAAC_NUCLEUS_DIR}/Props/Blocks/green_block.usd",
scale=(1.0, 1.0, 1.0),
rigid_props=cube_properties,
semantic_tags=[("class", "cube_3")],
),
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment