Commit 311d76e2 authored by chengronglai, committed by Kelly Guo

Adds task instruction UI support for mimic (#269)

This enhancement introduces a dynamic text instruction widget that
provides real-time feedback on the number of successful recordings
during demonstration sessions.

- New feature (non-breaking change which adds functionality)

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there

---------
Co-authored-by: Kelly Guo <kellyguo123@hotmail.com>
parent 1566a3ca
......@@ -85,15 +85,21 @@ if "handtracking" in args_cli.teleop_device.lower():
# Omniverse logger
import omni.log
import omni.ui as ui
# Additional Isaac Lab imports that can only be imported after the simulator is running
from isaaclab.devices import OpenXRDevice, Se3Keyboard, Se3SpaceMouse
import isaaclab_mimic.envs # noqa: F401
from isaaclab_mimic.ui.instruction_display import InstructionDisplay, show_subtask_instructions
if args_cli.enable_pinocchio:
from isaaclab.devices.openxr.retargeters.humanoid.fourier.gr1t2_retargeter import GR1T2Retargeter
import isaaclab_tasks.manager_based.manipulation.pick_place # noqa: F401
from isaaclab.devices.openxr.retargeters.manipulator import GripperRetargeter, Se3AbsRetargeter, Se3RelRetargeter
from isaaclab.envs.mdp.recorders.recorders_cfg import ActionStateRecorderManagerCfg
from isaaclab.envs.ui import EmptyWindow
from isaaclab.managers import DatasetExportMode
import isaaclab_tasks # noqa: F401
......@@ -351,6 +357,19 @@ def main():
# simulate environment -- run everything in inference mode
current_recorded_demo_count = 0
success_step_count = 0
label_text = f"Recorded {current_recorded_demo_count} successful demonstrations."
instruction_display = InstructionDisplay(args_cli.teleop_device)
if args_cli.teleop_device.lower() != "handtracking":
window = EmptyWindow(env, "Instruction")
with window.ui_window_elements["main_vstack"]:
demo_label = ui.Label(label_text)
subtask_label = ui.Label("")
instruction_display.set_labels(subtask_label, demo_label)
subtasks = {}
with contextlib.suppress(KeyboardInterrupt) and torch.inference_mode():
while simulation_app.is_running():
# get data from teleop device
......@@ -360,7 +379,12 @@ def main():
if running_recording_instance:
# compute actions based on environment
actions = pre_process_actions(teleop_data, env.num_envs, env.device)
env.step(actions)
obv = env.step(actions)
if subtasks is not None:
if subtasks == {}:
subtasks = obv[0].get("subtask_terms")
elif subtasks:
show_subtask_instructions(instruction_display, subtasks, obv, env.cfg)
else:
env.sim.render()
......@@ -377,17 +401,19 @@ def main():
else:
success_step_count = 0
# print out the current demo count if it has changed
if env.recorder_manager.exported_successful_episode_count > current_recorded_demo_count:
current_recorded_demo_count = env.recorder_manager.exported_successful_episode_count
label_text = f"Recorded {current_recorded_demo_count} successful demonstrations."
print(label_text)
if should_reset_recording_instance:
env.sim.reset()
env.recorder_manager.reset()
env.reset()
should_reset_recording_instance = False
success_step_count = 0
# print out the current demo count if it has changed
if env.recorder_manager.exported_successful_episode_count > current_recorded_demo_count:
current_recorded_demo_count = env.recorder_manager.exported_successful_episode_count
print(f"Recorded {current_recorded_demo_count} successful demonstrations.")
instruction_display.show_demo(label_text)
if args_cli.num_demos > 0 and env.recorder_manager.exported_successful_episode_count >= args_cli.num_demos:
print(f"All {args_cli.num_demos} demonstrations recorded. Exiting the app.")
......
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.36.19"
version = "0.36.20"
# Description
title = "Isaac Lab framework for Robot Learning"
......
Changelog
---------
0.36.19 (2025-04-09)
0.36.20 (2025-04-09)
~~~~~~~~~~~~~~~~~~~~
Changed
......@@ -12,7 +12,7 @@ Changed
the cuda device, which results in NCCL errors on distributed setups.
0.36.18 (2025-04-01)
0.36.19 (2025-04-01)
~~~~~~~~~~~~~~~~~~~~
Fixed
......@@ -21,6 +21,16 @@ Fixed
* Added check in RecorderManager to ensure that the success indicator is only set if the termination manager is present.
0.36.18 (2025-03-26)
~~~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added a dynamic text instruction widget that provides real-time feedback
on the number of successful recordings during demonstration sessions.
0.36.17 (2025-03-26)
~~~~~~~~~~~~~~~~~~~~
......
......@@ -131,6 +131,12 @@ class SubTaskConfig:
apply_noise_during_interpolation: bool = False
"""Whether to apply noise during interpolation."""
description: str = ""
"""Description of the subtask"""
next_subtask_description: str = ""
"""Instructions for the next subtask"""
class SubTaskConstraintType(enum.IntEnum):
"""Enum for subtask constraint types."""
......
......@@ -11,5 +11,6 @@ toggling different debug visualization tools, and other user-defined functionali
"""
from .base_env_window import BaseEnvWindow
from .empty_window import EmptyWindow
from .manager_based_rl_env_window import ManagerBasedRLEnvWindow
from .viewport_camera_controller import ViewportCameraController
# Copyright (c) 2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
import omni.kit.app
if TYPE_CHECKING:
import omni.ui
from ..manager_based_env import ManagerBasedEnv
class EmptyWindow:
    """
    Creates an empty UI window that can be docked in the Omniverse Kit environment.

    The class initializes a dockable UI window and provides a main frame with a vertical stack.
    Both are exposed through :attr:`ui_window_elements` under the keys ``"main_frame"`` and
    ``"main_vstack"``. You can add custom UI elements to this vertical stack.

    Example for adding a UI element from the standalone execution script:
        >>> window = EmptyWindow(env, "Instruction")
        >>> with window.ui_window_elements["main_vstack"]:
        >>>     ui.Label("My UI element")
    """

    def __init__(self, env: ManagerBasedEnv, window_name: str):
        """Initialize the window.

        Args:
            env: The environment object.
            window_name: The name of the window.
        """
        # store environment
        self.env = env
        # create window for UI
        self.ui_window = omni.ui.Window(
            window_name, width=400, height=500, visible=True, dock_preference=omni.ui.DockPreference.RIGHT_TOP
        )
        # dock next to properties window
        # NOTE: the event loop only keeps weak references to tasks, so the returned future is
        # stored on the instance to make sure the docking coroutine is not garbage-collected
        # before it has finished.
        self._dock_task = asyncio.ensure_future(self._dock_window(window_title=self.ui_window.title))

        # keep a dictionary of stacks so that child environments can add their own UI elements
        # this can be done by using the `with` context manager
        self.ui_window_elements = dict()
        # create main frame
        self.ui_window_elements["main_frame"] = self.ui_window.frame
        with self.ui_window_elements["main_frame"]:
            # create main vstack
            self.ui_window_elements["main_vstack"] = omni.ui.VStack(spacing=5, height=0)

    def __del__(self):
        """Destructor for the window."""
        # `ui_window` does not exist if `__init__` raised before the window was created
        window = getattr(self, "ui_window", None)
        # destroy the window
        if window is not None:
            window.visible = False
            window.destroy()
            self.ui_window = None

    async def _dock_window(self, window_title: str):
        """Docks the custom UI window to the property window.

        Args:
            window_title: Title used to look the window up in the workspace.
        """
        # wait for the window to be created (gives up after five app updates)
        for _ in range(5):
            if omni.ui.Workspace.get_window(window_title):
                break
            await self.env.sim.app.next_update_async()

        # dock next to properties window
        custom_window = omni.ui.Workspace.get_window(window_title)
        property_window = omni.ui.Workspace.get_window("Property")
        # nothing to do if either window could not be found
        if custom_window and property_window:
            custom_window.dock_in(property_window, omni.ui.DockPosition.SAME, 1.0)
            custom_window.focus()
# Copyright (c) 2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from .instruction_widget import SimpleTextWidget, show_instruction
# Copyright (c) 2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import asyncio
import functools
import textwrap
from typing import Any, TypeAlias
import omni.kit.commands
import omni.ui as ui
from isaacsim.core.utils.prims import delete_prim, get_prim_at_path
from omni.kit.xr.scene_view.utils import UiContainer, WidgetComponent
from omni.kit.xr.scene_view.utils.spatial_source import SpatialSource
from pxr import Gf
# Either of the two pxr ``Gf`` 3-component vector types (``Vec3f`` or ``Vec3d``).
Vec3Type: TypeAlias = Gf.Vec3f | Gf.Vec3d
# Registry of the currently shown instruction widgets, keyed by target prim path.
# Each value is a ``(container, text)`` tuple as stored by ``show_instruction``.
camera_facing_widget_container = {}
# Registry of the pending auto-hide timer handles, keyed by target prim path.
camera_facing_widget_timers = {}
class SimpleTextWidget(ui.Widget):
    """Widget that shows a single text label on top of a filled background rectangle."""

    def __init__(self, text: str | None = "Simple Text", style: dict[str, Any] | None = None, **kwargs):
        """Create the widget and build its UI immediately.

        Args:
            text: The text shown by the label.
            style: Style dictionary applied to the label. When omitted, a default with
                ``font_size`` 1 and ``color`` 0xFFFFFFFF is used.
            **kwargs: Forwarded unchanged to the ``ui.Widget`` constructor.
        """
        super().__init__(**kwargs)
        self._text = text
        self._style = {"font_size": 1, "color": 0xFFFFFFFF} if style is None else style
        # Populated by ``_build_ui``.
        self._ui_label = None
        self._build_ui()

    def set_label_text(self, text: str):
        """Update the text displayed by the label."""
        self._text = text
        if not self._ui_label:
            # Label has not been built; only the stored text is updated.
            return
        self._ui_label.text = text

    def _build_ui(self):
        """Build the UI with a window-like rectangle and centered label."""
        background_style = {"Rectangle": {"background_color": 0xFF454545, "border_radius": 0.1}}
        centered = ui.Alignment.CENTER
        # The ZStack layers the label above the background rectangle.
        with ui.ZStack():
            ui.Rectangle(style=background_style)
            with ui.VStack(alignment=centered):
                self._ui_label = ui.Label(self._text, style=self._style, alignment=centered)
def compute_widget_dimensions(
    text: str, font_size: float, max_width: float, min_width: float
) -> tuple[float, float, list[str]]:
    """
    Estimate widget dimensions based on text content.

    The text is wrapped so that no line exceeds ``max_width``, assuming an average character
    width of ``0.6 * font_size`` and a line height of ``1.2 * font_size``.

    Args:
        text: The text to lay out.
        font_size: The font size used for the estimate. Must be positive.
        max_width: Upper bound for the widget width.
        min_width: Lower bound for the widget width.

    Returns:
        actual_width (float): The width, clamped between min_width and max_width.
        actual_height (float): The computed height based on wrapped text lines.
        lines (List[str]): The list of wrapped text lines.

    Raises:
        ValueError: If ``font_size`` is not positive.
    """
    if font_size <= 0:
        raise ValueError(f"font_size must be positive, got {font_size}.")

    # Estimate average character width.
    char_width = 0.6 * font_size
    # textwrap.wrap() rejects widths below 1, so always allow at least one character per line
    # even when max_width is narrower than a single character.
    max_chars_per_line = max(1, int(max_width / char_width))
    # textwrap.wrap() returns an empty list for empty or whitespace-only text.
    lines = textwrap.wrap(text, width=max_chars_per_line) or [text]

    computed_width = max(len(line) for line in lines) * char_width
    actual_width = max(min(computed_width, max_width), min_width)
    line_height = 1.2 * font_size
    actual_height = len(lines) * line_height
    return actual_width, actual_height, lines
def show_instruction(
    text: str,
    prim_path_source: str | None = None,
    translation: Gf.Vec3d = Gf.Vec3d(0, 0, 0),
    display_duration: float | None = 5.0,
    max_width: float = 2.5,
    min_width: float = 1.0,  # Prevent widget from being too narrow.
    font_size: float = 0.1,
    target_prim_path: str = "/newPrim",
) -> UiContainer | None:
    """
    Create and display the instruction widget based on the given text.

    The widget's width and height are computed dynamically based on the input text.
    It automatically wraps text that is too long and adjusts the widget's height
    accordingly. If a display duration is provided (non-zero), the widget is automatically
    hidden after that many seconds.

    If a widget showing the same text already exists for ``target_prim_path``, it is returned
    as is and its auto-hide timer is left untouched. If the text differs, the existing widget
    and its timer are removed before the new widget is created.

    Args:
        text (str): The instruction text to display.
        prim_path_source (Optional[str]): The prim path to be used as a spatial source
            for the widget. The prim is copied to ``target_prim_path``. If None, no prim is
            copied and the widget is positioned by ``translation`` only.
        translation (Gf.Vec3d): A translation vector specifying the widget's position.
        display_duration (Optional[float]): The time in seconds to display the widget before
            automatically hiding it. If None or 0, the widget remains visible until manually
            hidden.
        max_width (float): Maximum width of the widget; longer text is wrapped.
        min_width (float): Minimum width of the widget.
        font_size (float): Font size of the text, also used to estimate the widget dimensions.
        target_prim_path (str): The target path where the copied prim will be created.
            Defaults to "/newPrim".

    Returns:
        UiContainer: The container instance holding the instruction widget.
    """
    global camera_facing_widget_container, camera_facing_widget_timers

    # Check if widget exists and has different text
    if target_prim_path in camera_facing_widget_container:
        container, current_text = camera_facing_widget_container[target_prim_path]
        if current_text == text:
            return container

        # Cancel existing timer if there is one
        stale_timer = camera_facing_widget_timers.pop(target_prim_path, None)
        if stale_timer is not None:
            stale_timer.cancel()

        container.root.clear()
        del camera_facing_widget_container[target_prim_path]

    # Clean up existing widget
    if get_prim_at_path(target_prim_path):
        delete_prim(target_prim_path)

    # Compute dimensions and wrap text.
    width, height, lines = compute_widget_dimensions(text, font_size, max_width, min_width)
    wrapped_text = "\n".join(lines)

    # Create the widget component.
    widget_component = WidgetComponent(
        SimpleTextWidget,
        width=width,
        height=height,
        resolution_scale=300,
        widget_args=[wrapped_text, {"font_size": font_size}],
    )

    space_stack = []
    # Only copy a prim when a source was actually given; the default (None) is not a valid path.
    if prim_path_source is not None:
        copy_result = omni.kit.commands.execute(
            "CopyPrim",
            path_from=prim_path_source,
            path_to=target_prim_path,
            exclusive_select=False,
            copy_to_introducing_layer=False,
        )
        # NOTE(review): `omni.kit.commands.execute` is documented to return a
        # `(success, result)` tuple, which is never None - confirm against the Kit version in
        # use. Any other return type falls back to the previous `is not None` check.
        if isinstance(copy_result, tuple):
            copied = bool(copy_result[0])
        else:
            copied = copy_result is not None
        if copied:
            space_stack.append(SpatialSource.new_prim_path_source(target_prim_path))

    space_stack.extend([
        SpatialSource.new_translation_source(translation),
        SpatialSource.new_look_at_camera_source(),
    ])

    # Create the UI container with the widget.
    container = UiContainer(
        widget_component,
        space_stack=space_stack,
    )
    # Remember the raw (unwrapped) text so that repeated calls with the same text are no-ops.
    camera_facing_widget_container[target_prim_path] = (container, text)

    # Schedule auto-hide after the specified display_duration if provided.
    if display_duration:
        timer = asyncio.get_event_loop().call_later(display_duration, functools.partial(hide, target_prim_path))
        camera_facing_widget_timers[target_prim_path] = timer

    return container
def hide(target_prim_path: str = "/newPrim") -> None:
    """
    Hide and clean up a specific instruction widget.

    Also cancels and cleans up the associated auto-hide timer, so that a timer scheduled for
    a widget that was hidden manually cannot fire later and remove a newer widget created
    for the same prim path.

    Args:
        target_prim_path (str): The prim path the widget was registered under.
            Defaults to "/newPrim".
    """
    global camera_facing_widget_container, camera_facing_widget_timers

    if target_prim_path in camera_facing_widget_container:
        container, _ = camera_facing_widget_container[target_prim_path]
        container.root.clear()
        del camera_facing_widget_container[target_prim_path]

    timer = camera_facing_widget_timers.pop(target_prim_path, None)
    if timer is not None:
        # Cancelling is harmless when this function is itself running as the timer callback.
        timer.cancel()
......@@ -58,6 +58,8 @@ class FrankaCubeStackIKRelMimicEnvCfg(FrankaCubeStackEnvCfg, MimicEnvCfg):
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
description="Grasp red cube",
next_subtask_description="Stack red cube on top of blue cube",
)
)
subtask_configs.append(
......@@ -80,6 +82,7 @@ class FrankaCubeStackIKRelMimicEnvCfg(FrankaCubeStackEnvCfg, MimicEnvCfg):
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
next_subtask_description="Grasp green cube",
)
)
subtask_configs.append(
......@@ -102,6 +105,7 @@ class FrankaCubeStackIKRelMimicEnvCfg(FrankaCubeStackEnvCfg, MimicEnvCfg):
num_fixed_steps=0,
# If True, apply action noise during the interpolation phase and execution
apply_noise_during_interpolation=False,
next_subtask_description="Stack green cube on top of red cube",
)
)
subtask_configs.append(
......
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Module for handling instruction displays in Isaac Lab environments."""
from typing import Any
from pxr import Gf
from isaaclab.envs.mimic_env_cfg import MimicEnvCfg
class InstructionDisplay:
    """Handles instruction display for different teleop devices.

    For the ``"handtracking"`` device, instructions are shown through the XR instruction
    widget. For every other device, instructions are written to the ``text`` attribute of the
    labels registered with :meth:`set_labels`. Until labels are registered, showing an
    instruction is a no-op.
    """

    def __init__(self, teleop_device: str):
        """Initialize the display handler.

        Args:
            teleop_device: Name of the teleop device; compared case-insensitively.
        """
        self.teleop_device = teleop_device.lower()
        # Defined in every mode so the attributes can always be read.
        self.subtask_label = None
        self.demo_label = None

        if self.teleop_device == "handtracking":
            # Kept local so the XR widget module is only imported in hand-tracking mode.
            from isaaclab.ui.xr_widgets import show_instruction

            self._display_subtask = lambda text: show_instruction(
                text, "/_xr/stage/xrCamera", Gf.Vec3f(1.25, 0.3, -2), target_prim_path="/subtask_instruction"
            )
            self._display_demo = lambda text: show_instruction(
                text, "/_xr/stage/xrCamera", Gf.Vec3f(-1.25, 0.3, -2), target_prim_path="/demo_complete"
            )
        else:
            # The labels are looked up at call time, so labels set later are picked up.
            self._display_subtask = lambda text: self._set_label_text(self.subtask_label, text)
            self._display_demo = lambda text: self._set_label_text(self.demo_label, text)

    @staticmethod
    def _set_label_text(label, text):
        """Write ``text`` to ``label.text``; do nothing if no label has been registered."""
        if label is not None:
            label.text = text

    def set_labels(self, subtask_label, demo_label):
        """Set the instruction labels for non-handtracking displays."""
        self.subtask_label = subtask_label
        self.demo_label = demo_label

    def show_subtask(self, text):
        """Display subtask instruction."""
        self._display_subtask(text)

    def show_demo(self, text):
        """Display demo completion message."""
        self._display_demo(text)
def show_subtask_instructions(
    instruction_display: InstructionDisplay, prev_subtasks: dict, obv: Any, env_cfg: Any
) -> None:
    """
    Detect changes in subtasks and display the changes.

    Does nothing if ``env_cfg`` is not a :class:`MimicEnvCfg`, if the observation carries no
    ``"subtask_terms"`` entry, or if the configuration defines no subtask configs.

    Args:
        instruction_display: Display handler for showing instructions
        prev_subtasks: Previous subtask terms, keyed by termination signal name. Updated in
            place with the current state of every signal.
        obv: Current observation with subtask terms. It is indexed as ``obv[0]``, whose
            ``"subtask_terms"`` entry maps signal names to values exposing ``.item()``.
        env_cfg: Environment configuration containing subtask descriptions
    """
    if not isinstance(env_cfg, MimicEnvCfg):
        return

    subtasks = obv[0].get("subtask_terms")
    if subtasks is None:
        return

    # Nothing to display if the configuration does not define any subtasks.
    if not env_cfg.subtask_configs:
        return

    # Currently only supports one end effector
    eef_name = next(iter(env_cfg.subtask_configs))
    subtask_configs = env_cfg.subtask_configs[eef_name]

    all_false = True
    for subtask_config in subtask_configs:
        term_signal = subtask_config.subtask_term_signal
        # Subtasks without a termination signal cannot be tracked.
        if term_signal is None:
            continue

        current_state = subtasks[term_signal].item()
        prev_state = prev_subtasks.get(term_signal, False)

        if current_state:
            all_false = False

        # Show message when state changes from False to True
        if current_state and not prev_state:
            instruction_display.show_subtask(f"Next objective: {subtask_config.next_subtask_description}")

        # Update the previous state
        prev_subtasks[term_signal] = current_state

    # If all tasks are false, show the first task's description
    if all_false and subtask_configs:
        first_task = subtask_configs[0]
        instruction_display.show_subtask(f"Current objective: {first_task.description}")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment