Unverified Commit e4b5681e authored by lotusl-code's avatar lotusl-code Committed by GitHub

Adds notification widgets at IK error status and Teleop task completion (#3356)

# Description

1. Add a notification widget when ik error happens
2. At the end of Teleop data collection, display a notification before
the application termination


<!--
Thank you for your interest in sending a pull request. Please make sure
to check the contribution guidelines.

Link:
https://isaac-sim.github.io/IsaacLab/main/source/refs/contributing.html
-->

Please include a summary of the change and which issue is fixed. Please
also include relevant motivation and context.
List any dependencies that are required for this change.

Fixes # (issue)

<!-- As a practice, it is recommended to open an issue to have
discussions on the proposed pull request.
This makes it easier for the community to keep track of what is being
developed or added, and if a given feature
is demanded by more than one party. -->

## Type of change

<!-- As you go through the list, delete the ones that are not
applicable. -->

- Bug fix (non-breaking change which fixes an issue)
- New feature (non-breaking change which adds functionality)
- Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- This change requires a documentation update

## Screenshots

Please attach before and after screenshots of the change if applicable.

<!--
Example:

| Before | After |
| ------ | ----- |
| _gif/png before_ | _gif/png after_ |

To upload images to a PR -- simply drag and drop an image while in edit
mode and it should upload the image directly. You can then paste that
source into the above before/after sections.
-->

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there

<!--
As you go through the checklist above, you can mark something as done by
putting an x character in it

For example,
- [x] I have done this task
- [ ] I have not done this task
-->

---------
Signed-off-by: 's avatarKelly Guo <kellyg@nvidia.com>
Co-authored-by: 's avatarKelly Guo <kellyg@nvidia.com>
parent 9d194dc5
......@@ -88,6 +88,7 @@ Guidelines for modifications:
* Kourosh Darvish
* Kousheek Chakraborty
* Lionel Gulich
* Lotus Li
* Louis Le Lay
* Lorenz Wellhausen
* Lukas Fröhlich
......
......@@ -382,6 +382,18 @@ Back on your Apple Vision Pro:
motion of the dots and the robot may be caused by the limits of the robot joints and/or robot
controller.
.. note::
When the inverse kinematics solver fails to find a valid solution, an error message will appear
in the XR device display. To recover from this state, click the **Reset** button to return
the robot to its original pose and continue teleoperation.
.. figure:: ../_static/setup/cloudxr_avp_ik_error.jpg
:align: center
:figwidth: 80%
:alt: IK Error Message Display in XR Device
#. When you are finished with the example, click **Disconnect** to disconnect from Isaac Lab.
.. admonition:: Learn More about Teleoperation and Imitation Learning in Isaac Lab
......
......@@ -469,16 +469,24 @@ def run_simulation_loop(
label_text = f"Recorded {current_recorded_demo_count} successful demonstrations."
print(label_text)
# Check if we've reached the desired number of demos
if args_cli.num_demos > 0 and env.recorder_manager.exported_successful_episode_count >= args_cli.num_demos:
label_text = f"All {current_recorded_demo_count} demonstrations recorded.\nExiting the app."
instruction_display.show_demo(label_text)
print(label_text)
target_time = time.time() + 0.8
while time.time() < target_time:
if rate_limiter:
rate_limiter.sleep(env)
else:
env.sim.render()
break
# Handle reset if requested
if should_reset_recording_instance:
success_step_count = handle_reset(env, success_step_count, instruction_display, label_text)
should_reset_recording_instance = False
# Check if we've reached the desired number of demos
if args_cli.num_demos > 0 and env.recorder_manager.exported_successful_episode_count >= args_cli.num_demos:
print(f"All {args_cli.num_demos} demonstrations recorded. Exiting the app.")
break
# Check if simulation is stopped
if env.sim.is_stopped():
break
......@@ -506,6 +514,10 @@ def main() -> None:
# if handtracking is selected, rate limiting is achieved via OpenXR
if args_cli.xr:
rate_limiter = None
from isaaclab.ui.xr_widgets import TeleopVisualizationManager, XRVisualization
# Assign the teleop visualization manager to the visualization system
XRVisualization.assign_manager(TeleopVisualizationManager)
else:
rate_limiter = RateLimiter(args_cli.step_hz)
......
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.45.13"
version = "0.45.14"
# Description
title = "Isaac Lab framework for Robot Learning"
......
Changelog
---------
0.45.14 (2025-09-08)
~~~~~~~~~~~~~~~~~~~~
Added
^^^^^
* * Added :class:`~isaaclab.ui.xr_widgets.TeleopVisualizationManager` and :class:`~isaaclab.ui.xr_widgets.XRVisualization`
classes to provide real-time visualization of teleoperation and inverse kinematics status in XR environments.
0.45.13 (2025-09-08)
~~~~~~~~~~~~~~~~~~~~
......
......@@ -173,6 +173,11 @@ class PinkIKController:
"Warning: IK quadratic solver could not find a solution! Did not update the target joint"
f" positions.\nError: {e}"
)
if self.cfg.xr_enabled:
from isaaclab.ui.xr_widgets import XRVisualization
XRVisualization.push_event("ik_error", {"error": e})
return torch.tensor(curr_joint_pos, device=self.device, dtype=torch.float32)
# Discard the first 6 values (for root and universal joints)
......
......@@ -62,3 +62,6 @@ class PinkIKControllerCfg:
"""If True, the Pink IK solver will fail and raise an error if any joint limit is violated during optimization. PinkIKController
will handle the error by setting the last joint positions. If False, the solver will ignore joint limit violations and return the
closest solution found."""
xr_enabled: bool = False
"""If True, the Pink IK controller will send information to the XRVisualization."""
......@@ -2,4 +2,6 @@
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from .instruction_widget import SimpleTextWidget, show_instruction
from .instruction_widget import hide_instruction, show_instruction, update_instruction
from .scene_visualization import DataCollector, TriggerType, VisualizationManager, XRVisualization
from .teleop_visualization_manager import TeleopVisualizationManager
......@@ -22,21 +22,63 @@ camera_facing_widget_timers = {}
class SimpleTextWidget(ui.Widget):
def __init__(self, text: str | None = "Simple Text", style: dict[str, Any] | None = None, **kwargs):
"""A rectangular text label widget for XR overlays.
The widget renders a centered label over a rectangular background. It keeps
track of the configured style and an original width value used by
higher-level helpers to update the text.
"""
def __init__(
self,
text: str | None = "Simple Text",
style: dict[str, Any] | None = None,
original_width: float = 0.0,
**kwargs
):
"""Initialize the text widget.
Args:
text (str): Initial text to display.
style (dict[str, Any]): Optional style dictionary (for example: ``{"font_size": 1, "color": 0xFFFFFFFF}``).
original_width (float): Width used when updating the text.
**kwargs: Additional keyword arguments forwarded to ``ui.Widget``.
"""
super().__init__(**kwargs)
if style is None:
style = {"font_size": 1, "color": 0xFFFFFFFF}
self._text = text
self._style = style
self._ui_label = None
self._original_width = original_width
self._build_ui()
def set_label_text(self, text: str):
"""Update the text displayed by the label."""
"""Update the text displayed by the label.
Args:
text (str): New label text to display.
"""
self._text = text
if self._ui_label:
self._ui_label.text = self._text
def get_font_size(self):
"""Return the configured font size.
Returns:
float: Font size value.
"""
return self._style.get("font_size", 1)
def get_width(self):
"""Return the width used when updating the text.
Returns:
float: Width used when updating the text.
"""
return self._original_width
def _build_ui(self):
"""Build the UI with a window-like rectangle and centered label."""
with ui.ZStack():
......@@ -47,14 +89,20 @@ class SimpleTextWidget(ui.Widget):
def compute_widget_dimensions(
text: str, font_size: float, max_width: float, min_width: float
) -> tuple[float, float, list[str]]:
"""
Estimate widget dimensions based on text content.
) -> tuple[float, float, str]:
"""Estimate widget width/height and wrap the text.
Args:
text (str): Raw text to render.
font_size (float): Font size used for estimating character metrics.
max_width (float): Maximum allowed widget width.
min_width (float): Minimum allowed widget width.
Returns:
actual_width (float): The width, clamped between min_width and max_width.
actual_height (float): The computed height based on wrapped text lines.
lines (List[str]): The list of wrapped text lines.
tuple[float, float, str]: A tuple ``(width, height, wrapped_text)`` where
``width`` and ``height`` are the computed widget dimensions, and
``wrapped_text`` contains the input text broken into newline-separated
lines to fit within the width constraints.
"""
# Estimate average character width.
char_width = 0.6 * font_size
......@@ -66,7 +114,8 @@ def compute_widget_dimensions(
actual_width = max(min(computed_width, max_width), min_width)
line_height = 1.2 * font_size
actual_height = len(lines) * line_height
return actual_width, actual_height, lines
wrapped_text = "\n".join(lines)
return actual_width, actual_height, wrapped_text
def show_instruction(
......@@ -77,29 +126,29 @@ def show_instruction(
max_width: float = 2.5,
min_width: float = 1.0, # Prevent widget from being too narrow.
font_size: float = 0.1,
text_color: int = 0xFFFFFFFF,
target_prim_path: str = "/newPrim",
) -> UiContainer | None:
"""
Create and display the instruction widget based on the given text.
"""Create and display an instruction widget with the given text.
The widget's width and height are computed dynamically based on the input text.
It automatically wraps text that is too long and adjusts the widget's height
accordingly. If a display duration is provided (non-zero), the widget is automatically
hidden after that many seconds.
The widget size is computed from the text and font size, wrapping content
to respect the width limits. If ``display_duration`` is provided and
non-zero, the widget is hidden automatically after the duration elapses.
Args:
text (str): The instruction text to display.
prim_path_source (Optional[str]): The prim path to be used as a spatial sourcey
for the widget.
translation (Gf.Vec3d): A translation vector specifying the widget's position.
display_duration (Optional[float]): The time in seconds to display the widget before
automatically hiding it. If None or 0, the widget remains visible until manually
hidden.
target_prim_path (str): The target path where the copied prim will be created.
Defaults to "/newPrim".
text (str): Instruction text to display.
prim_path_source (str | None): Optional prim path used as a spatial source for the widget.
translation (Gf.Vec3d): World translation to apply to the widget.
display_duration (float | None): Seconds to keep the widget visible. If ``None`` or ``0``,
the widget remains until hidden manually.
max_width (float): Maximum widget width used for wrapping.
min_width (float): Minimum widget width used for wrapping.
font_size (float): Font size of the rendered text.
text_color (int): RGBA color encoded as a 32-bit integer.
target_prim_path (str): Prim path where the widget prim will be created/copied.
Returns:
UiContainer: The container instance holding the instruction widget.
UiContainer | None: The container that owns the instruction widget, or ``None`` if creation failed.
"""
global camera_facing_widget_container, camera_facing_widget_timers
......@@ -121,9 +170,7 @@ def show_instruction(
if get_prim_at_path(target_prim_path):
delete_prim(target_prim_path)
# Compute dimensions and wrap text.
width, height, lines = compute_widget_dimensions(text, font_size, max_width, min_width)
wrapped_text = "\n".join(lines)
width, height, wrapped_text = compute_widget_dimensions(text, font_size, max_width, min_width)
# Create the widget component.
widget_component = WidgetComponent(
......@@ -131,7 +178,7 @@ def show_instruction(
width=width,
height=height,
resolution_scale=300,
widget_args=[wrapped_text, {"font_size": font_size}],
widget_args=[wrapped_text, {"font_size": font_size, "color": text_color}, width],
)
copied_prim = omni.kit.commands.execute(
......@@ -160,17 +207,24 @@ def show_instruction(
# Schedule auto-hide after the specified display_duration if provided.
if display_duration:
timer = asyncio.get_event_loop().call_later(display_duration, functools.partial(hide, target_prim_path))
timer = asyncio.get_event_loop().call_later(
display_duration, functools.partial(hide_instruction, target_prim_path)
)
camera_facing_widget_timers[target_prim_path] = timer
return container
def hide(target_prim_path: str = "/newPrim") -> None:
"""
Hide and clean up a specific instruction widget.
Also cleans up associated timer.
def hide_instruction(target_prim_path: str = "/newPrim") -> None:
"""Hide and clean up a specific instruction widget.
Args:
target_prim_path (str): Prim path of the widget to hide.
Returns:
None: This function does not return a value.
"""
global camera_facing_widget_container, camera_facing_widget_timers
if target_prim_path in camera_facing_widget_container:
......@@ -180,3 +234,44 @@ def hide(target_prim_path: str = "/newPrim") -> None:
if target_prim_path in camera_facing_widget_timers:
del camera_facing_widget_timers[target_prim_path]
def update_instruction(target_prim_path: str = "/newPrim", text: str = ""):
"""Update the text content of an existing instruction widget.
Args:
target_prim_path (str): Prim path of the widget to update.
text (str): New text content to display.
Returns:
bool: ``True`` if the widget existed and was updated, otherwise ``False``.
"""
global camera_facing_widget_container
container_data = camera_facing_widget_container.get(target_prim_path)
if container_data:
container, current_text = container_data
# Only update if the text has actually changed
if current_text != text:
# Access the widget through the manipulator as shown in ui_container.py
manipulator = container.manipulator
# The WidgetComponent is stored in the manipulator's components
# Try to access the widget component and then the actual widget
components = getattr(manipulator, "_ComposableManipulator__components")
if len(components) > 0:
simple_text_widget = components[0]
if simple_text_widget and simple_text_widget.component and simple_text_widget.component.widget:
width, height, wrapped_text = compute_widget_dimensions(
text,
simple_text_widget.component.widget.get_font_size(),
simple_text_widget.component.widget.get_width(),
simple_text_widget.component.widget.get_width(),
)
simple_text_widget.component.widget.set_label_text(wrapped_text)
# Update the stored text in the global dictionary
camera_facing_widget_container[target_prim_path] = (container, text)
return True
return False
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import contextlib
import inspect
import numpy as np
import threading
import time
import torch
from collections.abc import Callable
from enum import Enum
from typing import Any, Union
import omni.log
from pxr import Gf
from isaaclab.sim import SimulationContext
from isaaclab.ui.xr_widgets import show_instruction
class TriggerType(Enum):
"""Enumeration of trigger types for visualization callbacks.
Defines when callbacks should be executed:
- TRIGGER_ON_EVENT: Execute when a specific event occurs
- TRIGGER_ON_PERIOD: Execute at regular time intervals
- TRIGGER_ON_CHANGE: Execute when a specific data variable changes
- TRIGGER_ON_UPDATE: Execute every frame
"""
TRIGGER_ON_EVENT = 0
TRIGGER_ON_PERIOD = 1
TRIGGER_ON_CHANGE = 2
TRIGGER_ON_UPDATE = 3
class DataCollector:
"""Collects and manages data for visualization purposes.
This class provides a centralized data store for visualization data,
with change detection and callback mechanisms for real-time updates.
"""
def __init__(self):
"""Initialize the data collector with empty data store and callback system."""
self._data: dict[str, Any] = {}
self._visualization_callback: Callable | None = None
self._changed_flags: set[str] = set()
def _values_equal(self, existing_value: Any, new_value: Any) -> bool:
"""Compare two values using appropriate method based on their types.
Handles different data types including None, NumPy arrays, PyTorch tensors,
and standard Python types for accurate change detection.
Args:
existing_value: The current value stored in the data collector
new_value: The new value to compare against
Returns:
bool: True if values are equal, False otherwise
"""
# If both are None or one is None
if existing_value is None or new_value is None:
return existing_value is new_value
# If types are different, they're not equal
if type(existing_value) is not type(new_value):
return False
# Handle NumPy arrays
if isinstance(existing_value, np.ndarray):
return np.array_equal(existing_value, new_value)
# Handle torch tensors (if they exist)
if hasattr(existing_value, "equal"):
with contextlib.suppress(Exception):
return torch.equal(existing_value, new_value)
# For all other types (int, float, string, bool, list, dict, set), use regular equality
with contextlib.suppress(Exception):
return existing_value == new_value
# If comparison fails for any reason, assume they're different
return False
def update_data(self, name: str, value: Any) -> None:
"""Update a data field and trigger change detection.
This method handles data updates with intelligent change detection.
It also performs pre-processing and post-processing based on the field name.
Args:
name: The name/key of the data field to update
value: The new value to store (None to remove the field)
"""
existing_value = self.get_data(name)
if value is None:
self._data.pop(name)
if existing_value is not None:
self._changed_flags.add(name)
return
# Todo: for list or array, the change won't be detected
# Check if the value has changed using appropriate comparison method
if self._values_equal(existing_value, value):
return
# Save it
self._data[name] = value
self._changed_flags.add(name)
def update_loop(self) -> None:
"""Process pending changes and trigger visualization callbacks.
This method should be called regularly to ensure visualization updates
are processed in a timely manner.
"""
if len(self._changed_flags) > 0:
if self._visualization_callback:
self._visualization_callback(self._changed_flags)
self._changed_flags.clear()
def get_data(self, name: str) -> Any:
"""Retrieve data by name.
Args:
name: The name/key of the data field to retrieve
Returns:
The stored value, or None if the field doesn't exist
"""
return self._data.get(name)
def set_visualization_callback(self, callback: Callable) -> None:
"""Set the VisualizationManager callback function to be called when data changes.
Args:
callback: Function to call when data changes, receives set of changed field names
"""
self._visualization_callback = callback
class VisualizationManager:
"""Base class for managing visualization rules and callbacks.
Provides a framework for registering and executing callbacks based on
different trigger conditions (events, time periods, data changes).
"""
# Type aliases for different callback signatures
StandardCallback = Callable[["VisualizationManager", "DataCollector"], None]
EventCallback = Callable[["VisualizationManager", "DataCollector", Any], None]
CallbackType = Union[StandardCallback, EventCallback]
class TimeCountdown:
"""Internal class for managing periodic timer-based callbacks."""
period: float
countdown: float
last_time: float
def __init__(self, period: float, initial_countdown: float = 0.0):
"""Initialize a countdown timer.
Args:
period: Time interval in seconds between callback executions
"""
self.period = period
self.countdown = initial_countdown
self.last_time = time.time()
def update(self, current_time: float) -> bool:
"""Update the countdown timer and check if callback should be triggered.
Args:
current_time: Current time in seconds
Returns:
bool: True if callback should be triggered, False otherwise
"""
self.countdown -= current_time - self.last_time
self.last_time = current_time
if self.countdown <= 0.0:
self.countdown = self.period
return True
return False
# Widget presets for common visualization configurations
@classmethod
def message_widget_preset(cls) -> dict[str, Any]:
"""Get the message widget preset configuration.
Returns:
dict: Configuration dictionary for message widgets
"""
return {
"prim_path_source": "/_xr/stage/xrCamera",
"translation": Gf.Vec3f(0, 0, -2),
"display_duration": 3.0,
"max_width": 2.5,
"min_width": 1.0,
"font_size": 0.1,
"text_color": 0xFF00FFFF,
}
@classmethod
def panel_widget_preset(cls) -> dict[str, Any]:
"""Get the panel widget preset configuration.
Returns:
dict: Configuration dictionary for panel widgets
"""
return {
"prim_path_source": "/XRAnchor",
"translation": Gf.Vec3f(0, 2, 2), # hard-coded temporarily
"display_duration": 0.0,
"font_size": 0.13,
"max_width": 2,
"min_width": 2,
}
def display_widget(self, text: str, name: str, args: dict[str, Any]) -> None:
"""Display a widget with the given text and configuration.
Args:
text: Text content to display in the widget
name: Unique identifier for the widget. If duplicated, the old one will be removed from scene.
args: Configuration dictionary for widget appearance and behavior
"""
widget_config = args | {"text": text, "target_prim_path": name}
show_instruction(**widget_config)
def __init__(self, data_collector: DataCollector):
"""Initialize the visualization manager.
Args:
data_collector: DataCollector instance to access the data for visualization use.
"""
self.data_collector: DataCollector = data_collector
data_collector.set_visualization_callback(self.on_change)
self._rules_on_period: dict[VisualizationManager.TimeCountdown, VisualizationManager.StandardCallback] = {}
self._rules_on_event: dict[str, list[VisualizationManager.EventCallback]] = {}
self._rules_on_change: dict[str, list[VisualizationManager.StandardCallback]] = {}
self._rules_on_update: list[VisualizationManager.StandardCallback] = []
# Todo: add support to registering same callbacks for different names
def on_change(self, names: set[str]) -> None:
"""Handle data changes by executing registered callbacks.
Args:
names: Set of data field names that have changed
"""
for name in names:
callbacks = self._rules_on_change.get(name)
if callbacks:
# Create a copy of the list to avoid modification during iteration
for callback in list(callbacks):
callback(self, self.data_collector)
if len(names) > 0:
self.on_event("default_event_has_change")
def update_loop(self) -> None:
"""Update periodic timers and execute callbacks as needed.
This method should be called regularly to ensure periodic callbacks
are executed at the correct intervals.
"""
# Create a copy of the list to avoid modification during iteration
for callback in list(self._rules_on_update):
callback(self, self.data_collector)
current_time = time.time()
# Create a copy of the items to avoid modification during iteration
for timer, callback in list(self._rules_on_period.items()):
triggered = timer.update(current_time)
if triggered:
callback(self, self.data_collector)
def on_event(self, event: str, params: Any = None) -> None:
"""Handle events by executing registered callbacks.
Args:
event: Name of the event that occurred
"""
callbacks = self._rules_on_event.get(event)
if callbacks is None:
return
# Create a copy of the list to avoid modification during iteration
for callback in list(callbacks):
callback(self, self.data_collector, params)
# Todo: better organization of callbacks
def register_callback(self, trigger: TriggerType, arg: dict, callback: CallbackType) -> Any:
"""Register a callback function to be executed based on trigger conditions.
Args:
trigger: Type of trigger that should execute the callback
arg: Dictionary containing trigger-specific parameters:
- For TRIGGER_ON_PERIOD: {"period": float}
- For TRIGGER_ON_EVENT: {"event_name": str}
- For TRIGGER_ON_CHANGE: {"variable_name": str}
- For TRIGGER_ON_UPDATE: {}
callback: Function to execute when trigger condition is met
- For TRIGGER_ON_EVENT: callback(manager: VisualizationManager, data_collector: DataCollector, event_params: Any)
- For others: callback(manager: VisualizationManager, data_collector: DataCollector)
Raises:
TypeError: If callback signature doesn't match the expected signature for the trigger type
"""
# Validate callback signature based on trigger type
self._validate_callback_signature(trigger, callback)
match trigger:
case TriggerType.TRIGGER_ON_PERIOD:
period = arg.get("period")
initial_countdown = arg.get("initial_countdown", 0.0)
if isinstance(period, float) and isinstance(initial_countdown, float):
timer = VisualizationManager.TimeCountdown(period=period, initial_countdown=initial_countdown)
# Type cast since we've validated the signature
self._rules_on_period[timer] = callback # type: ignore
return timer
case TriggerType.TRIGGER_ON_EVENT:
event = arg.get("event_name")
if isinstance(event, str):
callbacks = self._rules_on_event.get(event)
if callbacks is None:
# Type cast since we've validated the signature
self._rules_on_event[event] = [callback] # type: ignore
else:
# Type cast since we've validated the signature
self._rules_on_event[event].append(callback) # type: ignore
return event
case TriggerType.TRIGGER_ON_CHANGE:
variable_name = arg.get("variable_name")
if isinstance(variable_name, str):
callbacks = self._rules_on_change.get(variable_name)
if callbacks is None:
# Type cast since we've validated the signature
self._rules_on_change[variable_name] = [callback] # type: ignore
else:
# Type cast since we've validated the signature
self._rules_on_change[variable_name].append(callback) # type: ignore
return variable_name
case TriggerType.TRIGGER_ON_UPDATE:
# Type cast since we've validated the signature
self._rules_on_update.append(callback) # type: ignore
return None
# Todo: better callback-cancel method
def cancel_rule(self, trigger: TriggerType, arg: str | TimeCountdown, callback: Callable | None = None) -> None:
"""Remove a previously registered callback.
Periodic callbacks are not supported to be cancelled for now.
Args:
trigger: Type of trigger for the callback to remove
arg: Trigger-specific identifier (event name or variable name)
callback: The callback function to remove
"""
callbacks = None
match trigger:
case TriggerType.TRIGGER_ON_CHANGE:
callbacks = self._rules_on_change.get(arg)
case TriggerType.TRIGGER_ON_EVENT:
callbacks = self._rules_on_event.get(arg)
case TriggerType.TRIGGER_ON_PERIOD:
self._rules_on_period.pop(arg)
case TriggerType.TRIGGER_ON_UPDATE:
callbacks = self._rules_on_update
if callbacks is not None:
if callback is not None:
callbacks.remove(callback)
else:
callbacks.clear()
def set_attr(self, name: str, value: Any) -> None:
"""Set an attribute of the visualization manager.
Args:
name: Name of the attribute to set
value: Value to set the attribute to
"""
setattr(self, name, value)
def _validate_callback_signature(self, trigger: TriggerType, callback: Callable) -> None:
"""Validate that the callback has the correct signature for the trigger type.
Args:
trigger: Type of trigger for the callback
callback: The callback function to validate
Raises:
TypeError: If callback signature doesn't match expected signature
"""
try:
sig = inspect.signature(callback)
params = list(sig.parameters.values())
# Remove 'self' parameter if it's a bound method
if params and params[0].name == "self":
params = params[1:]
param_count = len(params)
if trigger == TriggerType.TRIGGER_ON_EVENT:
# Event callbacks should have 3 parameters: (manager, data_collector, event_params)
expected_count = 3
expected_sig = (
"callback(manager: VisualizationManager, data_collector: DataCollector, event_params: Any)"
)
else:
# Other callbacks should have 2 parameters: (manager, data_collector)
expected_count = 2
expected_sig = "callback(manager: VisualizationManager, data_collector: DataCollector)"
if param_count != expected_count:
raise TypeError(
f"Callback for {trigger.name} must have {expected_count} parameters, "
f"but got {param_count}. Expected signature: {expected_sig}. "
f"Actual signature: {sig}"
)
except Exception as e:
if isinstance(e, TypeError):
raise
# If we can't inspect the signature (e.g., built-in functions),
# just log a warning and proceed
omni.log.warn(f"Could not validate callback signature for {trigger.name}: {e}")
class XRVisualization:
"""Singleton class providing XR visualization functionality.
This class implements the singleton pattern to ensure only one instance
of the visualization system exists across the application. It provides
a centralized API for managing XR visualization features.
When manage a new event ordata field, please add a comment to the following list.
Event names:
"ik_solver_failed"
Data fields:
"manipulability_ellipsoid" : list[float]
"device_raw_data" : dict
"joints_distance_percentage_to_limit" : list[float]
"joints_torque" : list[float]
"joints_torque_limit" : list[float]
"joints_name" : list[str]
"wrist_pose" : list[float]
"approximated_working_space" : list[float]
"hand_torque_mapping" : list[str]
"""
_lock = threading.Lock()
_instance: XRVisualization | None = None
_registered = False
def __init__(self):
"""Prevent direct instantiation."""
raise RuntimeError("Use VisualizationInterface classmethods instead of direct instantiation")
@classmethod
def __create_instance(cls, manager: type[VisualizationManager] = VisualizationManager) -> XRVisualization:
"""Get the visualization manager instance.
Returns:
VisualizationManager: The visualization manager instance
"""
with cls._lock:
if cls._instance is None:
# Bypass __init__ by calling __new__ directly
cls._instance = super().__new__(cls)
cls._instance._initialize(manager)
return cls._instance
@classmethod
def __get_instance(cls) -> XRVisualization:
"""Thread-safe singleton access.
Returns:
XRVisualization: The singleton instance of the visualization system
"""
if cls._instance is None:
return cls.__create_instance()
elif not cls._instance._registered:
cls._instance._register()
return cls._instance
def _register(self) -> bool:
"""Register the visualization system.
Returns:
bool: True if the visualization system is registered, False otherwise
"""
if self._registered:
return True
sim = SimulationContext.instance()
if sim is not None:
sim.add_render_callback("visualization_render_callback", self.update_loop)
self._registered = True
return self._registered
def _initialize(self, manager: type[VisualizationManager]) -> None:
"""Initialize the singleton instance with data collector and visualization manager."""
self._data_collector = DataCollector()
self._visualization_manager = manager(self._data_collector)
self._register()
self._initialized = True
# APIs
def update_loop(self, event) -> None:
"""Update the visualization system.
This method should be called regularly (e.g., every frame) to ensure
visualization updates are processed and periodic callbacks are executed.
"""
self._visualization_manager.update_loop()
self._data_collector.update_loop()
@classmethod
def push_event(cls, name: str, args: Any = None) -> None:
"""Push an event to trigger registered callbacks.
Args:
name: Name of the event to trigger
args: Optional arguments for the event (currently unused)
"""
instance = cls.__get_instance()
instance._visualization_manager.on_event(name, args)
@classmethod
def push_data(cls, item: dict[str, Any]) -> None:
"""Push data to the visualization system.
Updates multiple data fields at once. Each key-value pair in the
dictionary will be processed by the data collector.
Args:
item: Dictionary containing data field names and their values
"""
instance = cls.__get_instance()
for name, value in item.items():
instance._data_collector.update_data(name, value)
@classmethod
def set_attrs(cls, attributes: dict[str, Any]) -> None:
"""Set configuration data for the visualization system. Not currently used.
Args:
attributes: Dictionary containing configuration keys and values
"""
instance = cls.__get_instance()
for name, data in attributes.items():
instance._visualization_manager.set_attr(name, data)
@classmethod
def get_attr(cls, name: str) -> Any:
"""Get configuration data for the visualization system. Not currently used.
Args:
name: Configuration key
"""
instance = cls.__get_instance()
return getattr(instance._visualization_manager, name)
@classmethod
def register_callback(cls, trigger: TriggerType, arg: dict, callback: VisualizationManager.CallbackType) -> None:
"""Register a callback function for visualization events.
Args:
trigger: Type of trigger that should execute the callback
arg: Dictionary containing trigger-specific parameters:
- For TRIGGER_ON_PERIOD: {"period": float}
- For TRIGGER_ON_EVENT: {"event_name": str}
- For TRIGGER_ON_CHANGE: {"variable_name": str}
callback: Function to execute when trigger condition is met
"""
instance = cls.__get_instance()
instance._visualization_manager.register_callback(trigger, arg, callback)
@classmethod
def assign_manager(cls, manager: type[VisualizationManager]) -> None:
"""Assign a visualization manager type to the visualization system.
Args:
manager: Type of the visualization manager to assign
"""
if cls._instance is not None:
omni.log.error(
f"Visualization system already initialized to {type(cls._instance._visualization_manager).__name__},"
f" cannot assign manager {manager.__name__}"
)
return
cls.__create_instance(manager)
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from typing import Any
from isaaclab.ui.xr_widgets import DataCollector, TriggerType, VisualizationManager
from isaaclab.ui.xr_widgets.instruction_widget import hide_instruction
class TeleopVisualizationManager(VisualizationManager):
"""Specialized visualization manager for teleoperation scenarios.
For sample and debug use.
Provides teleoperation-specific visualization features including:
- IK error handling and display
"""
def __init__(self, data_collector: DataCollector):
"""Initialize the teleop visualization manager and register callbacks.
Args:
data_collector: DataCollector instance to read data for visualization use.
"""
super().__init__(data_collector)
# Handle error event
self._error_text_color = 0xFF0000FF
self.ik_error_widget_id = "/ik_solver_failed"
self.register_callback(TriggerType.TRIGGER_ON_EVENT, {"event_name": "ik_error"}, self._handle_ik_error)
def _handle_ik_error(self, mgr: VisualizationManager, data_collector: DataCollector, params: Any = None) -> None:
"""Handle IK error events by displaying an error message widget.
Args:
data_collector: DataCollector instance (unused in this handler)
"""
# Todo: move display_widget to instruction_widget.py
if not hasattr(mgr, "_ik_error_widget_timer"):
self.display_widget(
"IK Error Detected",
mgr.ik_error_widget_id,
VisualizationManager.message_widget_preset()
| {"text_color": self._error_text_color, "display_duration": None},
)
mgr._ik_error_widget_timer = mgr.register_callback(
TriggerType.TRIGGER_ON_PERIOD, {"period": 3.0, "initial_countdown": 3.0}, self._hide_ik_error_widget
)
if mgr._ik_error_widget_timer is None:
mgr.cancel_rule(TriggerType.TRIGGER_ON_PERIOD, mgr._ik_error_widget_timer)
mgr.cancel_rule(TriggerType.TRIGGER_ON_EVENT, "ik_solver_failed")
raise RuntimeWarning("Failed to register IK error widget timer")
else:
mgr._ik_error_widget_timer.countdown = 3.0
def _hide_ik_error_widget(self, mgr: VisualizationManager, data_collector: DataCollector) -> None:
"""Hide the IK error widget.
Args:
data_collector: DataCollector instance (unused in this handler)
"""
hide_instruction(mgr.ik_error_widget_id)
mgr.cancel_rule(TriggerType.TRIGGER_ON_PERIOD, mgr._ik_error_widget_timer)
delattr(mgr, "_ik_error_widget_timer")
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
This script checks if the XR visualization widgets are visible from the camera.
.. code-block:: bash
# Usage
./isaaclab.sh -p source/isaaclab/test/visualization/check_scene_visualization.py
"""
"""Launch Isaac Sim Simulator first."""
import argparse
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Check XR visualization widgets in Isaac Lab.")
parser.add_argument("--num_envs", type=int, default=2, help="Number of environments to spawn.")
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch omniverse app with XR support
args_cli.xr = True
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
import time
from typing import Any
from pxr import Gf
import isaaclab.sim as sim_utils
from isaaclab.assets import AssetBaseCfg
from isaaclab.scene import InteractiveScene, InteractiveSceneCfg
from isaaclab.ui.xr_widgets import DataCollector, TriggerType, VisualizationManager, XRVisualization, update_instruction
from isaaclab.utils import configclass
##
# Pre-defined configs
##
@configclass
class SimpleSceneCfg(InteractiveSceneCfg):
"""Design the scene with sensors on the robot."""
# ground plane
ground = AssetBaseCfg(prim_path="/World/defaultGroundPlane", spawn=sim_utils.GroundPlaneCfg())
# lights
dome_light = AssetBaseCfg(
prim_path="/World/Light", spawn=sim_utils.DomeLightCfg(intensity=3000.0, color=(0.75, 0.75, 0.75))
)
def get_camera_position():
"""Get the current camera position from the USD stage.
Returns:
tuple: (x, y, z) camera position or None if not available
"""
try:
import isaacsim.core.utils.stage as stage_utils
from pxr import UsdGeom
stage = stage_utils.get_current_stage()
if stage is not None:
# Get the viewport camera prim
camera_prim_path = "/OmniverseKit_Persp"
camera_prim = stage.GetPrimAtPath(camera_prim_path)
if camera_prim and camera_prim.IsValid():
# Get the camera's world transform
camera_xform = UsdGeom.Xformable(camera_prim)
world_transform = camera_xform.ComputeLocalToWorldTransform(0) # 0 = current time
# Extract position from the transform matrix
camera_pos = world_transform.ExtractTranslation()
return (camera_pos[0], camera_pos[1], camera_pos[2])
return None
except Exception as e:
print(f"[ERROR]: Failed to get camera position: {e}")
return None
def _sample_handle_ik_error(mgr: VisualizationManager, data_collector: DataCollector, params: Any = None) -> None:
error_text_color = getattr(mgr, "_error_text_color", 0xFF0000FF)
mgr.display_widget(
"IK Error Detected",
"/ik_error",
VisualizationManager.message_widget_preset()
| {
"text_color": error_text_color,
"prim_path_source": "/World/defaultGroundPlane/GroundPlane",
"translation": Gf.Vec3f(0, 0, 1),
},
)
def _sample_update_error_text_color(mgr: VisualizationManager, data_collector: DataCollector) -> None:
current_color = getattr(mgr, "_error_text_color", 0xFF0000FF)
new_color = current_color + 0x100
if new_color >= 0xFFFFFFFF:
new_color = 0xFF0000FF
mgr.set_attr("_error_text_color", new_color)
def _sample_update_left_panel(mgr: VisualizationManager, data_collector: DataCollector) -> None:
left_panel_id = getattr(mgr, "left_panel_id", None)
if left_panel_id is None:
return
left_panel_created = getattr(mgr, "_left_panel_created", False)
if left_panel_created is False:
# create a new left panel
mgr.display_widget(
"Left Panel",
left_panel_id,
VisualizationManager.panel_widget_preset()
| {
"text_color": 0xFFFFFFFF,
"prim_path_source": "/World/defaultGroundPlane/GroundPlane",
"translation": Gf.Vec3f(0, -3, 1),
},
)
mgr.set_attr("_left_panel_created", True)
updated_times = getattr(mgr, "_left_panel_updated_times", 0)
# Create a simple panel content since make_panel_content doesn't exist
content = f"Left Panel\nUpdated #{updated_times} times"
update_instruction(left_panel_id, content)
mgr.set_attr("_left_panel_updated_times", updated_times + 1)
def _sample_update_right_panel(mgr: VisualizationManager, data_collector: DataCollector) -> None:
right_panel_id = getattr(mgr, "right_panel_id", None)
if right_panel_id is None:
return
updated_times = getattr(mgr, "_right_panel_updated_times", 0)
# Create a simple panel content since make_panel_content doesn't exist
right_panel_data = data_collector.get_data("right_panel_data")
if right_panel_data is not None:
assert isinstance(right_panel_data, (tuple, list)), "Right panel data must be a tuple or list"
# Format each element to 3 decimal places
formatted_data = tuple(f"{x:.3f}" for x in right_panel_data)
content = f"Right Panel\nUpdated #{updated_times} times\nData: {formatted_data}"
else:
content = f"Right Panel\nUpdated #{updated_times} times\nData: None"
right_panel_created = getattr(mgr, "_right_panel_created", False)
if right_panel_created is False:
# create a new left panel
mgr.display_widget(
content,
right_panel_id,
VisualizationManager.panel_widget_preset()
| {
"text_color": 0xFFFFFFFF,
"prim_path_source": "/World/defaultGroundPlane/GroundPlane",
"translation": Gf.Vec3f(0, 3, 1),
},
)
mgr.set_attr("_right_panel_created", True)
update_instruction(right_panel_id, content)
mgr.set_attr("_right_panel_updated_times", updated_times + 1)
def apply_sample_visualization():
# Error Message
XRVisualization.register_callback(TriggerType.TRIGGER_ON_EVENT, {"event_name": "ik_error"}, _sample_handle_ik_error)
# Display a panel on the left to display DataCollector data
# Refresh periodically
XRVisualization.set_attrs({
"left_panel_id": "/left_panel",
"left_panel_translation": Gf.Vec3f(-2, 2.6, 2),
"left_panel_updated_times": 0,
"right_panel_updated_times": 0,
})
XRVisualization.register_callback(TriggerType.TRIGGER_ON_PERIOD, {"period": 1.0}, _sample_update_left_panel)
# Display a panel on the right to display DataCollector data
# Refresh when camera position changes
XRVisualization.set_attrs({
"right_panel_id": "/right_panel",
"right_panel_translation": Gf.Vec3f(1.5, 2, 2),
})
XRVisualization.register_callback(
TriggerType.TRIGGER_ON_CHANGE, {"variable_name": "right_panel_data"}, _sample_update_right_panel
)
# Change error text color every second
XRVisualization.set_attrs({
"error_text_color": 0xFF0000FF,
})
XRVisualization.register_callback(TriggerType.TRIGGER_ON_UPDATE, {}, _sample_update_error_text_color)
def run_simulator(
sim: sim_utils.SimulationContext,
scene: InteractiveScene,
):
"""Run the simulator."""
# Define simulation stepping
sim_dt = sim.get_physics_dt()
apply_sample_visualization()
# Simulate
while simulation_app.is_running():
if int(time.time()) % 10 < 1:
XRVisualization.push_event("ik_error")
XRVisualization.push_data({"right_panel_data": get_camera_position()})
sim.step()
scene.update(sim_dt)
def main():
"""Main function."""
# Initialize the simulation context
sim_cfg = sim_utils.SimulationCfg(dt=0.005)
sim = sim_utils.SimulationContext(sim_cfg)
# Set main camera
sim.set_camera_view(eye=(8, 0, 4), target=(0.0, 0.0, 0.0))
# design scene
scene = InteractiveScene(SimpleSceneCfg(num_envs=args_cli.num_envs, env_spacing=2.0))
# Play the simulator
sim.reset()
# Now we are ready!
print("[INFO]: Setup complete...")
# Run the simulator
run_simulator(sim, scene)
if __name__ == "__main__":
# run the main function
main()
# close sim app
simulation_app.close()
......@@ -3,6 +3,7 @@
#
# SPDX-License-Identifier: BSD-3-Clause
import carb
from pink.tasks import DampingTask, FrameTask
import isaaclab.controllers.utils as ControllerUtils
......@@ -171,6 +172,7 @@ class ExhaustPipeGR1T2PinkIKEnvCfg(ExhaustPipeGR1T2BaseEnvCfg):
# orientation_cost=0.05, # [cost] / [rad]
# ),
],
xr_enabled=bool(carb.settings.get_settings().get("/app/xr/enabled")),
),
)
# Convert USD to URDF and change revolute joints to fixed
......
......@@ -3,6 +3,7 @@
#
# SPDX-License-Identifier: BSD-3-Clause
import carb
from pink.tasks import DampingTask, FrameTask
import isaaclab.controllers.utils as ControllerUtils
......@@ -169,6 +170,7 @@ class NutPourGR1T2PinkIKEnvCfg(NutPourGR1T2BaseEnvCfg):
# orientation_cost=0.05, # [cost] / [rad]
# ),
],
xr_enabled=bool(carb.settings.get_settings().get("/app/xr/enabled")),
),
)
# Convert USD to URDF and change revolute joints to fixed
......
......@@ -6,6 +6,7 @@
import tempfile
import torch
import carb
from pink.tasks import DampingTask, FrameTask
import isaaclab.controllers.utils as ControllerUtils
......@@ -255,6 +256,7 @@ class ActionsCfg:
),
],
fixed_input_tasks=[],
xr_enabled=bool(carb.settings.get_settings().get("/app/xr/enabled")),
),
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment