Commit d71f9b7b authored by shauryadNv's avatar shauryadNv Committed by Kelly Guo

Adds stack environment, scripts for Cosmos, and visual robustness evaluation (#395)

<!--
Thank you for your interest in sending a pull request. Please make sure
to check the contribution guidelines.

Link:
https://isaac-sim.github.io/IsaacLab/main/source/refs/contributing.html
-->

Changes:
1. Adds a new Franka cube stacking visuomotor environment as per Cosmos
requirements: higher resolution and multi-modality support.
2. Adds scripts for data pre-processing and post-processing before and
after Cosmos augmentation respectively.
3. Adds evaluation of trained visuomotor policies for robustness to
visual changes using domain randomization.
4. Makes task termination checks more strict for the Franka cube
stacking task.
5. Adds new documentation for the Cosmos imitation learning pipeline.

<!-- As a practice, it is recommended to open an issue to have
discussions on the proposed pull request.
This makes it easier for the community to keep track of what is being
developed or added, and if a given feature
is demanded by more than one party. -->

<!-- As you go through the list, delete the ones that are not
applicable. -->

- New feature (non-breaking change which adds functionality)
- This change requires a documentation update

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there

<!--
As you go through the checklist above, you can mark something as done by
putting an x character in it

For example,
- [x] I have done this task
- [ ] I have not done this task
-->

---------
Signed-off-by: 's avatarrwiltz <165190220+rwiltz@users.noreply.github.com>
Signed-off-by: 's avatarKelly Guo <kellyguo123@hotmail.com>
Signed-off-by: 's avatarAshwin Varghese Kuruttukulam <123109010+ashwinvkNV@users.noreply.github.com>
Signed-off-by: 's avatarKelly Guo <kellyg@nvidia.com>
Signed-off-by: 's avatarMichael Gussert <michael@gussert.com>
Signed-off-by: 's avatarsamibouziri <79418773+samibouziri@users.noreply.github.com>
Signed-off-by: 's avatarMayank Mittal <12863862+Mayankm96@users.noreply.github.com>
Signed-off-by: 's avatarKyle Morgenstein <34984693+KyleM73@users.noreply.github.com>
Signed-off-by: 's avatarHongyu Li <lihongyu0807@icloud.com>
Signed-off-by: 's avatarToni-SM <toni.semu@gmail.com>
Signed-off-by: 's avatarJames Tigue <166445701+jtigue-bdai@users.noreply.github.com>
Signed-off-by: 's avatarPascal Roth <57946385+pascal-roth@users.noreply.github.com>
Signed-off-by: 's avatarVictor Khaustov <3192677+vi3itor@users.noreply.github.com>
Signed-off-by: 's avatarAlvinC <alvincny529@gmail.com>
Signed-off-by: 's avatarTyler Lum <tylergwlum@gmail.com>
Signed-off-by: 's avatarMiguel Alonso Jr. <76960110+miguelalonsojr@users.noreply.github.com>
Signed-off-by: 's avatarrenaudponcelet <renaud.poncelet@gmail.com>
Co-authored-by: 's avatarjaczhangnv <jaczhang@nvidia.com>
Co-authored-by: 's avatarrwiltz <165190220+rwiltz@users.noreply.github.com>
Co-authored-by: 's avatarKelly Guo <kellyg@nvidia.com>
Co-authored-by: 's avatarYanzi Zhu <yanziz@nvidia.com>
Co-authored-by: 's avatarnv-mhaselton <mhaselton@nvidia.com>
Co-authored-by: 's avatarlotusl-code <lotusl@nvidia.com>
Co-authored-by: 's avatarcosmith-nvidia <141183495+cosmith-nvidia@users.noreply.github.com>
Co-authored-by: 's avatarMichael Gussert <michael@gussert.com>
Co-authored-by: 's avatarCY Chen <cyc@nvidia.com>
Co-authored-by: 's avataroahmednv <oahmed@Nvidia.com>
Co-authored-by: 's avatarAshwin Varghese Kuruttukulam <123109010+ashwinvkNV@users.noreply.github.com>
Co-authored-by: 's avatarRafael Wiltz <rwiltz@nvidia.com>
Co-authored-by: 's avatarPeter Du <peterd@nvidia.com>
Co-authored-by: 's avatarmatthewtrepte <mtrepte@nvidia.com>
Co-authored-by: 's avatarchengronglai <chengrongl@nvidia.com>
Co-authored-by: 's avatarpulkitg01 <pulkitg@nvidia.com>
Co-authored-by: 's avatarConnor Smith <cosmith@nvidia.com>
Co-authored-by: 's avatarAshwin Varghese Kuruttukulam <ashwinvk@nvidia.com>
Co-authored-by: 's avatarKelly Guo <kellyguo123@hotmail.com>
Co-authored-by: 's avatarMayank Mittal <12863862+Mayankm96@users.noreply.github.com>
Co-authored-by: 's avatarsamibouziri <79418773+samibouziri@users.noreply.github.com>
Co-authored-by: 's avatarJames Smith <142246516+jsmith-bdai@users.noreply.github.com>
Co-authored-by: 's avatarShundo Kishi <syundo0730@gmail.com>
Co-authored-by: 's avatarSheikh Dawood <sabdulajees@nvidia.com>
Co-authored-by: 's avatarToni-SM <aserranomuno@nvidia.com>
Co-authored-by: 's avatarGonglitian <70052908+Gonglitian@users.noreply.github.com>
Co-authored-by: 's avatarJames Tigue <166445701+jtigue-bdai@users.noreply.github.com>
Co-authored-by: 's avatarMayank Mittal <mittalma@leggedrobotics.com>
Co-authored-by: 's avatarKyle Morgenstein <34984693+KyleM73@users.noreply.github.com>
Co-authored-by: 's avatarJohnson Sun <20457146+j3soon@users.noreply.github.com>
Co-authored-by: 's avatarPascal Roth <57946385+pascal-roth@users.noreply.github.com>
Co-authored-by: 's avatarHongyu Li <lihongyu0807@icloud.com>
Co-authored-by: 's avatarJean-Francois-Lafleche <57650687+Jean-Francois-Lafleche@users.noreply.github.com>
Co-authored-by: 's avatarWei Jinqi <changshanshi@outlook.com>
Co-authored-by: 's avatarLouis LE LAY <le.lay.louis@gmail.com>
Co-authored-by: 's avatarHarsh Patel <hapatel@theaiinstitute.com>
Co-authored-by: 's avatarKousheek Chakraborty <kousheekc@gmail.com>
Co-authored-by: 's avatarVictor Khaustov <3192677+vi3itor@users.noreply.github.com>
Co-authored-by: 's avatarAlvinC <alvincny529@gmail.com>
Co-authored-by: 's avatarFelipe Mohr <50018670+felipemohr@users.noreply.github.com>
Co-authored-by: 's avatarAdAstra7 <87345760+likecanyon@users.noreply.github.com>
Co-authored-by: 's avatargao <ziqi.gao@iff-extern.fraunhofer.de>
Co-authored-by: 's avatarTyler Lum <tylergwlum@gmail.com>
Co-authored-by: 's avatar-T.K.- <t_k_233@outlook.com>
Co-authored-by: 's avatarClemens Schwarke <96480707+ClemensSchwarke@users.noreply.github.com>
Co-authored-by: 's avatarMiguel Alonso Jr. <76960110+miguelalonsojr@users.noreply.github.com>
Co-authored-by: 's avatarMiguel Alonso Jr. <miguel.alonso@nfinite.app>
Co-authored-by: 's avatarrenaudponcelet <renaud.poncelet@gmail.com>
parent 0224a373
......@@ -111,6 +111,7 @@ Guidelines for modifications:
* Ryley McCarroll
* Shafeef Omar
* Shaoshu Su
* Shaurya Dewan
* Shundo Kishi
* Stefan Van de Mosselaer
* Stephan Pleines
......
......@@ -104,6 +104,7 @@ Table of Contents
source/overview/environments
source/overview/reinforcement-learning/index
source/overview/teleop_imitation
source/overview/augmented_imitation
source/overview/showroom
source/overview/simple_agents
......
This diff is collapsed.
This diff is collapsed.
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Script to construct prompts to control the Cosmos model's generation.
Required arguments:
--templates_path Path to the file containing templates for the prompts.
Optional arguments:
--num_prompts Number of prompts to generate (default: 1).
--output_path Path to the output file to write generated prompts (default: prompts.txt).
"""
import argparse
import json
import random
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="Generate prompts for controlling Cosmos model's generation.")
parser.add_argument(
"--templates_path", type=str, required=True, help="Path to the JSON file containing prompt templates"
)
parser.add_argument("--num_prompts", type=int, default=1, help="Number of prompts to generate (default: 1)")
parser.add_argument(
"--output_path", type=str, default="prompts.txt", help="Path to the output file to write generated prompts"
)
args = parser.parse_args()
return args
def generate_prompt(templates_path: str):
"""Generate a random prompt for controlling the Cosmos model's visual augmentation.
The prompt describes the scene and desired visual variations, which the model
uses to guide the augmentation process while preserving the core robotic actions.
Args:
templates_path (str): Path to the JSON file containing prompt templates.
Returns:
str: Generated prompt string that specifies visual aspects to modify in the video.
"""
try:
with open(templates_path) as f:
templates = json.load(f)
except FileNotFoundError:
raise FileNotFoundError(f"Prompt templates file not found: {templates_path}")
except json.JSONDecodeError:
raise ValueError(f"Invalid JSON in prompt templates file: {templates_path}")
prompt_parts = []
for section_name, section_options in templates.items():
if not isinstance(section_options, list):
continue
if len(section_options) == 0:
continue
selected_option = random.choice(section_options)
prompt_parts.append(selected_option)
return " ".join(prompt_parts)
def main():
# Parse command line arguments
args = parse_args()
prompts = [generate_prompt(args.templates_path) for _ in range(args.num_prompts)]
try:
with open(args.output_path, "w") as f:
for prompt in prompts:
f.write(prompt + "\n")
except Exception as e:
print(f"Failed to write to {args.output_path}: {e}")
if __name__ == "__main__":
main()
This diff is collapsed.
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Script to convert HDF5 demonstration files to MP4 videos.
This script converts camera frames stored in HDF5 demonstration files to MP4 videos.
It supports multiple camera modalities including RGB, segmentation, and normal maps.
The output videos are saved in the specified directory with appropriate naming.
required arguments:
--input_file Path to the input HDF5 file.
--output_dir Directory to save the output MP4 files.
optional arguments:
--input_keys List of input keys to process from the HDF5 file. (default: ["table_cam", "wrist_cam", "table_cam_segmentation", "table_cam_normals", "table_cam_shaded_segmentation"])
--video_height Height of the output video in pixels. (default: 704)
--video_width Width of the output video in pixels. (default: 1280)
--framerate Frames per second for the output video. (default: 30)
"""
# Standard library imports
import argparse
import h5py
import numpy as np
# Third-party imports
import os
import cv2
# Constants
DEFAULT_VIDEO_HEIGHT = 704
DEFAULT_VIDEO_WIDTH = 1280
DEFAULT_INPUT_KEYS = [
"table_cam",
"wrist_cam",
"table_cam_segmentation",
"table_cam_normals",
"table_cam_shaded_segmentation",
"table_cam_depth",
]
DEFAULT_FRAMERATE = 30
LIGHT_SOURCE = np.array([0.0, 0.0, 1.0])
MIN_DEPTH = 0.0
MAX_DEPTH = 1.5
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="Convert HDF5 demonstration files to MP4 videos.")
parser.add_argument(
"--input_file",
type=str,
required=True,
help="Path to the input HDF5 file containing demonstration data.",
)
parser.add_argument(
"--output_dir",
type=str,
required=True,
help="Directory path where the output MP4 files will be saved.",
)
parser.add_argument(
"--input_keys",
type=str,
nargs="+",
default=DEFAULT_INPUT_KEYS,
help="List of input keys to process.",
)
parser.add_argument(
"--video_height",
type=int,
default=DEFAULT_VIDEO_HEIGHT,
help="Height of the output video in pixels.",
)
parser.add_argument(
"--video_width",
type=int,
default=DEFAULT_VIDEO_WIDTH,
help="Width of the output video in pixels.",
)
parser.add_argument(
"--framerate",
type=int,
default=DEFAULT_FRAMERATE,
help="Frames per second for the output video.",
)
args = parser.parse_args()
return args
def write_demo_to_mp4(
hdf5_file,
demo_id,
frames_path,
input_key,
output_dir,
video_height,
video_width,
framerate=DEFAULT_FRAMERATE,
):
"""Convert frames from an HDF5 file to an MP4 video.
Args:
hdf5_file (str): Path to the HDF5 file containing the frames.
demo_id (int): ID of the demonstration to convert.
frames_path (str): Path to the frames data in the HDF5 file.
input_key (str): Name of the input key to convert.
output_dir (str): Directory to save the output MP4 file.
video_height (int): Height of the output video in pixels.
video_width (int): Width of the output video in pixels.
framerate (int, optional): Frames per second for the output video. Defaults to 30.
"""
with h5py.File(hdf5_file, "r") as f:
# Get frames based on input key type
if "shaded_segmentation" in input_key:
temp_key = input_key.replace("shaded_segmentation", "segmentation")
frames = f[f"data/demo_{demo_id}/obs/{temp_key}"]
else:
frames = f[frames_path + "/" + input_key]
# Setup video writer
output_path = os.path.join(output_dir, f"demo_{demo_id}_{input_key}.mp4")
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
if "depth" in input_key:
video = cv2.VideoWriter(output_path, fourcc, framerate, (video_width, video_height), isColor=False)
else:
video = cv2.VideoWriter(output_path, fourcc, framerate, (video_width, video_height))
# Process and write frames
for ix, frame in enumerate(frames):
# Convert normal maps to uint8 if needed
if "normals" in input_key:
frame = (frame * 255.0).astype(np.uint8)
# Process shaded segmentation frames
elif "shaded_segmentation" in input_key:
seg = frame[..., :-1]
normals_key = input_key.replace("shaded_segmentation", "normals")
normals = f[f"data/demo_{demo_id}/obs/{normals_key}"][ix]
shade = 0.5 + (normals * LIGHT_SOURCE[None, None, :]).sum(axis=-1) * 0.5
shaded_seg = (shade[..., None] * seg).astype(np.uint8)
frame = np.concatenate((shaded_seg, frame[..., -1:]), axis=-1)
# Convert RGB to BGR
if "depth" not in input_key:
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
else:
frame = (frame[..., 0] - MIN_DEPTH) / (MAX_DEPTH - MIN_DEPTH)
frame = np.where(frame < 0.01, 1.0, frame)
frame = 1.0 - frame
frame = (frame * 255.0).astype(np.uint8)
# Resize to video resolution
frame = cv2.resize(frame, (video_width, video_height), interpolation=cv2.INTER_CUBIC)
video.write(frame)
video.release()
def get_num_demos(hdf5_file):
"""Get the number of demonstrations in the HDF5 file.
Args:
hdf5_file (str): Path to the HDF5 file.
Returns:
int: Number of demonstrations found in the file.
"""
with h5py.File(hdf5_file, "r") as f:
return len(f["data"].keys())
def main():
"""Main function to convert all demonstrations to MP4 videos."""
# Parse command line arguments
args = parse_args()
# Create output directory if it doesn't exist
os.makedirs(args.output_dir, exist_ok=True)
# Get number of demonstrations from the file
num_demos = get_num_demos(args.input_file)
print(f"Found {num_demos} demonstrations in {args.input_file}")
# Convert each demonstration
for i in range(num_demos):
frames_path = f"data/demo_{str(i)}/obs"
for input_key in args.input_keys:
write_demo_to_mp4(
args.input_file,
i,
frames_path,
input_key,
args.output_dir,
args.video_height,
args.video_width,
args.framerate,
)
if __name__ == "__main__":
main()
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Script to create a new dataset by combining existing HDF5 demonstrations with visually augmented MP4 videos.
This script takes an existing HDF5 dataset containing demonstrations and a directory of MP4 videos
that are visually augmented versions of the original demonstration videos (e.g., with different lighting,
color schemes, or visual effects). It creates a new HDF5 dataset that preserves all the original
demonstration data (actions, robot state, etc.) but replaces the video frames with the augmented versions.
required arguments:
--input_file Path to the input HDF5 file containing original demonstrations.
--output_file Path to save the new HDF5 file with augmented videos.
--videos_dir Directory containing the visually augmented MP4 videos.
"""
# Standard library imports
import argparse
import glob
import h5py
import numpy as np
# Third-party imports
import os
import cv2
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="Create a new dataset with visually augmented videos.")
parser.add_argument(
"--input_file",
type=str,
required=True,
help="Path to the input HDF5 file containing original demonstrations.",
)
parser.add_argument(
"--videos_dir",
type=str,
required=True,
help="Directory containing the visually augmented MP4 videos.",
)
parser.add_argument(
"--output_file",
type=str,
required=True,
help="Path to save the new HDF5 file with augmented videos.",
)
args = parser.parse_args()
return args
def get_frames_from_mp4(video_path, target_height=None, target_width=None):
"""Extract frames from an MP4 video file.
Args:
video_path (str): Path to the MP4 video file.
target_height (int, optional): Target height for resizing frames. If None, no resizing is done.
target_width (int, optional): Target width for resizing frames. If None, no resizing is done.
Returns:
np.ndarray: Array of frames from the video in RGB format.
"""
# Open the video file
video = cv2.VideoCapture(video_path)
# Get video properties
frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
# Read all frames into a numpy array
frames = []
for _ in range(frame_count):
ret, frame = video.read()
if not ret:
break
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
if target_height is not None and target_width is not None:
frame = cv2.resize(frame, (target_width, target_height), interpolation=cv2.INTER_LINEAR)
frames.append(frame)
# Convert to numpy array
frames = np.array(frames).astype(np.uint8)
# Release the video object
video.release()
return frames
def process_video_and_demo(f_in, f_out, video_path, orig_demo_id, new_demo_id):
"""Process a single video and create a new demo with augmented video frames.
Args:
f_in (h5py.File): Input HDF5 file.
f_out (h5py.File): Output HDF5 file.
video_path (str): Path to the augmented video file.
orig_demo_id (int): ID of the original demo to copy.
new_demo_id (int): ID for the new demo.
"""
# Get original demo data
actions = f_in[f"data/demo_{str(orig_demo_id)}/actions"]
eef_pos = f_in[f"data/demo_{str(orig_demo_id)}/obs/eef_pos"]
eef_quat = f_in[f"data/demo_{str(orig_demo_id)}/obs/eef_quat"]
gripper_pos = f_in[f"data/demo_{str(orig_demo_id)}/obs/gripper_pos"]
wrist_cam = f_in[f"data/demo_{str(orig_demo_id)}/obs/wrist_cam"]
# Get original video resolution
orig_video = f_in[f"data/demo_{str(orig_demo_id)}/obs/table_cam"]
target_height, target_width = orig_video.shape[1:3]
# Extract frames from video with original resolution
frames = get_frames_from_mp4(video_path, target_height, target_width)
# Create new datasets
f_out.create_dataset(f"data/demo_{str(new_demo_id)}/actions", data=actions, compression="gzip")
f_out.create_dataset(f"data/demo_{str(new_demo_id)}/obs/eef_pos", data=eef_pos, compression="gzip")
f_out.create_dataset(f"data/demo_{str(new_demo_id)}/obs/eef_quat", data=eef_quat, compression="gzip")
f_out.create_dataset(f"data/demo_{str(new_demo_id)}/obs/gripper_pos", data=gripper_pos, compression="gzip")
f_out.create_dataset(
f"data/demo_{str(new_demo_id)}/obs/table_cam", data=frames.astype(np.uint8), compression="gzip"
)
f_out.create_dataset(f"data/demo_{str(new_demo_id)}/obs/wrist_cam", data=wrist_cam, compression="gzip")
# Copy attributes
f_out[f"data/demo_{str(new_demo_id)}"].attrs["num_samples"] = f_in[f"data/demo_{str(orig_demo_id)}"].attrs[
"num_samples"
]
def main():
"""Main function to create a new dataset with augmented videos."""
# Parse command line arguments
args = parse_args()
# Get list of MP4 videos
search_path = os.path.join(args.videos_dir, "*.mp4")
video_paths = glob.glob(search_path)
video_paths.sort()
print(f"Found {len(video_paths)} MP4 videos in {args.videos_dir}")
# Create output directory if it doesn't exist
os.makedirs(os.path.dirname(args.output_file), exist_ok=True)
with h5py.File(args.input_file, "r") as f_in, h5py.File(args.output_file, "w") as f_out:
# Copy all data from input to output
f_in.copy("data", f_out)
# Get the largest demo ID to start new demos from
demo_ids = [int(key.split("_")[1]) for key in f_in["data"].keys()]
next_demo_id = max(demo_ids) + 1 # noqa: SIM113
print(f"Starting new demos from ID: {next_demo_id}")
# Process each video and create new demo
for video_path in video_paths:
# Extract original demo ID from video filename
video_filename = os.path.basename(video_path)
orig_demo_id = int(video_filename.split("_")[1])
process_video_and_demo(f_in, f_out, video_path, orig_demo_id, next_demo_id)
next_demo_id += 1
print(f"Augmented data saved to {args.output_file}")
if __name__ == "__main__":
main()
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Test cases for Cosmos prompt generation script."""
import json
import os
import tempfile
import unittest
from scripts.tools.cosmos.cosmos_prompt_gen import generate_prompt, main
class TestCosmosPromptGen(unittest.TestCase):
"""Test cases for Cosmos prompt generation functionality."""
@classmethod
def setUpClass(cls):
"""Set up test fixtures that are shared across all test methods."""
# Create temporary templates file
cls.temp_templates_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
# Create test templates
test_templates = {
"lighting": ["with bright lighting", "with dim lighting", "with natural lighting"],
"color": ["in warm colors", "in cool colors", "in vibrant colors"],
"style": ["in a realistic style", "in an artistic style", "in a minimalist style"],
"empty_section": [], # Test empty section
"invalid_section": "not a list", # Test invalid section
}
# Write templates to file
with open(cls.temp_templates_file.name, "w") as f:
json.dump(test_templates, f)
def setUp(self):
"""Set up test fixtures that are created for each test method."""
self.temp_output_file = tempfile.NamedTemporaryFile(suffix=".txt", delete=False)
def tearDown(self):
"""Clean up test fixtures after each test method."""
# Remove the temporary output file
os.remove(self.temp_output_file.name)
@classmethod
def tearDownClass(cls):
"""Clean up test fixtures that are shared across all test methods."""
# Remove the temporary templates file
os.remove(cls.temp_templates_file.name)
def test_generate_prompt_valid_templates(self):
"""Test generating a prompt with valid templates."""
prompt = generate_prompt(self.temp_templates_file.name)
# Check that prompt is a string
self.assertIsInstance(prompt, str)
# Check that prompt contains at least one word
self.assertTrue(len(prompt.split()) > 0)
# Check that prompt contains valid sections
valid_sections = ["lighting", "color", "style"]
found_sections = [section for section in valid_sections if section in prompt.lower()]
self.assertTrue(len(found_sections) > 0)
def test_generate_prompt_invalid_file(self):
"""Test generating a prompt with invalid file path."""
with self.assertRaises(FileNotFoundError):
generate_prompt("nonexistent_file.json")
def test_generate_prompt_invalid_json(self):
"""Test generating a prompt with invalid JSON file."""
# Create a temporary file with invalid JSON
with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as temp_file:
temp_file.write(b"invalid json content")
temp_file.flush()
try:
with self.assertRaises(ValueError):
generate_prompt(temp_file.name)
finally:
os.remove(temp_file.name)
def test_main_function_single_prompt(self):
"""Test main function with single prompt generation."""
# Mock command line arguments
import sys
original_argv = sys.argv
sys.argv = [
"cosmos_prompt_gen.py",
"--templates_path",
self.temp_templates_file.name,
"--num_prompts",
"1",
"--output_path",
self.temp_output_file.name,
]
try:
main()
# Check if output file was created
self.assertTrue(os.path.exists(self.temp_output_file.name))
# Check content of output file
with open(self.temp_output_file.name) as f:
content = f.read().strip()
self.assertTrue(len(content) > 0)
self.assertEqual(len(content.split("\n")), 1)
finally:
# Restore original argv
sys.argv = original_argv
def test_main_function_multiple_prompts(self):
"""Test main function with multiple prompt generation."""
# Mock command line arguments
import sys
original_argv = sys.argv
sys.argv = [
"cosmos_prompt_gen.py",
"--templates_path",
self.temp_templates_file.name,
"--num_prompts",
"3",
"--output_path",
self.temp_output_file.name,
]
try:
main()
# Check if output file was created
self.assertTrue(os.path.exists(self.temp_output_file.name))
# Check content of output file
with open(self.temp_output_file.name) as f:
content = f.read().strip()
self.assertTrue(len(content) > 0)
self.assertEqual(len(content.split("\n")), 3)
# Check that each line is a valid prompt
for line in content.split("\n"):
self.assertTrue(len(line) > 0)
finally:
# Restore original argv
sys.argv = original_argv
def test_main_function_default_output(self):
"""Test main function with default output path."""
# Mock command line arguments
import sys
original_argv = sys.argv
sys.argv = ["cosmos_prompt_gen.py", "--templates_path", self.temp_templates_file.name, "--num_prompts", "1"]
try:
main()
# Check if default output file was created
self.assertTrue(os.path.exists("prompts.txt"))
# Clean up default output file
os.remove("prompts.txt")
finally:
# Restore original argv
sys.argv = original_argv
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Test cases for HDF5 to MP4 conversion script."""
import h5py
import numpy as np
import os
import tempfile
import unittest
from scripts.tools.hdf5_to_mp4 import get_num_demos, main, write_demo_to_mp4
class TestHDF5ToMP4(unittest.TestCase):
"""Test cases for HDF5 to MP4 conversion functionality."""
@classmethod
def setUpClass(cls):
"""Set up test fixtures that are shared across all test methods."""
# Create temporary HDF5 file with test data
cls.temp_hdf5_file = tempfile.NamedTemporaryFile(suffix=".h5", delete=False)
with h5py.File(cls.temp_hdf5_file.name, "w") as h5f:
# Create test data structure
for demo_id in range(2): # Create 2 demos
demo_group = h5f.create_group(f"data/demo_{demo_id}/obs")
# Create RGB frames (2 frames per demo)
rgb_data = np.random.randint(0, 255, (2, 704, 1280, 3), dtype=np.uint8)
demo_group.create_dataset("table_cam", data=rgb_data)
# Create segmentation frames
seg_data = np.random.randint(0, 255, (2, 704, 1280, 4), dtype=np.uint8)
demo_group.create_dataset("table_cam_segmentation", data=seg_data)
# Create normal maps
normals_data = np.random.rand(2, 704, 1280, 3).astype(np.float32)
demo_group.create_dataset("table_cam_normals", data=normals_data)
# Create depth maps
depth_data = np.random.rand(2, 704, 1280, 1).astype(np.float32)
demo_group.create_dataset("table_cam_depth", data=depth_data)
def setUp(self):
"""Set up test fixtures that are created for each test method."""
self.temp_output_dir = tempfile.mkdtemp()
def tearDown(self):
"""Clean up test fixtures after each test method."""
# Remove all files in the output directory
for file in os.listdir(self.temp_output_dir):
os.remove(os.path.join(self.temp_output_dir, file))
# Remove the output directory
os.rmdir(self.temp_output_dir)
@classmethod
def tearDownClass(cls):
"""Clean up test fixtures that are shared across all test methods."""
# Remove the temporary HDF5 file
os.remove(cls.temp_hdf5_file.name)
def test_get_num_demos(self):
"""Test the get_num_demos function."""
num_demos = get_num_demos(self.temp_hdf5_file.name)
self.assertEqual(num_demos, 2)
def test_write_demo_to_mp4_rgb(self):
"""Test writing RGB frames to MP4."""
write_demo_to_mp4(self.temp_hdf5_file.name, 0, "data/demo_0/obs", "table_cam", self.temp_output_dir, 704, 1280)
output_file = os.path.join(self.temp_output_dir, "demo_0_table_cam.mp4")
self.assertTrue(os.path.exists(output_file))
self.assertGreater(os.path.getsize(output_file), 0)
def test_write_demo_to_mp4_segmentation(self):
"""Test writing segmentation frames to MP4."""
write_demo_to_mp4(
self.temp_hdf5_file.name, 0, "data/demo_0/obs", "table_cam_segmentation", self.temp_output_dir, 704, 1280
)
output_file = os.path.join(self.temp_output_dir, "demo_0_table_cam_segmentation.mp4")
self.assertTrue(os.path.exists(output_file))
self.assertGreater(os.path.getsize(output_file), 0)
def test_write_demo_to_mp4_normals(self):
"""Test writing normal maps to MP4."""
write_demo_to_mp4(
self.temp_hdf5_file.name, 0, "data/demo_0/obs", "table_cam_normals", self.temp_output_dir, 704, 1280
)
output_file = os.path.join(self.temp_output_dir, "demo_0_table_cam_normals.mp4")
self.assertTrue(os.path.exists(output_file))
self.assertGreater(os.path.getsize(output_file), 0)
def test_write_demo_to_mp4_shaded_segmentation(self):
"""Test writing shaded_segmentation frames to MP4."""
write_demo_to_mp4(
self.temp_hdf5_file.name,
0,
"data/demo_0/obs",
"table_cam_shaded_segmentation",
self.temp_output_dir,
704,
1280,
)
output_file = os.path.join(self.temp_output_dir, "demo_0_table_cam_shaded_segmentation.mp4")
self.assertTrue(os.path.exists(output_file))
self.assertGreater(os.path.getsize(output_file), 0)
def test_write_demo_to_mp4_depth(self):
"""Test writing depth maps to MP4."""
write_demo_to_mp4(
self.temp_hdf5_file.name, 0, "data/demo_0/obs", "table_cam_depth", self.temp_output_dir, 704, 1280
)
output_file = os.path.join(self.temp_output_dir, "demo_0_table_cam_depth.mp4")
self.assertTrue(os.path.exists(output_file))
self.assertGreater(os.path.getsize(output_file), 0)
def test_write_demo_to_mp4_invalid_demo(self):
"""Test writing with invalid demo ID."""
with self.assertRaises(KeyError):
write_demo_to_mp4(
self.temp_hdf5_file.name,
999, # Invalid demo ID
"data/demo_999/obs",
"table_cam",
self.temp_output_dir,
704,
1280,
)
def test_write_demo_to_mp4_invalid_key(self):
"""Test writing with invalid input key."""
with self.assertRaises(KeyError):
write_demo_to_mp4(
self.temp_hdf5_file.name, 0, "data/demo_0/obs", "invalid_key", self.temp_output_dir, 704, 1280
)
def test_main_function(self):
"""Test the main function."""
# Mock command line arguments
import sys
original_argv = sys.argv
sys.argv = [
"hdf5_to_mp4.py",
"--input_file",
self.temp_hdf5_file.name,
"--output_dir",
self.temp_output_dir,
"--input_keys",
"table_cam",
"table_cam_segmentation",
"--video_height",
"704",
"--video_width",
"1280",
"--framerate",
"30",
]
try:
main()
# Check if output files were created
expected_files = [
"demo_0_table_cam.mp4",
"demo_0_table_cam_segmentation.mp4",
"demo_1_table_cam.mp4",
"demo_1_table_cam_segmentation.mp4",
]
for file in expected_files:
output_file = os.path.join(self.temp_output_dir, file)
self.assertTrue(os.path.exists(output_file))
self.assertGreater(os.path.getsize(output_file), 0)
finally:
# Restore original argv
sys.argv = original_argv
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Test cases for MP4 to HDF5 conversion script."""
import h5py
import numpy as np
import os
import tempfile
import unittest
import cv2
from scripts.tools.mp4_to_hdf5 import get_frames_from_mp4, main, process_video_and_demo
class TestMP4ToHDF5(unittest.TestCase):
"""Test cases for MP4 to HDF5 conversion functionality."""
@classmethod
def setUpClass(cls):
"""Set up test fixtures that are shared across all test methods."""
# Create temporary HDF5 file with test data
cls.temp_hdf5_file = tempfile.NamedTemporaryFile(suffix=".h5", delete=False)
with h5py.File(cls.temp_hdf5_file.name, "w") as h5f:
# Create test data structure for 2 demos
for demo_id in range(2):
demo_group = h5f.create_group(f"data/demo_{demo_id}")
obs_group = demo_group.create_group("obs")
# Create actions data
actions_data = np.random.rand(10, 7).astype(np.float32)
demo_group.create_dataset("actions", data=actions_data)
# Create robot state data
eef_pos_data = np.random.rand(10, 3).astype(np.float32)
eef_quat_data = np.random.rand(10, 4).astype(np.float32)
gripper_pos_data = np.random.rand(10, 1).astype(np.float32)
obs_group.create_dataset("eef_pos", data=eef_pos_data)
obs_group.create_dataset("eef_quat", data=eef_quat_data)
obs_group.create_dataset("gripper_pos", data=gripper_pos_data)
# Create camera data
table_cam_data = np.random.randint(0, 255, (10, 704, 1280, 3), dtype=np.uint8)
wrist_cam_data = np.random.randint(0, 255, (10, 704, 1280, 3), dtype=np.uint8)
obs_group.create_dataset("table_cam", data=table_cam_data)
obs_group.create_dataset("wrist_cam", data=wrist_cam_data)
# Set attributes
demo_group.attrs["num_samples"] = 10
# Create temporary MP4 files
cls.temp_videos_dir = tempfile.mkdtemp()
cls.video_paths = []
for demo_id in range(2):
video_path = os.path.join(cls.temp_videos_dir, f"demo_{demo_id}_table_cam.mp4")
cls.video_paths.append(video_path)
# Create a test video
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
video = cv2.VideoWriter(video_path, fourcc, 30, (1280, 704))
# Write some random frames
for _ in range(10):
frame = np.random.randint(0, 255, (704, 1280, 3), dtype=np.uint8)
video.write(frame)
video.release()
def setUp(self):
"""Set up test fixtures that are created for each test method."""
self.temp_output_file = tempfile.NamedTemporaryFile(suffix=".h5", delete=False)
def tearDown(self):
"""Clean up test fixtures after each test method."""
# Remove the temporary output file
os.remove(self.temp_output_file.name)
@classmethod
def tearDownClass(cls):
"""Clean up test fixtures that are shared across all test methods."""
# Remove the temporary HDF5 file
os.remove(cls.temp_hdf5_file.name)
# Remove temporary videos and directory
for video_path in cls.video_paths:
os.remove(video_path)
os.rmdir(cls.temp_videos_dir)
def test_get_frames_from_mp4(self):
"""Test extracting frames from MP4 video."""
frames = get_frames_from_mp4(self.video_paths[0])
# Check frame properties
self.assertEqual(frames.shape[0], 10) # Number of frames
self.assertEqual(frames.shape[1:], (704, 1280, 3)) # Frame dimensions
self.assertEqual(frames.dtype, np.uint8) # Data type
def test_get_frames_from_mp4_resize(self):
"""Test extracting frames with resizing."""
target_height, target_width = 352, 640
frames = get_frames_from_mp4(self.video_paths[0], target_height, target_width)
# Check resized frame properties
self.assertEqual(frames.shape[0], 10) # Number of frames
self.assertEqual(frames.shape[1:], (target_height, target_width, 3)) # Resized dimensions
self.assertEqual(frames.dtype, np.uint8) # Data type
def test_process_video_and_demo(self):
"""Test processing a single video and creating a new demo."""
with h5py.File(self.temp_hdf5_file.name, "r") as f_in, h5py.File(self.temp_output_file.name, "w") as f_out:
process_video_and_demo(f_in, f_out, self.video_paths[0], 0, 2)
# Check if new demo was created with correct data
self.assertIn("data/demo_2", f_out)
self.assertIn("data/demo_2/actions", f_out)
self.assertIn("data/demo_2/obs/eef_pos", f_out)
self.assertIn("data/demo_2/obs/eef_quat", f_out)
self.assertIn("data/demo_2/obs/gripper_pos", f_out)
self.assertIn("data/demo_2/obs/table_cam", f_out)
self.assertIn("data/demo_2/obs/wrist_cam", f_out)
# Check data shapes
self.assertEqual(f_out["data/demo_2/actions"].shape, (10, 7))
self.assertEqual(f_out["data/demo_2/obs/eef_pos"].shape, (10, 3))
self.assertEqual(f_out["data/demo_2/obs/eef_quat"].shape, (10, 4))
self.assertEqual(f_out["data/demo_2/obs/gripper_pos"].shape, (10, 1))
self.assertEqual(f_out["data/demo_2/obs/table_cam"].shape, (10, 704, 1280, 3))
self.assertEqual(f_out["data/demo_2/obs/wrist_cam"].shape, (10, 704, 1280, 3))
# Check attributes
self.assertEqual(f_out["data/demo_2"].attrs["num_samples"], 10)
def test_main_function(self):
"""Test the main function."""
# Mock command line arguments
import sys
original_argv = sys.argv
sys.argv = [
"mp4_to_hdf5.py",
"--input_file",
self.temp_hdf5_file.name,
"--videos_dir",
self.temp_videos_dir,
"--output_file",
self.temp_output_file.name,
]
try:
main()
# Check if output file was created with correct data
with h5py.File(self.temp_output_file.name, "r") as f:
# Check if original demos were copied
self.assertIn("data/demo_0", f)
self.assertIn("data/demo_1", f)
# Check if new demos were created
self.assertIn("data/demo_2", f)
self.assertIn("data/demo_3", f)
# Check data in new demos
for demo_id in [2, 3]:
self.assertIn(f"data/demo_{demo_id}/actions", f)
self.assertIn(f"data/demo_{demo_id}/obs/eef_pos", f)
self.assertIn(f"data/demo_{demo_id}/obs/eef_quat", f)
self.assertIn(f"data/demo_{demo_id}/obs/gripper_pos", f)
self.assertIn(f"data/demo_{demo_id}/obs/table_cam", f)
self.assertIn(f"data/demo_{demo_id}/obs/wrist_cam", f)
finally:
# Restore original argv
sys.argv = original_argv
if __name__ == "__main__":
unittest.main()
......@@ -281,8 +281,8 @@ Changed
:meth:`~isaaclab.utils.math.quat_apply` and :meth:`~isaaclab.utils.math.quat_apply_inverse` for speed.
0.40.9 (2025-05-19)
~~~~~~~~~~~~~~~~~~~
0.40.10 (2025-05-19)
~~~~~~~~~~~~~~~~~~~~
Fixed
^^^^^
......@@ -291,7 +291,7 @@ Fixed
of assets and sensors.used from the experience files and the double definition is removed.
0.40.8 (2025-01-30)
0.40.9 (2025-01-30)
~~~~~~~~~~~~~~~~~~~
Added
......@@ -301,7 +301,7 @@ Added
in the simulation.
0.40.7 (2025-05-16)
0.40.8 (2025-05-16)
~~~~~~~~~~~~~~~~~~~
Added
......@@ -316,7 +316,7 @@ Changed
resampling call.
0.40.6 (2025-05-16)
0.40.7 (2025-05-16)
~~~~~~~~~~~~~~~~~~~
Fixed
......@@ -325,7 +325,7 @@ Fixed
* Fixed penetration issue for negative border height in :class:`~isaaclab.terrains.terrain_generator.TerrainGeneratorCfg`.
0.40.5 (2025-05-16)
0.40.6 (2025-05-20)
~~~~~~~~~~~~~~~~~~~
Changed
......@@ -340,7 +340,7 @@ Added
* Added :meth:`~isaaclab.utils.math.rigid_body_twist_transform`
0.40.4 (2025-05-15)
0.40.5 (2025-05-15)
~~~~~~~~~~~~~~~~~~~
Fixed
......@@ -354,13 +354,22 @@ Fixed
unused USD camera parameters.
0.40.3 (2025-05-14)
0.40.4 (2025-05-14)
~~~~~~~~~~~~~~~~~~~
* Added a new attribute :attr:`articulation_root_prim_path` to the :class:`~isaaclab.assets.ArticulationCfg` class
to allow explicitly specifying the prim path of the articulation root.
0.40.3 (2025-05-14)
~~~~~~~~~~~~~~~~~~~
Changed
^^^^^^^
* Made modifications to :func:`isaaclab.envs.mdp.image` to handle image normalization for normal maps.
0.40.2 (2025-05-14)
~~~~~~~~~~~~~~~~~~~
......
......@@ -352,7 +352,7 @@ def image(
if (data_type == "distance_to_camera") and convert_perspective_to_orthogonal:
images = math_utils.orthogonalize_perspective_depth(images, sensor.data.intrinsic_matrices)
# rgb/depth image normalization
# rgb/depth/normals image normalization
if normalize:
if data_type == "rgb":
images = images.float() / 255.0
......@@ -360,6 +360,8 @@ def image(
images -= mean_tensor
elif "distance_to" in data_type or "depth" in data_type:
images[images == float("inf")] = 0
elif "normals" in data_type:
images = (images + 1.0) * 0.5
return images.clone()
......
[package]
# Semantic Versioning is used: https://semver.org/
version = "1.0.8"
version = "1.0.9"
# Description
category = "isaaclab"
......
Changelog
---------
1.0.9 (2025-05-20)
~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added ``Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-Cosmos-Mimic-v0`` environment for Cosmos vision stacking.
1.0.8 (2025-05-01)
~~~~~~~~~~~~~~~~~~
......
......@@ -12,6 +12,7 @@ from .franka_stack_ik_abs_mimic_env_cfg import FrankaCubeStackIKAbsMimicEnvCfg
from .franka_stack_ik_rel_blueprint_mimic_env_cfg import FrankaCubeStackIKRelBlueprintMimicEnvCfg
from .franka_stack_ik_rel_mimic_env import FrankaCubeStackIKRelMimicEnv
from .franka_stack_ik_rel_mimic_env_cfg import FrankaCubeStackIKRelMimicEnvCfg
from .franka_stack_ik_rel_visuomotor_cosmos_mimic_env_cfg import FrankaCubeStackIKRelVisuomotorCosmosMimicEnvCfg
from .franka_stack_ik_rel_visuomotor_mimic_env_cfg import FrankaCubeStackIKRelVisuomotorMimicEnvCfg
##
......@@ -53,3 +54,14 @@ gym.register(
},
disable_env_checker=True,
)
gym.register(
id="Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-Cosmos-Mimic-v0",
entry_point="isaaclab_mimic.envs:FrankaCubeStackIKRelMimicEnv",
kwargs={
"env_cfg_entry_point": (
franka_stack_ik_rel_visuomotor_cosmos_mimic_env_cfg.FrankaCubeStackIKRelVisuomotorCosmosMimicEnvCfg
),
},
disable_env_checker=True,
)
# Copyright (c) 2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from isaaclab.envs.mimic_env_cfg import MimicEnvCfg, SubTaskConfig
from isaaclab.utils import configclass
from isaaclab_tasks.manager_based.manipulation.stack.config.franka.stack_ik_rel_visuomotor_cosmos_env_cfg import (
FrankaCubeStackVisuomotorCosmosEnvCfg,
)
@configclass
class FrankaCubeStackIKRelVisuomotorCosmosMimicEnvCfg(FrankaCubeStackVisuomotorCosmosEnvCfg, MimicEnvCfg):
"""
Isaac Lab Mimic environment config class for Franka Cube Stack IK Rel Visuomotor Cosmos env.
"""
def __post_init__(self):
# post init of parents
super().__post_init__()
# Override the existing values
self.datagen_config.name = "isaac_lab_franka_stack_ik_rel_visuomotor_cosmos_D0"
self.datagen_config.generation_guarantee = True
self.datagen_config.generation_keep_failed = True
self.datagen_config.generation_num_trials = 10
self.datagen_config.generation_select_src_per_subtask = True
self.datagen_config.generation_transform_first_robot_pose = False
self.datagen_config.generation_interpolate_from_last_target_pose = True
self.datagen_config.generation_relative = True
self.datagen_config.max_num_failures = 25
self.datagen_config.seed = 1
# The following are the subtask configurations for the stack task.
subtask_configs = []
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_2",
# This key corresponds to the binary indicator in "datagen_info" that signals
# when this subtask is finished (e.g., on a 0 to 1 edge).
subtask_term_signal="grasp_1",
# Specifies time offsets for data generation when splitting a trajectory into
# subtask segments. Random offsets are added to the termination boundary.
subtask_term_offset_range=(10, 20),
# Selection strategy for the source subtask segment during data generation
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.03,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=5,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_1",
# Corresponding key for the binary indicator in "datagen_info" for completion
subtask_term_signal="stack_1",
# Time offsets for data generation when splitting a trajectory
subtask_term_offset_range=(10, 20),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.03,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=5,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_3",
# Corresponding key for the binary indicator in "datagen_info" for completion
subtask_term_signal="grasp_2",
# Time offsets for data generation when splitting a trajectory
subtask_term_offset_range=(10, 20),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.03,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=5,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
subtask_configs.append(
SubTaskConfig(
# Each subtask involves manipulation with respect to a single object frame.
object_ref="cube_2",
# End of final subtask does not need to be detected
subtask_term_signal=None,
# No time offsets for the final subtask
subtask_term_offset_range=(0, 0),
# Selection strategy for source subtask segment
selection_strategy="nearest_neighbor_object",
# Optional parameters for the selection strategy function
selection_strategy_kwargs={"nn_k": 3},
# Amount of action noise to apply during this subtask
action_noise=0.03,
# Number of interpolation steps to bridge to this subtask segment
num_interpolation_steps=5,
# Additional fixed steps for the robot to reach the necessary pose
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
)
)
self.subtask_configs["franka"] = subtask_configs
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.10.39"
version = "0.10.40"
# Description
title = "Isaac Lab Environments"
......
Changelog
---------
0.10.39 (2025-06-26)
0.10.40 (2025-06-26)
~~~~~~~~~~~~~~~~~~~~
Fixed
......@@ -10,7 +10,7 @@ Fixed
* Relaxed upper range pin for protobuf python dependency for more permissive installation.
0.10.38 (2025-05-22)
0.10.39 (2025-05-22)
~~~~~~~~~~~~~~~~~~~~
Fixed
......@@ -19,7 +19,7 @@ Fixed
* Fixed redundant body_names assignment in rough_env_cfg.py for H1 robot.
0.10.37 (2025-06-16)
0.10.38 (2025-06-16)
~~~~~~~~~~~~~~~~~~~~
Changed
......@@ -28,7 +28,7 @@ Changed
* Show available RL library configs on error message when an entry point key is not available for a given task.
0.10.36 (2025-05-15)
0.10.37 (2025-05-15)
~~~~~~~~~~~~~~~~~~~~
Added
......@@ -38,7 +38,7 @@ Added
implements assembly tasks to insert pegs into their corresponding sockets.
0.10.35 (2025-05-21)
0.10.36 (2025-05-21)
~~~~~~~~~~~~~~~~~~~~
Added
......@@ -48,6 +48,21 @@ Added
can be pushed to a visualization dashboard to track improvements or regressions.
0.10.35 (2025-05-21)
~~~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added ``Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-Cosmos-v0`` stacking environment with multi-modality camera inputs at higher resolution.
Changed
^^^^^^^
* Updated the ``Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-v0`` stacking environment to support visual domain randomization events during model evaluation.
* Made the task termination condition for the stacking task more strict.
0.10.34 (2025-05-22)
~~~~~~~~~~~~~~~~~~~~
......
......@@ -11,6 +11,7 @@ from . import (
stack_ik_rel_blueprint_env_cfg,
stack_ik_rel_env_cfg,
stack_ik_rel_instance_randomize_env_cfg,
stack_ik_rel_visuomotor_cosmos_env_cfg,
stack_ik_rel_visuomotor_env_cfg,
stack_joint_pos_env_cfg,
stack_joint_pos_instance_randomize_env_cfg,
......@@ -67,6 +68,16 @@ gym.register(
disable_env_checker=True,
)
gym.register(
id="Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-Cosmos-v0",
entry_point="isaaclab.envs:ManagerBasedRLEnv",
kwargs={
"env_cfg_entry_point": stack_ik_rel_visuomotor_cosmos_env_cfg.FrankaCubeStackVisuomotorCosmosEnvCfg,
"robomimic_bc_cfg_entry_point": os.path.join(agents.__path__[0], "robomimic/bc_rnn_image_cosmos.json"),
},
disable_env_checker=True,
)
gym.register(
id="Isaac-Stack-Cube-Franka-IK-Abs-v0",
entry_point="isaaclab.envs:ManagerBasedRLEnv",
......
{
"algo_name": "bc",
"experiment": {
"name": "bc_rnn_image_franka_stack_cosmos",
"validate": false,
"logging": {
"terminal_output_to_txt": true,
"log_tb": true
},
"save": {
"enabled": true,
"every_n_seconds": null,
"every_n_epochs": 20,
"epochs": [],
"on_best_validation": false,
"on_best_rollout_return": false,
"on_best_rollout_success_rate": true
},
"epoch_every_n_steps": 500,
"env": null,
"additional_envs": null,
"render": false,
"render_video": false,
"rollout": {
"enabled": false
}
},
"train": {
"data": null,
"num_data_workers": 4,
"hdf5_cache_mode": "low_dim",
"hdf5_use_swmr": true,
"hdf5_load_next_obs": false,
"hdf5_normalize_obs": false,
"hdf5_filter_key": null,
"hdf5_validation_filter_key": null,
"seq_length": 10,
"pad_seq_length": true,
"frame_stack": 1,
"pad_frame_stack": true,
"dataset_keys": [
"actions",
"rewards",
"dones"
],
"goal_mode": null,
"cuda": true,
"batch_size": 16,
"num_epochs": 600,
"seed": 101
},
"algo": {
"optim_params": {
"policy": {
"optimizer_type": "adam",
"learning_rate": {
"initial": 0.0001,
"decay_factor": 0.1,
"epoch_schedule": [],
"scheduler_type": "multistep"
},
"regularization": {
"L2": 0.0
}
}
},
"loss": {
"l2_weight": 1.0,
"l1_weight": 0.0,
"cos_weight": 0.0
},
"actor_layer_dims": [],
"gaussian": {
"enabled": false,
"fixed_std": false,
"init_std": 0.1,
"min_std": 0.01,
"std_activation": "softplus",
"low_noise_eval": true
},
"gmm": {
"enabled": true,
"num_modes": 5,
"min_std": 0.0001,
"std_activation": "softplus",
"low_noise_eval": true
},
"vae": {
"enabled": false,
"latent_dim": 14,
"latent_clip": null,
"kl_weight": 1.0,
"decoder": {
"is_conditioned": true,
"reconstruction_sum_across_elements": false
},
"prior": {
"learn": false,
"is_conditioned": false,
"use_gmm": false,
"gmm_num_modes": 10,
"gmm_learn_weights": false,
"use_categorical": false,
"categorical_dim": 10,
"categorical_gumbel_softmax_hard": false,
"categorical_init_temp": 1.0,
"categorical_temp_anneal_step": 0.001,
"categorical_min_temp": 0.3
},
"encoder_layer_dims": [
300,
400
],
"decoder_layer_dims": [
300,
400
],
"prior_layer_dims": [
300,
400
]
},
"rnn": {
"enabled": true,
"horizon": 10,
"hidden_dim": 1000,
"rnn_type": "LSTM",
"num_layers": 2,
"open_loop": false,
"kwargs": {
"bidirectional": false
}
},
"transformer": {
"enabled": false,
"context_length": 10,
"embed_dim": 512,
"num_layers": 6,
"num_heads": 8,
"emb_dropout": 0.1,
"attn_dropout": 0.1,
"block_output_dropout": 0.1,
"sinusoidal_embedding": false,
"activation": "gelu",
"supervise_all_steps": false,
"nn_parameter_for_timesteps": true
}
},
"observation": {
"modalities": {
"obs": {
"low_dim": [
"eef_pos",
"eef_quat",
"gripper_pos"
],
"rgb": [
"table_cam"
],
"depth": [],
"scan": []
},
"goal": {
"low_dim": [],
"rgb": [],
"depth": [],
"scan": []
}
},
"encoder": {
"low_dim": {
"core_class": null,
"core_kwargs": {},
"obs_randomizer_class": null,
"obs_randomizer_kwargs": {}
},
"rgb": {
"core_class": "VisualCore",
"core_kwargs": {
"feature_dimension": 64,
"flatten": true,
"backbone_class": "ResNet18Conv",
"backbone_kwargs": {
"pretrained": false,
"input_coord_conv": false
},
"pool_class": "SpatialSoftmax",
"pool_kwargs": {
"num_kp": 32,
"learnable_temperature": false,
"temperature": 1.0,
"noise_std": 0.0,
"output_variance": false
}
},
"obs_randomizer_class": "CropRandomizer",
"obs_randomizer_kwargs": {
"crop_height": 180,
"crop_width": 180,
"num_crops": 1,
"pos_enc": false
}
},
"depth": {
"core_class": "VisualCore",
"core_kwargs": {},
"obs_randomizer_class": null,
"obs_randomizer_kwargs": {}
},
"scan": {
"core_class": "ScanCore",
"core_kwargs": {},
"obs_randomizer_class": null,
"obs_randomizer_kwargs": {}
}
}
}
}
# Copyright (c) 2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import isaaclab.sim as sim_utils
from isaaclab.managers import ObservationGroupCfg as ObsGroup
from isaaclab.managers import ObservationTermCfg as ObsTerm
from isaaclab.managers import SceneEntityCfg
from isaaclab.sensors import CameraCfg
from isaaclab.utils import configclass
from isaaclab_tasks.manager_based.manipulation.stack import mdp
from . import stack_ik_rel_visuomotor_env_cfg
@configclass
class ObservationsCfg:
"""Observation specifications for the MDP."""
@configclass
class PolicyCfg(ObsGroup):
"""Observations for policy group with state values."""
actions = ObsTerm(func=mdp.last_action)
joint_pos = ObsTerm(func=mdp.joint_pos_rel)
joint_vel = ObsTerm(func=mdp.joint_vel_rel)
object = ObsTerm(func=mdp.object_obs)
cube_positions = ObsTerm(func=mdp.cube_positions_in_world_frame)
cube_orientations = ObsTerm(func=mdp.cube_orientations_in_world_frame)
eef_pos = ObsTerm(func=mdp.ee_frame_pos)
eef_quat = ObsTerm(func=mdp.ee_frame_quat)
gripper_pos = ObsTerm(func=mdp.gripper_pos)
table_cam = ObsTerm(
func=mdp.image, params={"sensor_cfg": SceneEntityCfg("table_cam"), "data_type": "rgb", "normalize": False}
)
wrist_cam = ObsTerm(
func=mdp.image, params={"sensor_cfg": SceneEntityCfg("wrist_cam"), "data_type": "rgb", "normalize": False}
)
table_cam_segmentation = ObsTerm(
func=mdp.image,
params={"sensor_cfg": SceneEntityCfg("table_cam"), "data_type": "semantic_segmentation", "normalize": True},
)
table_cam_normals = ObsTerm(
func=mdp.image,
params={"sensor_cfg": SceneEntityCfg("table_cam"), "data_type": "normals", "normalize": True},
)
table_cam_depth = ObsTerm(
func=mdp.image,
params={
"sensor_cfg": SceneEntityCfg("table_cam"),
"data_type": "distance_to_image_plane",
"normalize": True,
},
)
def __post_init__(self):
self.enable_corruption = False
self.concatenate_terms = False
@configclass
class SubtaskCfg(ObsGroup):
"""Observations for subtask group."""
grasp_1 = ObsTerm(
func=mdp.object_grasped,
params={
"robot_cfg": SceneEntityCfg("robot"),
"ee_frame_cfg": SceneEntityCfg("ee_frame"),
"object_cfg": SceneEntityCfg("cube_2"),
},
)
stack_1 = ObsTerm(
func=mdp.object_stacked,
params={
"robot_cfg": SceneEntityCfg("robot"),
"upper_object_cfg": SceneEntityCfg("cube_2"),
"lower_object_cfg": SceneEntityCfg("cube_1"),
},
)
grasp_2 = ObsTerm(
func=mdp.object_grasped,
params={
"robot_cfg": SceneEntityCfg("robot"),
"ee_frame_cfg": SceneEntityCfg("ee_frame"),
"object_cfg": SceneEntityCfg("cube_3"),
},
)
def __post_init__(self):
self.enable_corruption = False
self.concatenate_terms = False
# observation groups
policy: PolicyCfg = PolicyCfg()
subtask_terms: SubtaskCfg = SubtaskCfg()
@configclass
class FrankaCubeStackVisuomotorCosmosEnvCfg(stack_ik_rel_visuomotor_env_cfg.FrankaCubeStackVisuomotorEnvCfg):
observations: ObservationsCfg = ObservationsCfg()
def __post_init__(self):
# post init of parent
super().__post_init__()
SEMANTIC_MAPPING = {
"class:cube_1": (120, 230, 255, 255),
"class:cube_2": (255, 36, 66, 255),
"class:cube_3": (55, 255, 139, 255),
"class:table": (255, 237, 218, 255),
"class:ground": (100, 100, 100, 255),
"class:robot": (204, 110, 248, 255),
"class:UNLABELLED": (150, 150, 150, 255),
"class:BACKGROUND": (200, 200, 200, 255),
}
# Set cameras
# Set wrist camera
self.scene.wrist_cam = CameraCfg(
prim_path="{ENV_REGEX_NS}/Robot/panda_hand/wrist_cam",
update_period=0.0,
height=200,
width=200,
data_types=["rgb", "distance_to_image_plane"],
spawn=sim_utils.PinholeCameraCfg(
focal_length=24.0, focus_distance=400.0, horizontal_aperture=20.955, clipping_range=(0.1, 2)
),
offset=CameraCfg.OffsetCfg(
pos=(0.13, 0.0, -0.15), rot=(-0.70614, 0.03701, 0.03701, -0.70614), convention="ros"
),
)
# Set table view camera
self.scene.table_cam = CameraCfg(
prim_path="{ENV_REGEX_NS}/table_cam",
update_period=0.0,
height=200,
width=200,
data_types=["rgb", "semantic_segmentation", "normals", "distance_to_image_plane"],
colorize_semantic_segmentation=True,
semantic_segmentation_mapping=SEMANTIC_MAPPING,
spawn=sim_utils.PinholeCameraCfg(
focal_length=24.0, focus_distance=400.0, horizontal_aperture=20.955, clipping_range=(0.1, 2)
),
offset=CameraCfg.OffsetCfg(
pos=(1.0, 0.0, 0.4), rot=(0.35355, -0.61237, -0.61237, 0.35355), convention="ros"
),
)
# Set settings for camera rendering
self.rerender_on_reset = True
self.sim.render.antialiasing_mode = "OFF" # disable dlss
# List of image observations in policy observations
self.image_obs_list = ["table_cam", "wrist_cam"]
......@@ -11,13 +11,17 @@
import isaaclab.sim as sim_utils
from isaaclab.controllers.differential_ik_cfg import DifferentialIKControllerCfg
from isaaclab.envs.mdp.actions.actions_cfg import DifferentialInverseKinematicsActionCfg
from isaaclab.managers import EventTermCfg as EventTerm
from isaaclab.managers import ObservationGroupCfg as ObsGroup
from isaaclab.managers import ObservationTermCfg as ObsTerm
from isaaclab.managers import SceneEntityCfg
from isaaclab.sensors import CameraCfg
from isaaclab.utils import configclass
from isaaclab.utils.assets import ISAAC_NUCLEUS_DIR, NVIDIA_NUCLEUS_DIR
from isaaclab_tasks.manager_based.manipulation.stack import mdp
from isaaclab_tasks.manager_based.manipulation.stack.mdp import franka_stack_events
from ... import mdp
from . import stack_joint_pos_env_cfg
##
......@@ -26,6 +30,84 @@ from . import stack_joint_pos_env_cfg
from isaaclab_assets.robots.franka import FRANKA_PANDA_HIGH_PD_CFG # isort: skip
@configclass
class EventCfg(stack_joint_pos_env_cfg.EventCfg):
"""Configuration for events."""
randomize_light = EventTerm(
func=franka_stack_events.randomize_scene_lighting_domelight,
mode="reset",
params={
"intensity_range": (1500.0, 10000.0),
"color_variation": 0.4,
"textures": [
f"{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Cloudy/abandoned_parking_4k.hdr",
f"{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Cloudy/evening_road_01_4k.hdr",
f"{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Cloudy/lakeside_4k.hdr",
f"{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Indoor/autoshop_01_4k.hdr",
f"{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Indoor/carpentry_shop_01_4k.hdr",
f"{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Indoor/hospital_room_4k.hdr",
f"{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Indoor/hotel_room_4k.hdr",
f"{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Indoor/old_bus_depot_4k.hdr",
f"{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Indoor/small_empty_house_4k.hdr",
f"{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Indoor/surgery_4k.hdr",
f"{NVIDIA_NUCLEUS_DIR}/Assets/Skies/Studio/photo_studio_01_4k.hdr",
],
"default_intensity": 3000.0,
"default_color": (0.75, 0.75, 0.75),
"default_texture": "",
},
)
randomize_table_visual_material = EventTerm(
func=franka_stack_events.randomize_visual_texture_material,
mode="reset",
params={
"asset_cfg": SceneEntityCfg("table"),
"textures": [
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Ash/Ash_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Bamboo_Planks/Bamboo_Planks_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Birch/Birch_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Cherry/Cherry_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Mahogany_Planks/Mahogany_Planks_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Oak/Oak_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Plywood/Plywood_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Timber/Timber_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Timber_Cladding/Timber_Cladding_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Wood/Walnut_Planks/Walnut_Planks_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Stone/Marble/Marble_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Steel_Stainless/Steel_Stainless_BaseColor.png",
],
"default_texture": (
f"{ISAAC_NUCLEUS_DIR}/Props/Mounts/SeattleLabTable/Materials/Textures/DemoTable_TableBase_BaseColor.png"
),
},
)
randomize_robot_arm_visual_texture = EventTerm(
func=franka_stack_events.randomize_visual_texture_material,
mode="reset",
params={
"asset_cfg": SceneEntityCfg("robot"),
"textures": [
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Aluminum_Cast/Aluminum_Cast_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Aluminum_Polished/Aluminum_Polished_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Brass/Brass_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Bronze/Bronze_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Brushed_Antique_Copper/Brushed_Antique_Copper_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Cast_Metal_Silver_Vein/Cast_Metal_Silver_Vein_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Copper/Copper_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Gold/Gold_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Iron/Iron_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/RustedMetal/RustedMetal_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Silver/Silver_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Steel_Carbon/Steel_Carbon_BaseColor.png",
f"{NVIDIA_NUCLEUS_DIR}/Materials/Base/Metals/Steel_Stainless/Steel_Stainless_BaseColor.png",
],
},
)
@configclass
class ObservationsCfg:
"""Observation specifications for the MDP."""
......@@ -96,13 +178,21 @@ class ObservationsCfg:
class FrankaCubeStackVisuomotorEnvCfg(stack_joint_pos_env_cfg.FrankaCubeStackEnvCfg):
observations: ObservationsCfg = ObservationsCfg()
# Evaluation settings
eval_mode = False
eval_type = None
def __post_init__(self):
# post init of parent
super().__post_init__()
# Set events
self.events = EventCfg()
# Set Franka as robot
# We switch here to a stiffer PD controller for IK tracking to be better.
self.scene.robot = FRANKA_PANDA_HIGH_PD_CFG.replace(prim_path="{ENV_REGEX_NS}/Robot")
self.scene.robot.spawn.semantic_tags = [("class", "robot")]
# Set actions for the specific robot type (franka)
self.actions.arm_action = DifferentialInverseKinematicsActionCfg(
......
......@@ -11,6 +11,8 @@ import random
import torch
from typing import TYPE_CHECKING
from isaacsim.core.utils.extensions import enable_extension
import isaaclab.utils.math as math_utils
from isaaclab.assets import Articulation, AssetBase
from isaaclab.managers import SceneEntityCfg
......@@ -57,21 +59,75 @@ def randomize_joint_by_gaussian_offset(
asset.write_joint_state_to_sim(joint_pos, joint_vel, env_ids=env_ids)
def sample_random_color(base=(0.75, 0.75, 0.75), variation=0.1):
"""
Generates a randomized color that stays close to the base color while preserving overall brightness.
The relative balance between the R, G, and B components is maintained by ensuring that
the sum of random offsets is zero.
Parameters:
base (tuple): The base RGB color with each component between 0 and 1.
variation (float): Maximum deviation to sample for each channel before balancing.
Returns:
tuple: A new RGB color with balanced random variation.
"""
# Generate random offsets for each channel in the range [-variation, variation]
offsets = [random.uniform(-variation, variation) for _ in range(3)]
# Compute the average offset
avg_offset = sum(offsets) / 3
# Adjust offsets so their sum is zero (maintaining brightness)
balanced_offsets = [offset - avg_offset for offset in offsets]
# Apply the balanced offsets to the base color and clamp each channel between 0 and 1
new_color = tuple(max(0, min(1, base_component + offset)) for base_component, offset in zip(base, balanced_offsets))
return new_color
def randomize_scene_lighting_domelight(
env: ManagerBasedEnv,
env_ids: torch.Tensor,
intensity_range: tuple[float, float],
color_variation: float,
textures: list[str],
default_intensity: float = 3000.0,
default_color: tuple[float, float, float] = (0.75, 0.75, 0.75),
default_texture: str = "",
asset_cfg: SceneEntityCfg = SceneEntityCfg("light"),
):
asset: AssetBase = env.scene[asset_cfg.name]
light_prim = asset.prims[0]
# Sample new light intensity
new_intensity = random.uniform(intensity_range[0], intensity_range[1])
# Set light intensity to light prim
intensity_attr = light_prim.GetAttribute("inputs:intensity")
intensity_attr.Set(new_intensity)
intensity_attr.Set(default_intensity)
color_attr = light_prim.GetAttribute("inputs:color")
color_attr.Set(default_color)
texture_file_attr = light_prim.GetAttribute("inputs:texture:file")
texture_file_attr.Set(default_texture)
if not hasattr(env.cfg, "eval_mode") or not env.cfg.eval_mode:
return
if env.cfg.eval_type in ["light_intensity", "all"]:
# Sample new light intensity
new_intensity = random.uniform(intensity_range[0], intensity_range[1])
# Set light intensity to light prim
intensity_attr.Set(new_intensity)
if env.cfg.eval_type in ["light_color", "all"]:
# Sample new light color
new_color = sample_random_color(base=default_color, variation=color_variation)
# Set light color to light prim
color_attr.Set(new_color)
if env.cfg.eval_type in ["light_texture", "all"]:
# Sample new light texture (background)
new_texture = random.sample(textures, 1)[0]
# Set light texture to light prim
texture_file_attr.Set(new_texture)
def sample_object_poses(
......@@ -184,3 +240,75 @@ def randomize_rigid_objects_in_focus(
)
env.rigid_objects_in_focus.append(selected_ids)
def randomize_visual_texture_material(
env: ManagerBasedEnv,
env_ids: torch.Tensor,
asset_cfg: SceneEntityCfg,
textures: list[str],
default_texture: str = "",
texture_rotation: tuple[float, float] = (0.0, 0.0),
):
"""Randomize the visual texture of bodies on an asset using Replicator API.
This function randomizes the visual texture of the bodies of the asset using the Replicator API.
The function samples random textures from the given texture paths and applies them to the bodies
of the asset. The textures are projected onto the bodies and rotated by the given angles.
.. note::
The function assumes that the asset follows the prim naming convention as:
"{asset_prim_path}/{body_name}/visuals" where the body name is the name of the body to
which the texture is applied. This is the default prim ordering when importing assets
from the asset converters in Isaac Lab.
.. note::
When randomizing the texture of individual assets, please make sure to set
:attr:`isaaclab.scene.InteractiveSceneCfg.replicate_physics` to False. This ensures that physics
parser will parse the individual asset properties separately.
"""
if hasattr(env.cfg, "eval_mode") and (
not env.cfg.eval_mode or env.cfg.eval_type not in [f"{asset_cfg.name}_texture", "all"]
):
return
# textures = [default_texture]
# enable replicator extension if not already enabled
enable_extension("omni.replicator.core")
# we import the module here since we may not always need the replicator
import omni.replicator.core as rep
# check to make sure replicate_physics is set to False, else raise error
# note: We add an explicit check here since texture randomization can happen outside of 'prestartup' mode
# and the event manager doesn't check in that case.
if env.cfg.scene.replicate_physics:
raise RuntimeError(
"Unable to randomize visual texture material with scene replication enabled."
" For stable USD-level randomization, please disable scene replication"
" by setting 'replicate_physics' to False in 'InteractiveSceneCfg'."
)
# convert from radians to degrees
texture_rotation = tuple(math.degrees(angle) for angle in texture_rotation)
# obtain the asset entity
asset = env.scene[asset_cfg.name]
# join all bodies in the asset
body_names = asset_cfg.body_names
if isinstance(body_names, str):
body_names_regex = body_names
elif isinstance(body_names, list):
body_names_regex = "|".join(body_names)
else:
body_names_regex = ".*"
if not hasattr(asset, "cfg"):
prims_group = rep.get.prims(path_pattern=f"{asset.prim_paths[0]}/visuals")
else:
prims_group = rep.get.prims(path_pattern=f"{asset.cfg.prim_path}/{body_names_regex}/visuals")
with prims_group:
rep.randomizer.texture(
textures=textures, project_uvw=True, texture_rotate=rep.distribution.uniform(*texture_rotation)
)
......@@ -27,7 +27,7 @@ def cubes_stacked(
cube_1_cfg: SceneEntityCfg = SceneEntityCfg("cube_1"),
cube_2_cfg: SceneEntityCfg = SceneEntityCfg("cube_2"),
cube_3_cfg: SceneEntityCfg = SceneEntityCfg("cube_3"),
xy_threshold: float = 0.05,
xy_threshold: float = 0.04,
height_threshold: float = 0.005,
height_diff: float = 0.0468,
gripper_open_val: torch.tensor = torch.tensor([0.04]),
......@@ -53,7 +53,9 @@ def cubes_stacked(
# Check cube positions
stacked = torch.logical_and(xy_dist_c12 < xy_threshold, xy_dist_c23 < xy_threshold)
stacked = torch.logical_and(h_dist_c12 - height_diff < height_threshold, stacked)
stacked = torch.logical_and(pos_diff_c12[:, 2] < 0.0, stacked)
stacked = torch.logical_and(h_dist_c23 - height_diff < height_threshold, stacked)
stacked = torch.logical_and(pos_diff_c23[:, 2] < 0.0, stacked)
# Check gripper positions
stacked = torch.logical_and(
......
......@@ -69,6 +69,7 @@ def test_environments(task_name, num_envs, device):
"Isaac-Stack-Cube-Instance-Randomize-Franka-IK-Rel-v0",
"Isaac-Stack-Cube-Instance-Randomize-Franka-v0",
"Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-v0",
"Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-Cosmos-v0",
]:
return
# skip automate environments as they require cuda installation
......
......@@ -126,24 +126,28 @@ def pytest_sessionstart(session):
"""Intercept pytest startup to execute tests in the correct order."""
# Get the workspace root directory (one level up from tools)
workspace_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
source_dir = os.path.join(workspace_root, "source")
source_dirs = [
os.path.join(workspace_root, "scripts"),
os.path.join(workspace_root, "source"),
]
if not os.path.exists(source_dir):
print(f"Error: source directory not found at {source_dir}")
pytest.exit("Source directory not found", returncode=1)
# Get all test files in the source directory
# Get all test files in the source directories
test_files = []
for root, _, files in os.walk(source_dir):
for file in files:
if file.startswith("test_") and file.endswith(".py"):
# Skip if the file is in TESTS_TO_SKIP
if file in test_settings.TESTS_TO_SKIP:
print(f"Skipping {file} as it's in the skip list")
continue
full_path = os.path.join(root, file)
test_files.append(full_path)
for source_dir in source_dirs:
if not os.path.exists(source_dir):
print(f"Error: source directory not found at {source_dir}")
pytest.exit("Source directory not found", returncode=1)
for root, _, files in os.walk(source_dir):
for file in files:
if file.startswith("test_") and file.endswith(".py"):
# Skip if the file is in TESTS_TO_SKIP
if file in test_settings.TESTS_TO_SKIP:
print(f"Skipping {file} as it's in the skip list")
continue
full_path = os.path.join(root, file)
test_files.append(full_path)
if not test_files:
print("No test files found in source directory")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment