Commit c45b8c0b authored by rwiltz's avatar rwiltz Committed by Kelly Guo

Implements teleop config parameters and device factory (#293)

This change moves the configuration of the teleop device from the end
user scripts (record_demos.py, teleop_se3.py, etc) to the env config
itself. This allows the task to specify what teleop devices are
supported along with the per env configuration of supported teleop
device.

Example teleop config inside of the env config:
```
        self.teleop_devices = DevicesCfg(
            devices={
                "handtracking": OpenXRDeviceCfg(
                    retargeters=[
                        Se3RelRetargeterCfg(
                            bound_hand=OpenXRDevice.TrackingTarget.HAND_RIGHT,
                            zero_out_xy_rotation=True,
                            use_wrist_rotation=False,
                            use_wrist_position=True,
                            delta_pos_scale_factor=10.0,
                            delta_rot_scale_factor=10.0,
                            sim_device=self.sim.device,
                        ),
                        GripperRetargeterCfg(
                            bound_hand=OpenXRDevice.TrackingTarget.HAND_RIGHT, sim_device=self.sim.device
                        ),
                    ],
                    sim_device=self.sim.device,
                    xr_cfg=self.xr,
                ),
                "keyboard": Se3KeyboardCfg(
                    pos_sensitivity=0.05,
                    rot_sensitivity=0.05,
                    sim_device=self.sim.device,
                ),
            }
        )
```
<!--
Thank you for your interest in sending a pull request. Please make sure
to check the contribution guidelines.

Link:
https://isaac-sim.github.io/IsaacLab/main/source/refs/contributing.html
-->

Please include a summary of the change and which issue is fixed. Please
also include relevant motivation and context.
List any dependencies that are required for this change.

Fixes # (issue)

<!-- As a practice, it is recommended to open an issue to have
discussions on the proposed pull request.
This makes it easier for the community to keep track of what is being
developed or added, and if a given feature
is demanded by more than one party. -->

<!-- As you go through the list, delete the ones that are not
applicable. -->

- New feature (non-breaking change which adds functionality)
- This change requires a documentation update

Please attach before and after screenshots of the change if applicable.

<!--
Example:

| Before | After |
| ------ | ----- |
| _gif/png before_ | _gif/png after_ |

To upload images to a PR -- simply drag and drop an image while in edit
mode and it should upload the image directly. You can then paste that
source into the above before/after sections.
-->

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there

<!--
As you go through the checklist above, you can mark something as done by
putting an x character in it

For example,
- [x] I have done this task
- [ ] I have not done this task
-->

---------
Signed-off-by: 's avatarKelly Guo <kellyg@nvidia.com>
Co-authored-by: 's avatarKelly Guo <kellyguo123@hotmail.com>
Co-authored-by: 's avatarKelly Guo <kellyg@nvidia.com>
parent 0d0dd13f
......@@ -8,13 +8,20 @@
"""Launch Isaac Sim Simulator first."""
import argparse
from collections.abc import Callable
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Keyboard teleoperation for Isaac Lab environments.")
parser.add_argument("--num_envs", type=int, default=1, help="Number of environments to simulate.")
parser.add_argument("--teleop_device", type=str, default="keyboard", help="Device for interacting with environment")
parser.add_argument(
"--teleop_device",
type=str,
default="keyboard",
choices=["keyboard", "spacemouse", "gamepad", "handtracking"],
help="Device for interacting with environment",
)
parser.add_argument("--task", type=str, default=None, help="Name of the task.")
parser.add_argument("--sensitivity", type=float, default=1.0, help="Sensitivity factor.")
parser.add_argument(
......@@ -46,72 +53,32 @@ simulation_app = app_launcher.app
import gymnasium as gym
import numpy as np
import torch
import omni.log
from isaaclab.devices import OpenXRDevice, Se3Gamepad, Se3Keyboard, Se3SpaceMouse
if args_cli.enable_pinocchio:
from isaaclab.devices.openxr.retargeters.humanoid.fourier.gr1t2_retargeter import GR1T2Retargeter
import isaaclab_tasks.manager_based.manipulation.pick_place # noqa: F401
from isaaclab.devices.openxr.retargeters.manipulator import GripperRetargeter, Se3AbsRetargeter, Se3RelRetargeter
from isaaclab.devices import Se3Gamepad, Se3GamepadCfg, Se3Keyboard, Se3KeyboardCfg, Se3SpaceMouse, Se3SpaceMouseCfg
from isaaclab.devices.teleop_device_factory import create_teleop_device
from isaaclab.managers import TerminationTermCfg as DoneTerm
import isaaclab_tasks # noqa: F401
from isaaclab_tasks.manager_based.manipulation.lift import mdp
from isaaclab_tasks.utils import parse_env_cfg
if args_cli.enable_pinocchio:
import isaaclab_tasks.manager_based.manipulation.pick_place # noqa: F401
def pre_process_actions(
teleop_data: tuple[np.ndarray, bool] | list[tuple[np.ndarray, np.ndarray, np.ndarray]], num_envs: int, device: str
) -> torch.Tensor:
"""Convert teleop data to the format expected by the environment action space.
Args:
teleop_data: Data from the teleoperation device.
num_envs: Number of environments.
device: Device to create tensors on.
def main() -> None:
"""
Run keyboard teleoperation with Isaac Lab manipulation environment.
Creates the environment, sets up teleoperation interfaces and callbacks,
and runs the main simulation loop until the application is closed.
Returns:
Processed actions as a tensor.
None
"""
# compute actions based on environment
if "Reach" in args_cli.task:
delta_pose, gripper_command = teleop_data
# convert to torch
delta_pose = torch.tensor(delta_pose, dtype=torch.float, device=device).repeat(num_envs, 1)
# note: reach is the only one that uses a different action space
# compute actions
return delta_pose
elif "PickPlace-GR1T2" in args_cli.task:
(left_wrist_pose, right_wrist_pose, hand_joints) = teleop_data[0]
# Reconstruct actions_arms tensor with converted positions and rotations
actions = torch.tensor(
np.concatenate([
left_wrist_pose, # left ee pose
right_wrist_pose, # right ee pose
hand_joints, # hand joint angles
]),
device=device,
dtype=torch.float32,
).unsqueeze(0)
# Concatenate arm poses and hand joint angles
return actions
else:
# resolve gripper command
delta_pose, gripper_command = teleop_data
# convert to torch
delta_pose = torch.tensor(delta_pose, dtype=torch.float, device=device).repeat(num_envs, 1)
gripper_vel = torch.zeros((delta_pose.shape[0], 1), dtype=torch.float, device=device)
gripper_vel[:] = -1 if gripper_command else 1
# compute actions
return torch.concat([delta_pose, gripper_vel], dim=1)
def main():
"""Running keyboard teleoperation with Isaac Lab manipulation environment."""
# parse configuration
env_cfg = parse_env_cfg(args_cli.task, device=args_cli.device, num_envs=args_cli.num_envs)
env_cfg.env_name = args_cli.task
......@@ -122,155 +89,165 @@ def main():
env_cfg.commands.object_pose.resampling_time_range = (1.0e9, 1.0e9)
# add termination condition for reaching the goal otherwise the environment won't reset
env_cfg.terminations.object_reached_goal = DoneTerm(func=mdp.object_reached_goal)
# create environment
env = gym.make(args_cli.task, cfg=env_cfg).unwrapped
# check environment name (for reach , we don't allow the gripper)
if "Reach" in args_cli.task:
omni.log.warn(
f"The environment '{args_cli.task}' does not support gripper control. The device command will be ignored."
)
try:
# create environment
env = gym.make(args_cli.task, cfg=env_cfg).unwrapped
# check environment name (for reach , we don't allow the gripper)
if "Reach" in args_cli.task:
omni.log.warn(
f"The environment '{args_cli.task}' does not support gripper control. The device command will be"
" ignored."
)
except Exception as e:
omni.log.error(f"Failed to create environment: {e}")
simulation_app.close()
return
# Flags for controlling teleoperation flow
should_reset_recording_instance = False
teleoperation_active = True
# Callback handlers
def reset_recording_instance():
"""Reset the environment to its initial state.
def reset_recording_instance() -> None:
"""
Reset the environment to its initial state.
This callback is triggered when the user presses the reset key (typically 'R').
It's useful when:
- The robot gets into an undesirable configuration
- The user wants to start over with the task
- Objects in the scene need to be reset to their initial positions
Sets a flag to reset the environment on the next simulation step.
The environment will be reset on the next simulation step.
Returns:
None
"""
nonlocal should_reset_recording_instance
should_reset_recording_instance = True
omni.log.info("Reset triggered - Environment will reset on next step")
def start_teleoperation():
"""Activate teleoperation control of the robot.
def start_teleoperation() -> None:
"""
Activate teleoperation control of the robot.
This callback enables active control of the robot through the input device.
It's typically triggered by a specific gesture or button press and is used when:
- Beginning a new teleoperation session
- Resuming control after temporarily pausing
- Switching from observation mode to control mode
Enables the application of teleoperation commands to the environment.
While active, all commands from the device will be applied to the robot.
Returns:
None
"""
nonlocal teleoperation_active
teleoperation_active = True
omni.log.info("Teleoperation activated")
def stop_teleoperation():
"""Deactivate teleoperation control of the robot.
def stop_teleoperation() -> None:
"""
Deactivate teleoperation control of the robot.
This callback temporarily suspends control of the robot through the input device.
It's typically triggered by a specific gesture or button press and is used when:
- Taking a break from controlling the robot
- Repositioning the input device without moving the robot
- Pausing to observe the scene without interference
Disables the application of teleoperation commands to the environment.
While inactive, the simulation continues to render but device commands are ignored.
Returns:
None
"""
nonlocal teleoperation_active
teleoperation_active = False
# create controller
if args_cli.teleop_device.lower() == "keyboard":
teleop_interface = Se3Keyboard(
pos_sensitivity=0.05 * args_cli.sensitivity, rot_sensitivity=0.05 * args_cli.sensitivity
)
elif args_cli.teleop_device.lower() == "spacemouse":
teleop_interface = Se3SpaceMouse(
pos_sensitivity=0.05 * args_cli.sensitivity, rot_sensitivity=0.05 * args_cli.sensitivity
)
elif args_cli.teleop_device.lower() == "gamepad":
teleop_interface = Se3Gamepad(
pos_sensitivity=0.1 * args_cli.sensitivity, rot_sensitivity=0.1 * args_cli.sensitivity
)
elif "dualhandtracking_abs" in args_cli.teleop_device.lower() and "GR1T2" in args_cli.task:
# Create GR1T2 retargeter with desired configuration
gr1t2_retargeter = GR1T2Retargeter(
enable_visualization=True,
num_open_xr_hand_joints=2 * 26, # OpenXR hand tracking spec has 26 joints
device=env.unwrapped.device,
hand_joint_names=env.scene["robot"].data.joint_names[-22:],
)
# Create hand tracking device with retargeter
teleop_interface = OpenXRDevice(
env_cfg.xr,
retargeters=[gr1t2_retargeter],
)
teleop_interface.add_callback("RESET", reset_recording_instance)
teleop_interface.add_callback("START", start_teleoperation)
teleop_interface.add_callback("STOP", stop_teleoperation)
# Hand tracking needs explicit start gesture to activate
omni.log.info("Teleoperation deactivated")
# Create device config if not already in env_cfg
teleoperation_callbacks: dict[str, Callable[[], None]] = {
"R": reset_recording_instance,
"START": start_teleoperation,
"STOP": stop_teleoperation,
"RESET": reset_recording_instance,
}
# For hand tracking devices, add additional callbacks
if args_cli.xr:
# Default to inactive for hand tracking
teleoperation_active = False
else:
# Always active for other devices
teleoperation_active = True
elif "handtracking" in args_cli.teleop_device.lower():
# Create EE retargeter with desired configuration
if "_abs" in args_cli.teleop_device.lower():
retargeter_device = Se3AbsRetargeter(
bound_hand=OpenXRDevice.TrackingTarget.HAND_RIGHT, zero_out_xy_rotation=True
# Create teleop device from config if present, otherwise create manually
teleop_interface = None
try:
if hasattr(env_cfg, "teleop_devices") and args_cli.teleop_device in env_cfg.teleop_devices.devices:
teleop_interface = create_teleop_device(
args_cli.teleop_device, env_cfg.teleop_devices.devices, teleoperation_callbacks
)
else:
retargeter_device = Se3RelRetargeter(
bound_hand=OpenXRDevice.TrackingTarget.HAND_RIGHT, zero_out_xy_rotation=True
)
grip_retargeter = GripperRetargeter(bound_hand=OpenXRDevice.TrackingTarget.HAND_RIGHT)
# Create hand tracking device with retargeter (in a list)
teleop_interface = OpenXRDevice(
env_cfg.xr,
retargeters=[retargeter_device, grip_retargeter],
)
teleop_interface.add_callback("RESET", reset_recording_instance)
teleop_interface.add_callback("START", start_teleoperation)
teleop_interface.add_callback("STOP", stop_teleoperation)
# Hand tracking needs explicit start gesture to activate
teleoperation_active = False
else:
raise ValueError(
f"Invalid device interface '{args_cli.teleop_device}'. Supported: 'keyboard', 'spacemouse', 'gamepad',"
" 'handtracking', 'handtracking_abs'."
)
# add teleoperation key for env reset (for all devices)
teleop_interface.add_callback("R", reset_recording_instance)
print(teleop_interface)
omni.log.warn(f"No teleop device '{args_cli.teleop_device}' found in environment config. Creating default.")
# Create fallback teleop device
sensitivity = args_cli.sensitivity
if args_cli.teleop_device.lower() == "keyboard":
teleop_interface = Se3Keyboard(
Se3KeyboardCfg(pos_sensitivity=0.05 * sensitivity, rot_sensitivity=0.05 * sensitivity)
)
elif args_cli.teleop_device.lower() == "spacemouse":
teleop_interface = Se3SpaceMouse(
Se3SpaceMouseCfg(pos_sensitivity=0.05 * sensitivity, rot_sensitivity=0.05 * sensitivity)
)
elif args_cli.teleop_device.lower() == "gamepad":
teleop_interface = Se3Gamepad(
Se3GamepadCfg(pos_sensitivity=0.1 * sensitivity, rot_sensitivity=0.1 * sensitivity)
)
else:
omni.log.error(f"Unsupported teleop device: {args_cli.teleop_device}")
omni.log.error("Supported devices: keyboard, spacemouse, gamepad, handtracking")
env.close()
simulation_app.close()
return
# Add callbacks to fallback device
for key, callback in teleoperation_callbacks.items():
try:
teleop_interface.add_callback(key, callback)
except (ValueError, TypeError) as e:
omni.log.warn(f"Failed to add callback for key {key}: {e}")
except Exception as e:
omni.log.error(f"Failed to create teleop device: {e}")
env.close()
simulation_app.close()
return
if teleop_interface is None:
omni.log.error("Failed to create teleop interface")
env.close()
simulation_app.close()
return
omni.log.info(f"Using teleop device: {teleop_interface}")
# reset environment
env.reset()
teleop_interface.reset()
omni.log.info("Teleoperation started. Press 'R' to reset the environment.")
# simulate environment
while simulation_app.is_running():
# run everything in inference mode
with torch.inference_mode():
# get device command
teleop_data = teleop_interface.advance()
# Only apply teleop commands when active
if teleoperation_active:
# compute actions based on environment
actions = pre_process_actions(teleop_data, env.num_envs, env.device)
# apply actions
env.step(actions)
else:
env.sim.render()
if should_reset_recording_instance:
env.reset()
should_reset_recording_instance = False
try:
# run everything in inference mode
with torch.inference_mode():
# get device command
action = teleop_interface.advance()
# Only apply teleop commands when active
if teleoperation_active:
# process actions
actions = action.repeat(env.num_envs, 1)
# apply actions
env.step(actions)
else:
env.sim.render()
if should_reset_recording_instance:
env.reset()
should_reset_recording_instance = False
omni.log.info("Environment reset complete")
except Exception as e:
omni.log.error(f"Error during simulation step: {e}")
break
# close the simulator
env.close()
omni.log.info("Environment closed")
if __name__ == "__main__":
......
......@@ -62,7 +62,7 @@ if args_cli.enable_pinocchio:
# Only enables inputs if this script is NOT headless mode
if not args_cli.headless and not os.environ.get("HEADLESS", 0):
from isaaclab.devices import Se3Keyboard
from isaaclab.devices import Se3Keyboard, Se3KeyboardCfg
from isaaclab.envs import ManagerBasedRLMimicEnv
from isaaclab.envs.mdp.recorders.recorders_cfg import ActionStateRecorderManagerCfg
......@@ -225,7 +225,7 @@ def main():
# Only enables inputs if this script is NOT headless mode
if not args_cli.headless and not os.environ.get("HEADLESS", 0):
keyboard_interface = Se3Keyboard(pos_sensitivity=0.1, rot_sensitivity=0.1)
keyboard_interface = Se3Keyboard(Se3KeyboardCfg(pos_sensitivity=0.1, rot_sensitivity=0.1))
keyboard_interface.add_callback("N", play_cb)
keyboard_interface.add_callback("B", pause_cb)
keyboard_interface.add_callback("Q", skip_episode_cb)
......
......@@ -80,7 +80,7 @@ import random
import time
import torch
from isaaclab.devices import Se3Keyboard, Se3SpaceMouse
from isaaclab.devices import Se3Keyboard, Se3KeyboardCfg, Se3SpaceMouse, Se3SpaceMouseCfg
from isaaclab.envs import ManagerBasedRLMimicEnv
from isaaclab.envs.mdp.recorders.recorders_cfg import ActionStateRecorderManagerCfg
from isaaclab.managers import DatasetExportMode, RecorderTerm, RecorderTermCfg
......@@ -198,9 +198,9 @@ async def run_teleop_robot(
# create controller if needed
if teleop_interface is None:
if args_cli.teleop_device.lower() == "keyboard":
teleop_interface = Se3Keyboard(pos_sensitivity=0.2, rot_sensitivity=0.5)
teleop_interface = Se3Keyboard(Se3KeyboardCfg(pos_sensitivity=0.2, rot_sensitivity=0.5))
elif args_cli.teleop_device.lower() == "spacemouse":
teleop_interface = Se3SpaceMouse(pos_sensitivity=0.2, rot_sensitivity=0.5)
teleop_interface = Se3SpaceMouse(Se3SpaceMouseCfg(pos_sensitivity=0.2, rot_sensitivity=0.5))
else:
raise ValueError(
f"Invalid device interface '{args_cli.teleop_device}'. Supported: 'keyboard', 'spacemouse'."
......
......@@ -32,7 +32,7 @@ from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Record demonstrations for Isaac Lab environments.")
parser.add_argument("--task", type=str, default=None, help="Name of the task.")
parser.add_argument("--task", type=str, required=True, help="Name of the task.")
parser.add_argument("--teleop_device", type=str, default="keyboard", help="Device for interacting with environment.")
parser.add_argument(
"--dataset_file", type=str, default="./datasets/dataset.hdf5", help="File path to export recorded demos."
......@@ -59,6 +59,10 @@ AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# Validate required arguments
if args_cli.task is None:
parser.error("--task is required")
app_launcher_args = vars(args_cli)
if args_cli.enable_pinocchio:
......@@ -77,7 +81,6 @@ simulation_app = app_launcher.app
# Third-party imports
import gymnasium as gym
import numpy as np
import os
import time
import torch
......@@ -86,18 +89,18 @@ import torch
import omni.log
import omni.ui as ui
# Additional Isaac Lab imports that can only be imported after the simulator is running
from isaaclab.devices import OpenXRDevice, Se3Keyboard, Se3SpaceMouse
from isaaclab.devices import Se3Keyboard, Se3KeyboardCfg, Se3SpaceMouse, Se3SpaceMouseCfg
from isaaclab.devices.teleop_device_factory import create_teleop_device
import isaaclab_mimic.envs # noqa: F401
from isaaclab_mimic.ui.instruction_display import InstructionDisplay, show_subtask_instructions
if args_cli.enable_pinocchio:
from isaaclab.devices.openxr.retargeters.humanoid.fourier.gr1t2_retargeter import GR1T2Retargeter
import isaaclab_tasks.manager_based.manipulation.pick_place # noqa: F401
from isaaclab.devices.openxr.retargeters.manipulator import GripperRetargeter, Se3AbsRetargeter, Se3RelRetargeter
from isaaclab.envs import ManagerBasedEnvCfg
from collections.abc import Callable
from isaaclab.envs import DirectRLEnvCfg, ManagerBasedEnvCfg, ManagerBasedRLEnvCfg
from isaaclab.envs.mdp.recorders.recorders_cfg import ActionStateRecorderManagerCfg
from isaaclab.envs.ui import EmptyWindow
from isaaclab.managers import DatasetExportMode, SceneEntityCfg
......@@ -140,53 +143,31 @@ class RateLimiter:
self.last_time += self.sleep_duration
def pre_process_actions(
teleop_data: tuple[np.ndarray, bool] | list[tuple[np.ndarray, np.ndarray, np.ndarray]], num_envs: int, device: str
) -> torch.Tensor:
"""Convert teleop data to the format expected by the environment action space.
def setup_output_directories() -> tuple[str, str]:
"""Set up output directories for saving demonstrations.
Args:
teleop_data: Data from the teleoperation device.
num_envs: Number of environments.
device: Device to create tensors on.
Creates the output directory if it doesn't exist and extracts the file name
from the dataset file path.
Returns:
Processed actions as a tensor.
tuple[str, str]: A tuple containing:
- output_dir: The directory path where the dataset will be saved
- output_file_name: The filename (without extension) for the dataset
"""
# compute actions based on environment
if "Reach" in args_cli.task:
delta_pose, gripper_command = teleop_data
# convert to torch
delta_pose = torch.tensor(delta_pose, dtype=torch.float, device=device).repeat(num_envs, 1)
# note: reach is the only one that uses a different action space
# compute actions
return delta_pose
elif "GR1T2" in args_cli.task:
(left_wrist_pose, right_wrist_pose, hand_joints) = teleop_data[0]
# Reconstruct actions_arms tensor with converted positions and rotations
actions = torch.tensor(
np.concatenate([
left_wrist_pose, # left ee pose
right_wrist_pose, # right ee pose
hand_joints, # hand joint angles
]),
device=device,
dtype=torch.float32,
).unsqueeze(0)
# Concatenate arm poses and hand joint angles
return actions
else:
# resolve gripper command
delta_pose, gripper_command = teleop_data
# convert to torch
delta_pose = torch.tensor(delta_pose, dtype=torch.float, device=device).repeat(num_envs, 1)
gripper_vel = torch.zeros((delta_pose.shape[0], 1), dtype=torch.float, device=device)
gripper_vel[:] = -1 if gripper_command else 1
# compute actions
return torch.concat([delta_pose, gripper_vel], dim=1)
# get directory path and file name (without extension) from cli arguments
output_dir = os.path.dirname(args_cli.dataset_file)
output_file_name = os.path.splitext(os.path.basename(args_cli.dataset_file))[0]
# create directory if it does not exist
if not os.path.exists(output_dir):
os.makedirs(output_dir)
omni.log.info(f"Created output directory: {output_dir}")
return output_dir, output_file_name
def remove_camera_configs(env_cfg: ManagerBasedEnvCfg):
"""Removes cameras from environments when recording since XR does not work together with rendering."""
for attr_name in dir(env_cfg.scene):
attr = getattr(env_cfg.scene, attr_name)
if isinstance(attr, CameraCfg):
......@@ -205,26 +186,33 @@ def remove_camera_configs(env_cfg: ManagerBasedEnvCfg):
return env_cfg
def main():
"""Collect demonstrations from the environment using teleop interfaces."""
def create_environment_config(
output_dir: str, output_file_name: str
) -> tuple[ManagerBasedRLEnvCfg | DirectRLEnvCfg, object | None]:
"""Create and configure the environment configuration.
# if handtracking is selected, rate limiting is achieved via OpenXR
if "handtracking" in args_cli.teleop_device.lower():
rate_limiter = None
else:
rate_limiter = RateLimiter(args_cli.step_hz)
Parses the environment configuration and makes necessary adjustments for demo recording.
Extracts the success termination function and configures the recorder manager.
# get directory path and file name (without extension) from cli arguments
output_dir = os.path.dirname(args_cli.dataset_file)
output_file_name = os.path.splitext(os.path.basename(args_cli.dataset_file))[0]
Args:
output_dir: Directory where recorded demonstrations will be saved
output_file_name: Name of the file to store the demonstrations
# create directory if it does not exist
if not os.path.exists(output_dir):
os.makedirs(output_dir)
Returns:
tuple[isaaclab_tasks.utils.parse_cfg.EnvCfg, Optional[object]]: A tuple containing:
- env_cfg: The configured environment configuration
- success_term: The success termination object or None if not available
Raises:
Exception: If parsing the environment configuration fails
"""
# parse configuration
env_cfg = parse_env_cfg(args_cli.task, device=args_cli.device, num_envs=1)
env_cfg.env_name = args_cli.task.split(":")[-1]
try:
env_cfg = parse_env_cfg(args_cli.task, device=args_cli.device, num_envs=1)
env_cfg.env_name = args_cli.task.split(":")[-1]
except Exception as e:
omni.log.error(f"Failed to parse environment configuration: {e}")
exit(1)
# extract success checking function to invoke in the main loop
success_term = None
......@@ -237,7 +225,7 @@ def main():
" Will not be able to mark recorded demos as successful."
)
if "handtracking" in args_cli.teleop_device.lower():
if args_cli.xr:
# External cameras are not supported with XR teleop
# Check for any camera configs and disable them
env_cfg = remove_camera_configs(env_cfg)
......@@ -246,7 +234,6 @@ def main():
# modify configuration such that the environment runs indefinitely until
# the goal is reached or other termination conditions are met
env_cfg.terminations.time_out = None
env_cfg.observations.policy.concatenate_terms = False
env_cfg.recorders: ActionStateRecorderManagerCfg = ActionStateRecorderManagerCfg()
......@@ -254,157 +241,234 @@ def main():
env_cfg.recorders.dataset_filename = output_file_name
env_cfg.recorders.dataset_export_mode = DatasetExportMode.EXPORT_SUCCEEDED_ONLY
# create environment
env = gym.make(args_cli.task, cfg=env_cfg).unwrapped
return env_cfg, success_term
# Flags for controlling the demonstration recording process
should_reset_recording_instance = False
running_recording_instance = True
def reset_recording_instance():
"""Reset the current recording instance.
This function is triggered when the user indicates the current demo attempt
has failed and should be discarded. When called, it marks the environment
for reset, which will start a fresh recording instance. This is useful when:
- The robot gets into an unrecoverable state
- The user makes a mistake during demonstration
- The objects in the scene need to be reset to their initial positions
"""
nonlocal should_reset_recording_instance
should_reset_recording_instance = True
def create_environment(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg) -> gym.Env:
"""Create the environment from the configuration.
def start_recording_instance():
"""Start or resume recording the current demonstration.
Args:
env_cfg: The environment configuration object that defines the environment properties.
This should be an instance of EnvCfg created by parse_env_cfg().
This function enables active recording of robot actions. It's used when:
- Beginning a new demonstration after positioning the robot
- Resuming recording after temporarily stopping to reposition
- Continuing demonstration after pausing to adjust approach or strategy
Returns:
gym.Env: A Gymnasium environment instance for the specified task.
The user can toggle between stop/start to reposition the robot without
recording those transitional movements in the final demonstration.
"""
nonlocal running_recording_instance
running_recording_instance = True
Raises:
Exception: If environment creation fails for any reason.
"""
try:
env = gym.make(args_cli.task, cfg=env_cfg).unwrapped
return env
except Exception as e:
omni.log.error(f"Failed to create environment: {e}")
exit(1)
def stop_recording_instance():
"""Temporarily stop recording the current demonstration.
This function pauses the active recording of robot actions, allowing the user to:
- Reposition the robot or hand tracking device without recording those movements
- Take a break without terminating the entire demonstration
- Adjust their approach before continuing with the task
def setup_teleop_device(callbacks: dict[str, Callable]) -> object:
"""Set up the teleoperation device based on configuration.
The environment will continue rendering but won't record actions or advance
the simulation until recording is resumed with start_recording_instance().
"""
nonlocal running_recording_instance
running_recording_instance = False
Attempts to create a teleoperation device based on the environment configuration.
Falls back to default devices if the specified device is not found in the configuration.
def create_teleop_device(device_name: str, env: gym.Env):
"""Create and configure teleoperation device for robot control.
Args:
callbacks: Dictionary mapping callback keys to functions that will be
attached to the teleop device
Args:
device_name: Control device to use. Options include:
- "keyboard": Use keyboard keys for simple discrete movements
- "spacemouse": Use 3D mouse for precise 6-DOF control
- "handtracking": Use VR hand tracking for intuitive manipulation
- "handtracking_abs": Use VR hand tracking for intuitive manipulation with absolute EE pose
Returns:
DeviceBase: Configured teleoperation device ready for robot control
"""
device_name = device_name.lower()
nonlocal running_recording_instance
if device_name == "keyboard":
return Se3Keyboard(pos_sensitivity=0.2, rot_sensitivity=0.5)
elif device_name == "spacemouse":
return Se3SpaceMouse(pos_sensitivity=0.2, rot_sensitivity=0.5)
elif "dualhandtracking_abs" in device_name and "GR1T2" in env.cfg.env_name:
# Create GR1T2 retargeter with desired configuration
gr1t2_retargeter = GR1T2Retargeter(
enable_visualization=True,
num_open_xr_hand_joints=2 * 26, # OpenXR hand tracking spec has 26 joints
device=env.unwrapped.device,
hand_joint_names=env.scene["robot"].data.joint_names[-22:],
)
Returns:
object: The configured teleoperation device interface
# Create hand tracking device with retargeter
device = OpenXRDevice(
env_cfg.xr,
retargeters=[gr1t2_retargeter],
)
device.add_callback("RESET", reset_recording_instance)
device.add_callback("START", start_recording_instance)
device.add_callback("STOP", stop_recording_instance)
running_recording_instance = False
return device
elif "handtracking" in device_name:
# Create Franka retargeter with desired configuration
if "_abs" in device_name:
retargeter_device = Se3AbsRetargeter(
bound_hand=OpenXRDevice.TrackingTarget.HAND_RIGHT, zero_out_xy_rotation=True
)
Raises:
Exception: If teleop device creation fails
"""
teleop_interface = None
try:
if hasattr(env_cfg, "teleop_devices") and args_cli.teleop_device in env_cfg.teleop_devices.devices:
teleop_interface = create_teleop_device(args_cli.teleop_device, env_cfg.teleop_devices.devices, callbacks)
else:
omni.log.warn(f"No teleop device '{args_cli.teleop_device}' found in environment config. Creating default.")
# Create fallback teleop device
if args_cli.teleop_device.lower() == "keyboard":
teleop_interface = Se3Keyboard(Se3KeyboardCfg(pos_sensitivity=0.2, rot_sensitivity=0.5))
elif args_cli.teleop_device.lower() == "spacemouse":
teleop_interface = Se3SpaceMouse(Se3SpaceMouseCfg(pos_sensitivity=0.2, rot_sensitivity=0.5))
else:
retargeter_device = Se3RelRetargeter(
bound_hand=OpenXRDevice.TrackingTarget.HAND_RIGHT, zero_out_xy_rotation=True
)
omni.log.error(f"Unsupported teleop device: {args_cli.teleop_device}")
omni.log.error("Supported devices: keyboard, spacemouse, handtracking")
exit(1)
grip_retargeter = GripperRetargeter(bound_hand=OpenXRDevice.TrackingTarget.HAND_RIGHT)
# Add callbacks to fallback device
for key, callback in callbacks.items():
teleop_interface.add_callback(key, callback)
except Exception as e:
omni.log.error(f"Failed to create teleop device: {e}")
exit(1)
# Create hand tracking device with retargeter (in a list)
device = OpenXRDevice(
env_cfg.xr,
retargeters=[retargeter_device, grip_retargeter],
)
device.add_callback("RESET", reset_recording_instance)
device.add_callback("START", start_recording_instance)
device.add_callback("STOP", stop_recording_instance)
if teleop_interface is None:
omni.log.error("Failed to create teleop interface")
exit(1)
running_recording_instance = False
return device
else:
raise ValueError(
f"Invalid device interface '{device_name}'. Supported: 'keyboard', 'spacemouse', 'handtracking',"
" 'handtracking_abs', 'dualhandtracking_abs'."
)
return teleop_interface
teleop_interface = create_teleop_device(args_cli.teleop_device, env)
teleop_interface.add_callback("R", reset_recording_instance)
print(teleop_interface)
# reset before starting
env.sim.reset()
env.reset()
teleop_interface.reset()
def setup_ui(label_text: str, env: gym.Env) -> InstructionDisplay:
"""Set up the user interface elements.
# simulate environment -- run everything in inference mode
current_recorded_demo_count = 0
success_step_count = 0
Creates instruction display and UI window with labels for showing information
to the user during demonstration recording.
label_text = f"Recorded {current_recorded_demo_count} successful demonstrations."
Args:
label_text: Text to display showing current recording status
env: The environment instance for which UI is being created
Returns:
InstructionDisplay: The configured instruction display object
"""
instruction_display = InstructionDisplay(args_cli.teleop_device)
if "handtracking" not in args_cli.teleop_device.lower():
if not args_cli.xr:
window = EmptyWindow(env, "Instruction")
with window.ui_window_elements["main_vstack"]:
demo_label = ui.Label(label_text)
subtask_label = ui.Label("")
instruction_display.set_labels(subtask_label, demo_label)
return instruction_display
def process_success_condition(env: gym.Env, success_term: object | None, success_step_count: int) -> tuple[int, bool]:
"""Process the success condition for the current step.
Checks if the environment has met the success condition for the required
number of consecutive steps. Marks the episode as successful if criteria are met.
Args:
env: The environment instance to check
success_term: The success termination object or None if not available
success_step_count: Current count of consecutive successful steps
Returns:
tuple[int, bool]: A tuple containing:
- updated success_step_count: The updated count of consecutive successful steps
- success_reset_needed: Boolean indicating if reset is needed due to success
"""
if success_term is None:
return success_step_count, False
if bool(success_term.func(env, **success_term.params)[0]):
success_step_count += 1
if success_step_count >= args_cli.num_success_steps:
env.recorder_manager.record_pre_reset([0], force_export_or_skip=False)
env.recorder_manager.set_success_to_episodes(
[0], torch.tensor([[True]], dtype=torch.bool, device=env.device)
)
env.recorder_manager.export_episodes([0])
omni.log.info("Success condition met! Recording completed.")
return success_step_count, True
else:
success_step_count = 0
return success_step_count, False
def handle_reset(
env: gym.Env, success_step_count: int, instruction_display: InstructionDisplay, label_text: str
) -> int:
"""Handle resetting the environment.
Resets the environment, recorder manager, and related state variables.
Updates the instruction display with current status.
Args:
env: The environment instance to reset
success_step_count: Current count of consecutive successful steps
instruction_display: The display object to update
label_text: Text to display showing current recording status
Returns:
int: Reset success step count (0)
"""
omni.log.info("Resetting environment...")
env.sim.reset()
env.recorder_manager.reset()
env.reset()
success_step_count = 0
instruction_display.show_demo(label_text)
return success_step_count
def run_simulation_loop(
env: gym.Env,
teleop_interface: object | None,
success_term: object | None,
rate_limiter: RateLimiter | None,
) -> int:
"""Run the main simulation loop for collecting demonstrations.
Sets up callback functions for teleop device, initializes the UI,
and runs the main loop that processes user inputs and environment steps.
Records demonstrations when success conditions are met.
Args:
env: The environment instance
teleop_interface: Optional teleop interface (will be created if None)
success_term: The success termination object or None if not available
rate_limiter: Optional rate limiter to control simulation speed
Returns:
int: Number of successful demonstrations recorded
"""
current_recorded_demo_count = 0
success_step_count = 0
should_reset_recording_instance = False
running_recording_instance = not args_cli.xr
# Callback closures for the teleop device
def reset_recording_instance():
nonlocal should_reset_recording_instance
should_reset_recording_instance = True
omni.log.info("Recording instance reset requested")
def start_recording_instance():
nonlocal running_recording_instance
running_recording_instance = True
omni.log.info("Recording started")
def stop_recording_instance():
nonlocal running_recording_instance
running_recording_instance = False
omni.log.info("Recording paused")
# Set up teleoperation callbacks
teleoperation_callbacks = {
"R": reset_recording_instance,
"START": start_recording_instance,
"STOP": stop_recording_instance,
"RESET": reset_recording_instance,
}
teleop_interface = setup_teleop_device(teleoperation_callbacks)
teleop_interface.add_callback("R", reset_recording_instance)
# Reset before starting
env.sim.reset()
env.reset()
teleop_interface.reset()
label_text = f"Recorded {current_recorded_demo_count} successful demonstrations."
instruction_display = setup_ui(label_text, env)
subtasks = {}
with contextlib.suppress(KeyboardInterrupt) and torch.inference_mode():
while simulation_app.is_running():
# get data from teleop device
teleop_data = teleop_interface.advance()
# Get keyboard command
action = teleop_interface.advance()
# Expand to batch dimension
actions = action.repeat(env.num_envs, 1)
# perform action on environment
# Perform action on environment
if running_recording_instance:
# compute actions based on environment
actions = pre_process_actions(teleop_data, env.num_envs, env.device)
# Compute actions based on environment
obv = env.step(actions)
if subtasks is not None:
if subtasks == {}:
......@@ -414,45 +478,74 @@ def main():
else:
env.sim.render()
if success_term is not None:
if bool(success_term.func(env, **success_term.params)[0]):
success_step_count += 1
if success_step_count >= args_cli.num_success_steps:
env.recorder_manager.record_pre_reset([0], force_export_or_skip=False)
env.recorder_manager.set_success_to_episodes(
[0], torch.tensor([[True]], dtype=torch.bool, device=env.device)
)
env.recorder_manager.export_episodes([0])
should_reset_recording_instance = True
else:
success_step_count = 0
# print out the current demo count if it has changed
# Check for success condition
success_step_count, success_reset_needed = process_success_condition(env, success_term, success_step_count)
if success_reset_needed:
should_reset_recording_instance = True
# Update demo count if it has changed
if env.recorder_manager.exported_successful_episode_count > current_recorded_demo_count:
current_recorded_demo_count = env.recorder_manager.exported_successful_episode_count
label_text = f"Recorded {current_recorded_demo_count} successful demonstrations."
print(label_text)
omni.log.info(label_text)
# Handle reset if requested
if should_reset_recording_instance:
env.sim.reset()
env.recorder_manager.reset()
env.reset()
success_step_count = handle_reset(env, success_step_count, instruction_display, label_text)
should_reset_recording_instance = False
success_step_count = 0
instruction_display.show_demo(label_text)
# Check if we've reached the desired number of demos
if args_cli.num_demos > 0 and env.recorder_manager.exported_successful_episode_count >= args_cli.num_demos:
print(f"All {args_cli.num_demos} demonstrations recorded. Exiting the app.")
omni.log.info(f"All {args_cli.num_demos} demonstrations recorded. Exiting the app.")
break
# check that simulation is stopped or not
# Check if simulation is stopped
if env.sim.is_stopped():
break
# Rate limiting
if rate_limiter:
rate_limiter.sleep(env)
return current_recorded_demo_count
def main() -> None:
"""Collect demonstrations from the environment using teleop interfaces.
Main function that orchestrates the entire process:
1. Sets up rate limiting based on configuration
2. Creates output directories for saving demonstrations
3. Configures the environment
4. Runs the simulation loop to collect demonstrations
5. Cleans up resources when done
Raises:
Exception: Propagates exceptions from any of the called functions
"""
# if handtracking is selected, rate limiting is achieved via OpenXR
if args_cli.xr:
rate_limiter = None
else:
rate_limiter = RateLimiter(args_cli.step_hz)
# Set up output directories
output_dir, output_file_name = setup_output_directories()
# Create and configure environment
global env_cfg # Make env_cfg available to setup_teleop_device
env_cfg, success_term = create_environment_config(output_dir, output_file_name)
# Create environment
env = create_environment(env_cfg)
# Run simulation loop
current_recorded_demo_count = run_simulation_loop(env, None, success_term, rate_limiter)
# Clean up
env.close()
omni.log.info(f"Recording session completed with {current_recorded_demo_count} successful demonstrations")
omni.log.info(f"Demonstrations saved to: {args_cli.dataset_file}")
if __name__ == "__main__":
......
......@@ -61,7 +61,7 @@ import gymnasium as gym
import os
import torch
from isaaclab.devices import Se3Keyboard
from isaaclab.devices import Se3Keyboard, Se3KeyboardCfg
from isaaclab.utils.datasets import EpisodeData, HDF5DatasetFileHandler
if args_cli.enable_pinocchio:
......@@ -149,7 +149,7 @@ def main():
# create environment from loaded config
env = gym.make(args_cli.task, cfg=env_cfg).unwrapped
teleop_interface = Se3Keyboard(pos_sensitivity=0.1, rot_sensitivity=0.1)
teleop_interface = Se3Keyboard(Se3KeyboardCfg(pos_sensitivity=0.1, rot_sensitivity=0.1))
teleop_interface.add_callback("N", play_cb)
teleop_interface.add_callback("B", pause_cb)
print('Press "B" to pause and "N" to resume the replayed actions.')
......
......@@ -376,7 +376,7 @@ Added
~~~~~~~~~~~~~~~~~~~
Added
~~~~~
^^^^^
* Added the :meth:`~isaaclab.env.mdp.observations.joint_effort`
......@@ -391,6 +391,17 @@ Added
* Added :meth:`~isaaclab.envs.mdp.observations.body_projected_gravity_b`
0.37.5 (2025-05-12)
~~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added a new teleop configuration class :class:`~isaaclab.devices.DevicesCfg` to support multiple teleoperation
devices declared in the environment configuration file.
* Implemented a factory function to create teleoperation devices based on the device configuration.
0.37.4 (2025-05-12)
~~~~~~~~~~~~~~~~~~~~
......
......@@ -590,7 +590,7 @@ class AppLauncher:
def _resolve_xr_settings(self, launcher_args: dict):
"""Resolve XR related settings."""
xr_env = int(os.environ.get("XR", 0))
xr_arg = launcher_args.pop("xr", AppLauncher._APPLAUNCHER_CFG_INFO["xr"][1])
xr_arg = launcher_args.get("xr", AppLauncher._APPLAUNCHER_CFG_INFO["xr"][1])
xr_valid_vals = {0, 1}
if xr_env not in xr_valid_vals:
raise ValueError(f"Invalid value for environment variable `XR`: {xr_env} .Expected: {xr_valid_vals} .")
......
......@@ -19,9 +19,10 @@ to add user-defined callback functions to be called when a particular input is p
the peripheral device.
"""
from .device_base import DeviceBase
from .gamepad import Se2Gamepad, Se3Gamepad
from .keyboard import Se2Keyboard, Se3Keyboard
from .openxr import OpenXRDevice
from .retargeter_base import RetargeterBase
from .spacemouse import Se2SpaceMouse, Se3SpaceMouse
from .device_base import DeviceBase, DeviceCfg, DevicesCfg
from .gamepad import Se2Gamepad, Se2GamepadCfg, Se3Gamepad, Se3GamepadCfg
from .keyboard import Se2Keyboard, Se2KeyboardCfg, Se3Keyboard, Se3KeyboardCfg
from .openxr import OpenXRDevice, OpenXRDeviceCfg
from .retargeter_base import RetargeterBase, RetargeterCfg
from .spacemouse import Se2SpaceMouse, Se2SpaceMouseCfg, Se3SpaceMouse, Se3SpaceMouseCfg
from .teleop_device_factory import create_teleop_device
......@@ -5,11 +5,28 @@
"""Base class for teleoperation interface."""
import torch
from abc import ABC, abstractmethod
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any
from isaaclab.devices.retargeter_base import RetargeterBase
from isaaclab.devices.retargeter_base import RetargeterBase, RetargeterCfg
@dataclass
class DeviceCfg:
"""Configuration for teleoperation devices."""
sim_device: str = "cpu"
retargeters: list[RetargeterCfg] = field(default_factory=list)
@dataclass
class DevicesCfg:
"""Configuration for all supported teleoperation devices."""
devices: dict[str, DeviceCfg] = field(default_factory=dict)
class DeviceBase(ABC):
......@@ -76,7 +93,7 @@ class DeviceBase(ABC):
"""
raise NotImplementedError("Derived class must implement _get_raw_data() or override advance()")
def advance(self) -> Any:
def advance(self) -> torch.Tensor:
"""Process current device state and return control commands.
This method retrieves raw data from the device and optionally applies
......@@ -87,8 +104,9 @@ class DeviceBase(ABC):
2. Override this method completely for custom command processing
Returns:
Raw device data if no retargeters are configured.
When retargeters are configured, returns a tuple containing each retargeter's processed output.
When no retargeters are configured, returns raw device data in its native format.
When retargeters are configured, returns a torch.Tensor containing the concatenated
outputs from all retargeters.
"""
raw_data = self._get_raw_data()
......@@ -97,4 +115,5 @@ class DeviceBase(ABC):
return raw_data
# With multiple retargeters, return a tuple of outputs
return tuple(retargeter.retarget(raw_data) for retargeter in self._retargeters)
# Concatenate retargeted outputs into a single tensor
return torch.cat([retargeter.retarget(raw_data) for retargeter in self._retargeters], dim=-1)
......@@ -5,5 +5,5 @@
"""Gamepad device for SE(2) and SE(3) control."""
from .se2_gamepad import Se2Gamepad
from .se3_gamepad import Se3Gamepad
from .se2_gamepad import Se2Gamepad, Se2GamepadCfg
from .se3_gamepad import Se3Gamepad, Se3GamepadCfg
......@@ -6,14 +6,26 @@
"""Gamepad controller for SE(2) control."""
import numpy as np
import torch
import weakref
from collections.abc import Callable
from dataclasses import dataclass
import carb
import carb.input
import omni
from ..device_base import DeviceBase
from ..device_base import DeviceBase, DeviceCfg
@dataclass
class Se2GamepadCfg(DeviceCfg):
"""Configuration for SE2 gamepad devices."""
v_x_sensitivity: float = 1.0
v_y_sensitivity: float = 1.0
omega_z_sensitivity: float = 1.0
dead_zone: float = 0.01
class Se2Gamepad(DeviceBase):
......@@ -42,10 +54,7 @@ class Se2Gamepad(DeviceBase):
def __init__(
self,
v_x_sensitivity: float = 1.0,
v_y_sensitivity: float = 1.0,
omega_z_sensitivity: float = 1.0,
dead_zone: float = 0.01,
cfg: Se2GamepadCfg,
):
"""Initialize the gamepad layer.
......@@ -60,10 +69,11 @@ class Se2Gamepad(DeviceBase):
carb_settings_iface = carb.settings.get_settings()
carb_settings_iface.set_bool("/persistent/app/omniverse/gamepadCameraControl", False)
# store inputs
self.v_x_sensitivity = v_x_sensitivity
self.v_y_sensitivity = v_y_sensitivity
self.omega_z_sensitivity = omega_z_sensitivity
self.dead_zone = dead_zone
self.v_x_sensitivity = cfg.v_x_sensitivity
self.v_y_sensitivity = cfg.v_y_sensitivity
self.omega_z_sensitivity = cfg.omega_z_sensitivity
self.dead_zone = cfg.dead_zone
self._sim_device = cfg.sim_device
# acquire omniverse interfaces
self._appwindow = omni.appwindow.get_default_app_window()
self._input = carb.input.acquire_input_interface()
......@@ -121,13 +131,14 @@ class Se2Gamepad(DeviceBase):
"""
self._additional_callbacks[key] = func
def advance(self) -> np.ndarray:
def advance(self) -> torch.Tensor:
"""Provides the result from gamepad event state.
Returns:
A 3D array containing the linear (x,y) and angular velocity (z).
A tensor containing the linear (x,y) and angular velocity (z).
"""
return self._resolve_command_buffer(self._base_command_raw)
numpy_result = self._resolve_command_buffer(self._base_command_raw)
return torch.tensor(numpy_result, dtype=torch.float32, device=self._sim_device)
"""
Internal helpers.
......
......@@ -6,14 +6,26 @@
"""Gamepad controller for SE(3) control."""
import numpy as np
import torch
import weakref
from collections.abc import Callable
from dataclasses import dataclass
from scipy.spatial.transform import Rotation
import carb
import omni
from ..device_base import DeviceBase
from ..device_base import DeviceBase, DeviceCfg
@dataclass
class Se3GamepadCfg(DeviceCfg):
"""Configuration for SE3 gamepad devices."""
dead_zone: float = 0.01 # For gamepad devices
pos_sensitivity: float = 1.0
rot_sensitivity: float = 1.6
retargeters: None = None
class Se3Gamepad(DeviceBase):
......@@ -47,22 +59,23 @@ class Se3Gamepad(DeviceBase):
"""
def __init__(self, pos_sensitivity: float = 1.0, rot_sensitivity: float = 1.6, dead_zone: float = 0.01):
def __init__(
self,
cfg: Se3GamepadCfg,
):
"""Initialize the gamepad layer.
Args:
pos_sensitivity: Magnitude of input position command scaling. Defaults to 1.0.
rot_sensitivity: Magnitude of scale input rotation commands scaling. Defaults to 1.6.
dead_zone: Magnitude of dead zone for gamepad. An event value from the gamepad less than
this value will be ignored. Defaults to 0.01.
cfg: Configuration object for gamepad settings.
"""
# turn off simulator gamepad control
carb_settings_iface = carb.settings.get_settings()
carb_settings_iface.set_bool("/persistent/app/omniverse/gamepadCameraControl", False)
# store inputs
self.pos_sensitivity = pos_sensitivity
self.rot_sensitivity = rot_sensitivity
self.dead_zone = dead_zone
self.pos_sensitivity = cfg.pos_sensitivity
self.rot_sensitivity = cfg.rot_sensitivity
self.dead_zone = cfg.dead_zone
self._sim_device = cfg.sim_device
# acquire omniverse interfaces
self._appwindow = omni.appwindow.get_default_app_window()
self._input = carb.input.acquire_input_interface()
......@@ -127,11 +140,13 @@ class Se3Gamepad(DeviceBase):
"""
self._additional_callbacks[key] = func
def advance(self) -> tuple[np.ndarray, bool]:
def advance(self) -> torch.Tensor:
"""Provides the result from gamepad event state.
Returns:
A tuple containing the delta pose command and gripper commands.
torch.Tensor: A 7-element tensor containing:
- delta pose: First 6 elements as [x, y, z, rx, ry, rz] in meters and radians.
- gripper command: Last element as a binary value (+1.0 for open, -1.0 for close).
"""
# -- resolve position command
delta_pos = self._resolve_command_buffer(self._delta_pose_raw[:, :3])
......@@ -140,7 +155,10 @@ class Se3Gamepad(DeviceBase):
# -- convert to rotation vector
rot_vec = Rotation.from_euler("XYZ", delta_rot).as_rotvec()
# return the command and gripper state
return np.concatenate([delta_pos, rot_vec]), self._close_gripper
gripper_value = -1.0 if self._close_gripper else 1.0
delta_pose = np.concatenate([delta_pos, rot_vec])
command = np.append(delta_pose, gripper_value)
return torch.tensor(command, dtype=torch.float32, device=self._sim_device)
"""
Internal helpers.
......
......@@ -5,5 +5,5 @@
"""Keyboard device for SE(2) and SE(3) control."""
from .se2_keyboard import Se2Keyboard
from .se3_keyboard import Se3Keyboard
from .se2_keyboard import Se2Keyboard, Se2KeyboardCfg
from .se3_keyboard import Se3Keyboard, Se3KeyboardCfg
......@@ -6,13 +6,24 @@
"""Keyboard controller for SE(2) control."""
import numpy as np
import torch
import weakref
from collections.abc import Callable
from dataclasses import dataclass
import carb
import omni
from ..device_base import DeviceBase
from ..device_base import DeviceBase, DeviceCfg
@dataclass
class Se2KeyboardCfg(DeviceCfg):
"""Configuration for SE2 keyboard devices."""
v_x_sensitivity: float = 0.8
v_y_sensitivity: float = 0.4
omega_z_sensitivity: float = 1.0
class Se2Keyboard(DeviceBase):
......@@ -39,7 +50,7 @@ class Se2Keyboard(DeviceBase):
"""
def __init__(self, v_x_sensitivity: float = 0.8, v_y_sensitivity: float = 0.4, omega_z_sensitivity: float = 1.0):
def __init__(self, cfg: Se2KeyboardCfg):
"""Initialize the keyboard layer.
Args:
......@@ -48,9 +59,11 @@ class Se2Keyboard(DeviceBase):
omega_z_sensitivity: Magnitude of angular velocity along z-direction scaling. Defaults to 1.0.
"""
# store inputs
self.v_x_sensitivity = v_x_sensitivity
self.v_y_sensitivity = v_y_sensitivity
self.omega_z_sensitivity = omega_z_sensitivity
self.v_x_sensitivity = cfg.v_x_sensitivity
self.v_y_sensitivity = cfg.v_y_sensitivity
self.omega_z_sensitivity = cfg.omega_z_sensitivity
self._sim_device = cfg.sim_device
# acquire omniverse interfaces
self._appwindow = omni.appwindow.get_default_app_window()
self._input = carb.input.acquire_input_interface()
......@@ -107,13 +120,13 @@ class Se2Keyboard(DeviceBase):
"""
self._additional_callbacks[key] = func
def advance(self) -> np.ndarray:
def advance(self) -> torch.Tensor:
"""Provides the result from keyboard event state.
Returns:
3D array containing the linear (x,y) and angular velocity (z).
Tensor containing the linear (x,y) and angular velocity (z).
"""
return self._base_command
return torch.tensor(self._base_command, dtype=torch.float32, device=self._sim_device)
"""
Internal helpers.
......
......@@ -6,14 +6,25 @@
"""Keyboard controller for SE(3) control."""
import numpy as np
import torch
import weakref
from collections.abc import Callable
from dataclasses import dataclass
from scipy.spatial.transform import Rotation
import carb
import omni
from ..device_base import DeviceBase
from ..device_base import DeviceBase, DeviceCfg
@dataclass
class Se3KeyboardCfg(DeviceCfg):
"""Configuration for SE3 keyboard devices."""
pos_sensitivity: float = 0.4
rot_sensitivity: float = 0.8
retargeters: None = None
class Se3Keyboard(DeviceBase):
......@@ -47,16 +58,16 @@ class Se3Keyboard(DeviceBase):
"""
def __init__(self, pos_sensitivity: float = 0.4, rot_sensitivity: float = 0.8):
def __init__(self, cfg: Se3KeyboardCfg):
"""Initialize the keyboard layer.
Args:
pos_sensitivity: Magnitude of input position command scaling. Defaults to 0.05.
rot_sensitivity: Magnitude of scale input rotation commands scaling. Defaults to 0.5.
cfg: Configuration object for keyboard settings.
"""
# store inputs
self.pos_sensitivity = pos_sensitivity
self.rot_sensitivity = rot_sensitivity
self.pos_sensitivity = cfg.pos_sensitivity
self.rot_sensitivity = cfg.rot_sensitivity
self._sim_device = cfg.sim_device
# acquire omniverse interfaces
self._appwindow = omni.appwindow.get_default_app_window()
self._input = carb.input.acquire_input_interface()
......@@ -117,16 +128,21 @@ class Se3Keyboard(DeviceBase):
"""
self._additional_callbacks[key] = func
def advance(self) -> tuple[np.ndarray, bool]:
def advance(self) -> torch.Tensor:
"""Provides the result from keyboard event state.
Returns:
A tuple containing the delta pose command and gripper commands.
torch.Tensor: A 7-element tensor containing:
- delta pose: First 6 elements as [x, y, z, rx, ry, rz] in meters and radians.
- gripper command: Last element as a binary value (+1.0 for open, -1.0 for close).
"""
# convert to rotation vector
rot_vec = Rotation.from_euler("XYZ", self._delta_rot).as_rotvec()
# return the command and gripper state
return np.concatenate([self._delta_pos, rot_vec]), self._close_gripper
gripper_value = -1.0 if self._close_gripper else 1.0
delta_pose = np.concatenate([self._delta_pos, rot_vec])
command = np.append(delta_pose, gripper_value)
return torch.tensor(command, dtype=torch.float32, device=self._sim_device)
"""
Internal helpers.
......
......@@ -5,5 +5,5 @@
"""Keyboard device for SE(2) and SE(3) control."""
from .openxr_device import OpenXRDevice
from .openxr_device import OpenXRDevice, OpenXRDeviceCfg
from .xr_cfg import XrCfg
......@@ -8,6 +8,7 @@
import contextlib
import numpy as np
from collections.abc import Callable
from dataclasses import dataclass
from enum import Enum
from typing import Any
......@@ -16,24 +17,37 @@ import carb
from isaaclab.devices.openxr.common import HAND_JOINT_NAMES
from isaaclab.devices.retargeter_base import RetargeterBase
from ..device_base import DeviceBase
from ..device_base import DeviceBase, DeviceCfg
from .xr_cfg import XrCfg
# For testing purposes, we need to mock the XRCore, XRPoseValidityFlags classes
XRCore = None
XRPoseValidityFlags = None
with contextlib.suppress(ModuleNotFoundError):
from omni.kit.xr.core import XRCore, XRPoseValidityFlags
from isaacsim.core.prims import SingleXFormPrim
@dataclass
class OpenXRDeviceCfg(DeviceCfg):
"""Configuration for OpenXR devices."""
xr_cfg: XrCfg | None = None
class OpenXRDevice(DeviceBase):
"""An OpenXR-powered device for teleoperation and interaction.
This device tracks hand joints using OpenXR and makes them available as:
1. A dictionary of joint poses (when used directly)
2. Retargeted commands for robot control (when a retargeter is provided)
Data format:
* Joint poses: Each pose is a 7D vector (x, y, z, qw, qx, qy, qz) in meters and quaternion units
* Dictionary keys: Joint names from HAND_JOINT_NAMES in isaaclab.devices.openxr.common
1. A dictionary of tracking data (when used without retargeters)
2. Retargeted commands for robot control (when retargeters are provided)
Raw data format (_get_raw_data output):
* A dictionary with keys matching TrackingTarget enum values (HAND_LEFT, HAND_RIGHT, HEAD)
* Each hand tracking entry contains a dictionary of joint poses
* Each joint pose is a 7D vector (x, y, z, qw, qx, qy, qz) in meters and quaternion units
* Joint names are defined in HAND_JOINT_NAMES from isaaclab.devices.openxr.common
* Supported joints include palm, wrist, and joints for thumb, index, middle, ring and little fingers
Teleop commands:
......@@ -42,7 +56,7 @@ class OpenXRDevice(DeviceBase):
* "STOP": Pause hand tracking data flow
* "RESET": Reset the tracking and signal simulation reset
The device can track the left hand, right hand, head position, or any combination of these
The device tracks the left hand, right hand, head position, or any combination of these
based on the TrackingTarget enum values. When retargeters are provided, the raw tracking
data is transformed into robot control commands suitable for teleoperation.
"""
......@@ -64,18 +78,17 @@ class OpenXRDevice(DeviceBase):
def __init__(
self,
xr_cfg: XrCfg | None,
cfg: OpenXRDeviceCfg,
retargeters: list[RetargeterBase] | None = None,
):
"""Initialize the OpenXR device.
Args:
xr_cfg: Configuration object for OpenXR settings. If None, default settings are used.
retargeters: List of retargeters to transform tracking data into robot commands.
If None or empty list, raw tracking data will be returned.
cfg: Configuration object for OpenXR settings.
retargeters: List of retargeter instances to use for transforming raw tracking data.
"""
super().__init__(retargeters)
self._xr_cfg = xr_cfg or XrCfg()
self._xr_cfg = cfg.xr_cfg or XrCfg()
self._additional_callbacks = dict()
self._vc_subscription = (
XRCore.get_singleton()
......@@ -91,7 +104,6 @@ class OpenXRDevice(DeviceBase):
self._previous_joint_poses_right = {name: default_pose.copy() for name in HAND_JOINT_NAMES}
self._previous_headpose = default_pose.copy()
# Specify the placement of the simulation when viewed in an XR device using a prim.
xr_anchor = SingleXFormPrim("/XRAnchor", position=self._xr_cfg.anchor_pos, orientation=self._xr_cfg.anchor_rot)
carb.settings.get_settings().set_float("/persistent/xr/profile/ar/render/nearPlane", self._xr_cfg.near_plane)
carb.settings.get_settings().set_string("/persistent/xr/profile/ar/anchorMode", "custom anchor")
......@@ -176,10 +188,10 @@ class OpenXRDevice(DeviceBase):
"""Get the latest tracking data from the OpenXR runtime.
Returns:
Dictionary containing tracking data for:
- Left hand joint poses (26 joints with position and orientation)
- Right hand joint poses (26 joints with position and orientation)
- Head pose (position and orientation)
Dictionary with TrackingTarget enum keys (HAND_LEFT, HAND_RIGHT, HEAD) containing:
- Left hand joint poses: Dictionary of 26 joints with position and orientation
- Right hand joint poses: Dictionary of 26 joints with position and orientation
- Head pose: Single 7-element array with position and orientation
Each pose is represented as a 7-element array: [x, y, z, qw, qx, qy, qz]
where the first 3 elements are position and the last 4 are quaternion orientation.
......
......@@ -4,7 +4,7 @@
# SPDX-License-Identifier: BSD-3-Clause
"""Retargeters for mapping input device data to robot commands."""
from .humanoid.fourier.gr1t2_retargeter import GR1T2Retargeter
from .manipulator.gripper_retargeter import GripperRetargeter
from .manipulator.se3_abs_retargeter import Se3AbsRetargeter
from .manipulator.se3_rel_retargeter import Se3RelRetargeter
from .humanoid.fourier.gr1t2_retargeter import GR1T2Retargeter, GR1T2RetargeterCfg
from .manipulator.gripper_retargeter import GripperRetargeter, GripperRetargeterCfg
from .manipulator.se3_abs_retargeter import Se3AbsRetargeter, Se3AbsRetargeterCfg
from .manipulator.se3_rel_retargeter import Se3RelRetargeter, Se3RelRetargeterCfg
......@@ -11,11 +11,12 @@
import contextlib
import numpy as np
import torch
from dataclasses import dataclass
import isaaclab.sim as sim_utils
import isaaclab.utils.math as PoseUtils
from isaaclab.devices import OpenXRDevice
from isaaclab.devices.retargeter_base import RetargeterBase
from isaaclab.devices.retargeter_base import RetargeterBase, RetargeterCfg
from isaaclab.markers import VisualizationMarkers, VisualizationMarkersCfg
# This import exception is suppressed because gr1_t2_dex_retargeting_utils depends on pinocchio which is not available on windows
......@@ -23,6 +24,15 @@ with contextlib.suppress(Exception):
from .gr1_t2_dex_retargeting_utils import GR1TR2DexRetargeting
@dataclass
class GR1T2RetargeterCfg(RetargeterCfg):
"""Configuration for the GR1T2 retargeter."""
enable_visualization: bool = False
num_open_xr_hand_joints: int = 100
hand_joint_names: list[str] | None = None # List of robot hand joint names
class GR1T2Retargeter(RetargeterBase):
"""Retargets OpenXR hand tracking data to GR1T2 hand end-effector commands.
......@@ -32,10 +42,7 @@ class GR1T2Retargeter(RetargeterBase):
def __init__(
self,
enable_visualization: bool = False,
num_open_xr_hand_joints: int = 100,
device: torch.device = torch.device("cuda:0"),
hand_joint_names: list[str] = [],
cfg: GR1T2RetargeterCfg,
):
"""Initialize the GR1T2 hand retargeter.
......@@ -46,13 +53,13 @@ class GR1T2Retargeter(RetargeterBase):
hand_joint_names: List of robot hand joint names
"""
self._hand_joint_names = hand_joint_names
self._hand_joint_names = cfg.hand_joint_names
self._hands_controller = GR1TR2DexRetargeting(self._hand_joint_names)
# Initialize visualization if enabled
self._enable_visualization = enable_visualization
self._num_open_xr_hand_joints = num_open_xr_hand_joints
self._device = device
self._enable_visualization = cfg.enable_visualization
self._num_open_xr_hand_joints = cfg.num_open_xr_hand_joints
self._sim_device = cfg.sim_device
if self._enable_visualization:
marker_cfg = VisualizationMarkersCfg(
prim_path="/Visuals/markers",
......@@ -65,7 +72,7 @@ class GR1T2Retargeter(RetargeterBase):
)
self._markers = VisualizationMarkers(marker_cfg)
def retarget(self, data: dict) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
def retarget(self, data: dict) -> torch.Tensor:
"""Convert hand joint poses to robot end-effector commands.
Args:
......@@ -91,7 +98,7 @@ class GR1T2Retargeter(RetargeterBase):
joints_position[::2] = np.array([pose[:3] for pose in left_hand_poses.values()])
joints_position[1::2] = np.array([pose[:3] for pose in right_hand_poses.values()])
self._markers.visualize(translations=torch.tensor(joints_position, device=self._device))
self._markers.visualize(translations=torch.tensor(joints_position, device=self._sim_device))
# Create array of zeros with length matching number of joint names
left_hands_pos = self._hands_controller.compute_left(left_hand_poses)
......@@ -107,7 +114,13 @@ class GR1T2Retargeter(RetargeterBase):
right_hand_joints = right_retargeted_hand_joints
retargeted_hand_joints = left_hand_joints + right_hand_joints
return left_wrist, self._retarget_abs(right_wrist), retargeted_hand_joints
# Convert numpy arrays to tensors and concatenate them
left_wrist_tensor = torch.tensor(left_wrist, dtype=torch.float32, device=self._sim_device)
right_wrist_tensor = torch.tensor(self._retarget_abs(right_wrist), dtype=torch.float32, device=self._sim_device)
hand_joints_tensor = torch.tensor(retargeted_hand_joints, dtype=torch.float32, device=self._sim_device)
# Combine all tensors into a single tensor
return torch.cat([left_wrist_tensor, right_wrist_tensor, hand_joints_tensor])
def _retarget_abs(self, wrist: np.ndarray) -> np.ndarray:
"""Handle absolute pose retargeting.
......
......@@ -8,6 +8,6 @@
This module provides functionality for retargeting motion to Franka robots.
"""
from .gripper_retargeter import GripperRetargeter
from .se3_abs_retargeter import Se3AbsRetargeter
from .se3_rel_retargeter import Se3RelRetargeter
from .gripper_retargeter import GripperRetargeter, GripperRetargeterCfg
from .se3_abs_retargeter import Se3AbsRetargeter, Se3AbsRetargeterCfg
from .se3_rel_retargeter import Se3RelRetargeter, Se3RelRetargeterCfg
......@@ -3,10 +3,19 @@
#
# SPDX-License-Identifier: BSD-3-Clause
import numpy as np
import torch
from dataclasses import dataclass
from typing import Final
from isaaclab.devices import OpenXRDevice
from isaaclab.devices.retargeter_base import RetargeterBase
from isaaclab.devices.retargeter_base import RetargeterBase, RetargeterCfg
@dataclass
class GripperRetargeterCfg(RetargeterCfg):
"""Configuration for gripper retargeter."""
bound_hand: OpenXRDevice.TrackingTarget = OpenXRDevice.TrackingTarget.HAND_RIGHT
class GripperRetargeter(RetargeterBase):
......@@ -27,20 +36,21 @@ class GripperRetargeter(RetargeterBase):
def __init__(
self,
bound_hand: OpenXRDevice.TrackingTarget,
cfg: GripperRetargeterCfg,
):
super().__init__(cfg)
"""Initialize the gripper retargeter."""
# Store the hand to track
if bound_hand not in [OpenXRDevice.TrackingTarget.HAND_LEFT, OpenXRDevice.TrackingTarget.HAND_RIGHT]:
if cfg.bound_hand not in [OpenXRDevice.TrackingTarget.HAND_LEFT, OpenXRDevice.TrackingTarget.HAND_RIGHT]:
raise ValueError(
"bound_hand must be either OpenXRDevice.TrackingTarget.HAND_LEFT or"
" OpenXRDevice.TrackingTarget.HAND_RIGHT"
)
self.bound_hand = bound_hand
self.bound_hand = cfg.bound_hand
# Initialize gripper state
self._previous_gripper_command = False
def retarget(self, data: dict) -> bool:
def retarget(self, data: dict) -> torch.Tensor:
"""Convert hand joint poses to gripper command.
Args:
......@@ -48,7 +58,7 @@ class GripperRetargeter(RetargeterBase):
The joint names are defined in isaaclab.devices.openxr.common.HAND_JOINT_NAMES
Returns:
bool: Gripper command where True = close gripper, False = open gripper
torch.Tensor: Tensor containing a single bool value where True = close gripper, False = open gripper
"""
# Extract key joint poses
hand_data = data[self.bound_hand]
......@@ -56,8 +66,10 @@ class GripperRetargeter(RetargeterBase):
index_tip = hand_data["index_tip"]
# Calculate gripper command with hysteresis
gripper_command = self._calculate_gripper_command(thumb_tip[:3], index_tip[:3])
return gripper_command
gripper_command_bool = self._calculate_gripper_command(thumb_tip[:3], index_tip[:3])
gripper_value = -1.0 if gripper_command_bool else 1.0
return torch.tensor([gripper_value], dtype=torch.float32, device=self._sim_device)
def _calculate_gripper_command(self, thumb_pos: np.ndarray, index_pos: np.ndarray) -> bool:
"""Calculate gripper command from finger positions with hysteresis.
......
......@@ -3,14 +3,27 @@
#
# SPDX-License-Identifier: BSD-3-Clause
import numpy as np
import torch
from dataclasses import dataclass
from scipy.spatial.transform import Rotation, Slerp
from isaaclab.devices import OpenXRDevice
from isaaclab.devices.retargeter_base import RetargeterBase
from isaaclab.devices.retargeter_base import RetargeterBase, RetargeterCfg
from isaaclab.markers import VisualizationMarkers
from isaaclab.markers.config import FRAME_MARKER_CFG
@dataclass
class Se3AbsRetargeterCfg(RetargeterCfg):
"""Configuration for absolute position retargeter."""
zero_out_xy_rotation: bool = True
use_wrist_rotation: bool = False
use_wrist_position: bool = True
enable_visualization: bool = False
bound_hand: OpenXRDevice.TrackingTarget = OpenXRDevice.TrackingTarget.HAND_RIGHT
class Se3AbsRetargeter(RetargeterBase):
"""Retargets OpenXR hand tracking data to end-effector commands using absolute positioning.
......@@ -26,11 +39,7 @@ class Se3AbsRetargeter(RetargeterBase):
def __init__(
self,
bound_hand: OpenXRDevice.TrackingTarget,
zero_out_xy_rotation: bool = False,
use_wrist_rotation: bool = False,
use_wrist_position: bool = False,
enable_visualization: bool = False,
cfg: Se3AbsRetargeterCfg,
):
"""Initialize the retargeter.
......@@ -40,21 +49,23 @@ class Se3AbsRetargeter(RetargeterBase):
use_wrist_rotation: If True, use wrist rotation instead of finger average
use_wrist_position: If True, use wrist position instead of pinch position
enable_visualization: If True, visualize the target pose in the scene
device: The device to place the returned tensor on ('cpu' or 'cuda')
"""
if bound_hand not in [OpenXRDevice.TrackingTarget.HAND_LEFT, OpenXRDevice.TrackingTarget.HAND_RIGHT]:
super().__init__(cfg)
if cfg.bound_hand not in [OpenXRDevice.TrackingTarget.HAND_LEFT, OpenXRDevice.TrackingTarget.HAND_RIGHT]:
raise ValueError(
"bound_hand must be either OpenXRDevice.TrackingTarget.HAND_LEFT or"
" OpenXRDevice.TrackingTarget.HAND_RIGHT"
)
self.bound_hand = bound_hand
self.bound_hand = cfg.bound_hand
self._zero_out_xy_rotation = zero_out_xy_rotation
self._use_wrist_rotation = use_wrist_rotation
self._use_wrist_position = use_wrist_position
self._zero_out_xy_rotation = cfg.zero_out_xy_rotation
self._use_wrist_rotation = cfg.use_wrist_rotation
self._use_wrist_position = cfg.use_wrist_position
# Initialize visualization if enabled
self._enable_visualization = enable_visualization
if enable_visualization:
self._enable_visualization = cfg.enable_visualization
if cfg.enable_visualization:
frame_marker_cfg = FRAME_MARKER_CFG.copy()
frame_marker_cfg.markers["frame"].scale = (0.1, 0.1, 0.1)
self._goal_marker = VisualizationMarkers(frame_marker_cfg.replace(prim_path="/Visuals/ee_goal"))
......@@ -62,7 +73,7 @@ class Se3AbsRetargeter(RetargeterBase):
self._visualization_pos = np.zeros(3)
self._visualization_rot = np.array([1.0, 0.0, 0.0, 0.0])
def retarget(self, data: dict) -> np.ndarray:
def retarget(self, data: dict) -> torch.Tensor:
"""Convert hand joint poses to robot end-effector command.
Args:
......@@ -70,7 +81,7 @@ class Se3AbsRetargeter(RetargeterBase):
The joint names are defined in isaaclab.devices.openxr.common.HAND_JOINT_NAMES
Returns:
np.ndarray: 7D array containing position (xyz) and orientation (quaternion)
torch.Tensor: 7D tensor containing position (xyz) and orientation (quaternion)
for the robot end-effector
"""
# Extract key joint poses from the bound hand
......@@ -79,7 +90,10 @@ class Se3AbsRetargeter(RetargeterBase):
index_tip = hand_data.get("index_tip")
wrist = hand_data.get("wrist")
ee_command = self._retarget_abs(thumb_tip, index_tip, wrist)
ee_command_np = self._retarget_abs(thumb_tip, index_tip, wrist)
# Convert to torch tensor
ee_command = torch.tensor(ee_command_np, dtype=torch.float32, device=self._sim_device)
return ee_command
......
......@@ -3,14 +3,31 @@
#
# SPDX-License-Identifier: BSD-3-Clause
import numpy as np
import torch
from dataclasses import dataclass
from scipy.spatial.transform import Rotation
from isaaclab.devices import OpenXRDevice
from isaaclab.devices.retargeter_base import RetargeterBase
from isaaclab.devices.retargeter_base import RetargeterBase, RetargeterCfg
from isaaclab.markers import VisualizationMarkers
from isaaclab.markers.config import FRAME_MARKER_CFG
@dataclass
class Se3RelRetargeterCfg(RetargeterCfg):
"""Configuration for relative position retargeter."""
zero_out_xy_rotation: bool = True
use_wrist_rotation: bool = False
use_wrist_position: bool = True
delta_pos_scale_factor: float = 10.0
delta_rot_scale_factor: float = 10.0
alpha_pos: float = 0.5
alpha_rot: float = 0.5
enable_visualization: bool = False
bound_hand: OpenXRDevice.TrackingTarget = OpenXRDevice.TrackingTarget.HAND_RIGHT
class Se3RelRetargeter(RetargeterBase):
"""Retargets OpenXR hand tracking data to end-effector commands using relative positioning.
......@@ -27,15 +44,7 @@ class Se3RelRetargeter(RetargeterBase):
def __init__(
self,
bound_hand: OpenXRDevice.TrackingTarget,
zero_out_xy_rotation: bool = False,
use_wrist_rotation: bool = False,
use_wrist_position: bool = True,
delta_pos_scale_factor: float = 10.0,
delta_rot_scale_factor: float = 10.0,
alpha_pos: float = 0.5,
alpha_rot: float = 0.5,
enable_visualization: bool = False,
cfg: Se3RelRetargeterCfg,
):
"""Initialize the relative motion retargeter.
......@@ -49,22 +58,24 @@ class Se3RelRetargeter(RetargeterBase):
alpha_pos: Position smoothing parameter (0-1); higher values track more closely to input, lower values smooth more
alpha_rot: Rotation smoothing parameter (0-1); higher values track more closely to input, lower values smooth more
enable_visualization: If True, show a visual marker representing the target end-effector pose
device: The device to place the returned tensor on ('cpu' or 'cuda')
"""
# Store the hand to track
if bound_hand not in [OpenXRDevice.TrackingTarget.HAND_LEFT, OpenXRDevice.TrackingTarget.HAND_RIGHT]:
if cfg.bound_hand not in [OpenXRDevice.TrackingTarget.HAND_LEFT, OpenXRDevice.TrackingTarget.HAND_RIGHT]:
raise ValueError(
"bound_hand must be either OpenXRDevice.TrackingTarget.HAND_LEFT or"
" OpenXRDevice.TrackingTarget.HAND_RIGHT"
)
self.bound_hand = bound_hand
super().__init__(cfg)
self.bound_hand = cfg.bound_hand
self._zero_out_xy_rotation = zero_out_xy_rotation
self._use_wrist_rotation = use_wrist_rotation
self._use_wrist_position = use_wrist_position
self._delta_pos_scale_factor = delta_pos_scale_factor
self._delta_rot_scale_factor = delta_rot_scale_factor
self._alpha_pos = alpha_pos
self._alpha_rot = alpha_rot
self._zero_out_xy_rotation = cfg.zero_out_xy_rotation
self._use_wrist_rotation = cfg.use_wrist_rotation
self._use_wrist_position = cfg.use_wrist_position
self._delta_pos_scale_factor = cfg.delta_pos_scale_factor
self._delta_rot_scale_factor = cfg.delta_rot_scale_factor
self._alpha_pos = cfg.alpha_pos
self._alpha_rot = cfg.alpha_rot
# Initialize smoothing state
self._smoothed_delta_pos = np.zeros(3)
......@@ -75,8 +86,8 @@ class Se3RelRetargeter(RetargeterBase):
self._rotation_threshold = 0.01
# Initialize visualization if enabled
self._enable_visualization = enable_visualization
if enable_visualization:
self._enable_visualization = cfg.enable_visualization
if cfg.enable_visualization:
frame_marker_cfg = FRAME_MARKER_CFG.copy()
frame_marker_cfg.markers["frame"].scale = (0.1, 0.1, 0.1)
self._goal_marker = VisualizationMarkers(frame_marker_cfg.replace(prim_path="/Visuals/ee_goal"))
......@@ -88,7 +99,7 @@ class Se3RelRetargeter(RetargeterBase):
self._previous_index_tip = np.array([0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0], dtype=np.float32)
self._previous_wrist = np.array([0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0], dtype=np.float32)
def retarget(self, data: dict) -> np.ndarray:
def retarget(self, data: dict) -> torch.Tensor:
"""Convert hand joint poses to robot end-effector command.
Args:
......@@ -96,7 +107,7 @@ class Se3RelRetargeter(RetargeterBase):
The joint names are defined in isaaclab.devices.openxr.common.HAND_JOINT_NAMES
Returns:
np.ndarray: 6D array containing position (xyz) and rotation vector (rx,ry,rz)
torch.Tensor: 6D tensor containing position (xyz) and rotation vector (rx,ry,rz)
for the robot end-effector
"""
# Extract key joint poses from the bound hand
......@@ -108,12 +119,15 @@ class Se3RelRetargeter(RetargeterBase):
delta_thumb_tip = self._calculate_delta_pose(thumb_tip, self._previous_thumb_tip)
delta_index_tip = self._calculate_delta_pose(index_tip, self._previous_index_tip)
delta_wrist = self._calculate_delta_pose(wrist, self._previous_wrist)
ee_command = self._retarget_rel(delta_thumb_tip, delta_index_tip, delta_wrist)
ee_command_np = self._retarget_rel(delta_thumb_tip, delta_index_tip, delta_wrist)
self._previous_thumb_tip = thumb_tip.copy()
self._previous_index_tip = index_tip.copy()
self._previous_wrist = wrist.copy()
# Convert to torch tensor
ee_command = torch.tensor(ee_command_np, dtype=torch.float32, device=self._sim_device)
return ee_command
def _calculate_delta_pose(self, joint_pose: np.ndarray, previous_joint_pose: np.ndarray) -> np.ndarray:
......
......@@ -4,9 +4,17 @@
# SPDX-License-Identifier: BSD-3-Clause
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any
@dataclass
class RetargeterCfg:
"""Base configuration for hand tracking retargeters."""
sim_device: str = "cpu"
class RetargeterBase(ABC):
"""Base interface for input data retargeting.
......@@ -18,6 +26,14 @@ class RetargeterBase(ABC):
- Sensor data to control signals
"""
def __init__(self, cfg: RetargeterCfg):
"""Initialize the retargeter.
Args:
cfg: Configuration for the retargeter
"""
self._sim_device = cfg.sim_device
@abstractmethod
def retarget(self, data: Any) -> Any:
"""Retarget input data to desired output format.
......
......@@ -5,5 +5,5 @@
"""Spacemouse device for SE(2) and SE(3) control."""
from .se2_spacemouse import Se2SpaceMouse
from .se3_spacemouse import Se3SpaceMouse
from .se2_spacemouse import Se2SpaceMouse, Se2SpaceMouseCfg
from .se3_spacemouse import Se3SpaceMouse, Se3SpaceMouseCfg
......@@ -9,12 +9,26 @@ import hid
import numpy as np
import threading
import time
import torch
from collections.abc import Callable
from dataclasses import dataclass
from ..device_base import DeviceBase
from isaaclab.utils.array import convert_to_torch
from ..device_base import DeviceBase, DeviceCfg
from .utils import convert_buffer
@dataclass
class Se2SpaceMouseCfg(DeviceCfg):
"""Configuration for SE2 space mouse devices."""
v_x_sensitivity: float = 0.8
v_y_sensitivity: float = 0.4
omega_z_sensitivity: float = 1.0
sim_device: str = "cpu"
class Se2SpaceMouse(DeviceBase):
r"""A space-mouse controller for sending SE(2) commands as delta poses.
......@@ -34,18 +48,17 @@ class Se2SpaceMouse(DeviceBase):
"""
def __init__(self, v_x_sensitivity: float = 0.8, v_y_sensitivity: float = 0.4, omega_z_sensitivity: float = 1.0):
def __init__(self, cfg: Se2SpaceMouseCfg):
"""Initialize the spacemouse layer.
Args:
v_x_sensitivity: Magnitude of linear velocity along x-direction scaling. Defaults to 0.8.
v_y_sensitivity: Magnitude of linear velocity along y-direction scaling. Defaults to 0.4.
omega_z_sensitivity: Magnitude of angular velocity along z-direction scaling. Defaults to 1.0.
cfg: Configuration for the spacemouse device.
"""
# store inputs
self.v_x_sensitivity = v_x_sensitivity
self.v_y_sensitivity = v_y_sensitivity
self.omega_z_sensitivity = omega_z_sensitivity
self.v_x_sensitivity = cfg.v_x_sensitivity
self.v_y_sensitivity = cfg.v_y_sensitivity
self.omega_z_sensitivity = cfg.omega_z_sensitivity
self._sim_device = cfg.sim_device
# acquire device interface
self._device = hid.device()
self._find_device()
......@@ -88,13 +101,13 @@ class Se2SpaceMouse(DeviceBase):
# TODO: Improve this to allow multiple buttons on same key.
self._additional_callbacks[key] = func
def advance(self) -> np.ndarray:
def advance(self) -> torch.Tensor:
"""Provides the result from spacemouse event state.
Returns:
A 3D array containing the linear (x,y) and angular velocity (z).
A 3D tensor containing the linear (x,y) and angular velocity (z).
"""
return self._base_command
return convert_to_torch(self._base_command, device=self._sim_device)
"""
Internal helpers.
......
......@@ -9,13 +9,24 @@ import hid
import numpy as np
import threading
import time
import torch
from collections.abc import Callable
from dataclasses import dataclass
from scipy.spatial.transform import Rotation
from ..device_base import DeviceBase
from ..device_base import DeviceBase, DeviceCfg
from .utils import convert_buffer
@dataclass
class Se3SpaceMouseCfg(DeviceCfg):
"""Configuration for SE3 space mouse devices."""
pos_sensitivity: float = 0.4
rot_sensitivity: float = 0.8
retargeters: None = None
class Se3SpaceMouse(DeviceBase):
"""A space-mouse controller for sending SE(3) commands as delta poses.
......@@ -38,16 +49,16 @@ class Se3SpaceMouse(DeviceBase):
"""
def __init__(self, pos_sensitivity: float = 0.4, rot_sensitivity: float = 0.8):
def __init__(self, cfg: Se3SpaceMouseCfg):
"""Initialize the space-mouse layer.
Args:
pos_sensitivity: Magnitude of input position command scaling. Defaults to 0.4.
rot_sensitivity: Magnitude of scale input rotation commands scaling. Defaults to 0.8.
cfg: Configuration object for space-mouse settings.
"""
# store inputs
self.pos_sensitivity = pos_sensitivity
self.rot_sensitivity = rot_sensitivity
self.pos_sensitivity = cfg.pos_sensitivity
self.rot_sensitivity = cfg.rot_sensitivity
self._sim_device = cfg.sim_device
# acquire device interface
self._device = hid.device()
self._find_device()
......@@ -99,15 +110,19 @@ class Se3SpaceMouse(DeviceBase):
# TODO: Improve this to allow multiple buttons on same key.
self._additional_callbacks[key] = func
def advance(self) -> tuple[np.ndarray, bool]:
def advance(self) -> torch.Tensor:
"""Provides the result from spacemouse event state.
Returns:
A tuple containing the delta pose command and gripper commands.
torch.Tensor: A 7-element tensor containing:
- delta pose: First 6 elements as [x, y, z, rx, ry, rz] in meters and radians.
- gripper command: Last element as a binary value (+1.0 for open, -1.0 for close).
"""
rot_vec = Rotation.from_euler("XYZ", self._delta_rot).as_rotvec()
# if new command received, reset event flag to False until keyboard updated.
return np.concatenate([self._delta_pos, rot_vec]), self._close_gripper
delta_pose = np.concatenate([self._delta_pos, rot_vec])
gripper_value = -1.0 if self._close_gripper else 1.0
command = np.append(delta_pose, gripper_value)
return torch.tensor(command, dtype=torch.float32, device=self._sim_device)
"""
Internal helpers.
......
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Factory to create teleoperation devices from configuration."""
import contextlib
import inspect
from collections.abc import Callable
import omni.log
from isaaclab.devices import DeviceBase, DeviceCfg
from isaaclab.devices.gamepad import Se2Gamepad, Se2GamepadCfg, Se3Gamepad, Se3GamepadCfg
from isaaclab.devices.keyboard import Se2Keyboard, Se2KeyboardCfg, Se3Keyboard, Se3KeyboardCfg
from isaaclab.devices.openxr.retargeters import (
GR1T2Retargeter,
GR1T2RetargeterCfg,
GripperRetargeter,
GripperRetargeterCfg,
Se3AbsRetargeter,
Se3AbsRetargeterCfg,
Se3RelRetargeter,
Se3RelRetargeterCfg,
)
from isaaclab.devices.retargeter_base import RetargeterBase, RetargeterCfg
from isaaclab.devices.spacemouse import Se2SpaceMouse, Se2SpaceMouseCfg, Se3SpaceMouse, Se3SpaceMouseCfg
with contextlib.suppress(ModuleNotFoundError):
# May fail if xr is not in use
from isaaclab.devices.openxr import OpenXRDevice, OpenXRDeviceCfg
# Map device types to their constructor and expected config type
DEVICE_MAP: dict[type[DeviceCfg], type[DeviceBase]] = {
Se3KeyboardCfg: Se3Keyboard,
Se3SpaceMouseCfg: Se3SpaceMouse,
Se3GamepadCfg: Se3Gamepad,
Se2KeyboardCfg: Se2Keyboard,
Se2GamepadCfg: Se2Gamepad,
Se2SpaceMouseCfg: Se2SpaceMouse,
OpenXRDeviceCfg: OpenXRDevice,
}
# Map configuration types to their corresponding retargeter classes
RETARGETER_MAP: dict[type[RetargeterCfg], type[RetargeterBase]] = {
Se3AbsRetargeterCfg: Se3AbsRetargeter,
Se3RelRetargeterCfg: Se3RelRetargeter,
GripperRetargeterCfg: GripperRetargeter,
GR1T2RetargeterCfg: GR1T2Retargeter,
}
def create_teleop_device(
device_name: str, devices_cfg: dict[str, DeviceCfg], callbacks: dict[str, Callable] | None = None
) -> DeviceBase:
"""Create a teleoperation device based on configuration.
Args:
device_name: The name of the device to create (must exist in devices_cfg)
devices_cfg: Dictionary of device configurations
callbacks: Optional dictionary of callbacks to register with the device
Keys are the button/gesture names, values are callback functions
Returns:
The configured teleoperation device
Raises:
ValueError: If the device name is not found in the configuration
ValueError: If the device configuration type is not supported
"""
if device_name not in devices_cfg:
raise ValueError(f"Device '{device_name}' not found in teleop device configurations")
device_cfg = devices_cfg[device_name]
callbacks = callbacks or {}
# Check if device config type is supported
cfg_type = type(device_cfg)
if cfg_type not in DEVICE_MAP:
raise ValueError(f"Unsupported device configuration type: {cfg_type.__name__}")
# Get the constructor for this config type
constructor = DEVICE_MAP[cfg_type]
# Try to create retargeters if they are configured
retargeters = []
if hasattr(device_cfg, "retargeters") and device_cfg.retargeters is not None:
try:
# Create retargeters based on configuration
for retargeter_cfg in device_cfg.retargeters:
cfg_type = type(retargeter_cfg)
if cfg_type in RETARGETER_MAP:
retargeters.append(RETARGETER_MAP[cfg_type](retargeter_cfg))
else:
raise ValueError(f"Unknown retargeter configuration type: {cfg_type.__name__}")
except NameError as e:
raise ValueError(f"Failed to create retargeters: {e}")
# Check if the constructor accepts retargeters parameter
constructor_params = inspect.signature(constructor).parameters
if "retargeters" in constructor_params and retargeters:
device = constructor(cfg=device_cfg, retargeters=retargeters)
else:
device = constructor(cfg=device_cfg)
# Register callbacks
for key, callback in callbacks.items():
device.add_callback(key, callback)
omni.log.info(f"Created teleoperation device: {device_name}")
return device
......@@ -9,9 +9,10 @@ This module defines the general configuration of the environment. It includes pa
configuring the environment instances, viewer settings, and simulation parameters.
"""
from dataclasses import MISSING
from dataclasses import MISSING, field
import isaaclab.envs.mdp as mdp
from isaaclab.devices.device_base import DevicesCfg
from isaaclab.devices.openxr import XrCfg
from isaaclab.managers import EventTermCfg as EventTerm
from isaaclab.managers import RecorderManagerBaseCfg as DefaultEmptyRecorderManagerCfg
......@@ -121,3 +122,6 @@ class ManagerBasedEnvCfg:
xr: XrCfg | None = None
"""Configuration for viewing and interacting with the environment through an XR device."""
teleop_devices: DevicesCfg = field(default_factory=DevicesCfg)
"""Configuration for teleoperation devices."""
......@@ -25,7 +25,7 @@ import ctypes
from isaacsim.core.api.simulation_context import SimulationContext
from isaaclab.devices import Se3Keyboard
from isaaclab.devices import Se3Keyboard, Se3KeyboardCfg
def print_cb():
......@@ -44,7 +44,7 @@ def main():
sim = SimulationContext(physics_dt=0.01, rendering_dt=0.01)
# Create teleoperation interface
teleop_interface = Se3Keyboard(pos_sensitivity=0.1, rot_sensitivity=0.1)
teleop_interface = Se3Keyboard(Se3KeyboardCfg(pos_sensitivity=0.1, rot_sensitivity=0.1))
# Add teleoperation callbacks
# available key buttons: https://docs.omniverse.nvidia.com/kit/docs/carbonite/latest/docs/python/carb.html?highlight=keyboardeventtype#carb.input.KeyboardInput
teleop_interface.add_callback("L", print_cb)
......
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Launch Isaac Sim Simulator first."""
from isaaclab.app import AppLauncher, run_tests
# launch omniverse app
simulation_app = AppLauncher(headless=True).app
"""Rest everything follows."""
import torch
import unittest
from unittest.mock import MagicMock, patch
# Import device classes to test
from isaaclab.devices import (
OpenXRDevice,
OpenXRDeviceCfg,
Se2Gamepad,
Se2GamepadCfg,
Se2Keyboard,
Se2KeyboardCfg,
Se2SpaceMouse,
Se2SpaceMouseCfg,
Se3Gamepad,
Se3GamepadCfg,
Se3Keyboard,
Se3KeyboardCfg,
Se3SpaceMouse,
Se3SpaceMouseCfg,
)
from isaaclab.devices.openxr import XrCfg
from isaaclab.devices.openxr.retargeters import GripperRetargeterCfg, Se3AbsRetargeterCfg
# Import teleop device factory for testing
from isaaclab.devices.teleop_device_factory import create_teleop_device
class TestDeviceConstructors(unittest.TestCase):
"""Test fixture for device classes constructors and basic functionality."""
def setUp(self):
"""Set up tests."""
# Create mock objects that will be used across tests
self.carb_mock = MagicMock()
self.omni_mock = MagicMock()
self.appwindow_mock = MagicMock()
self.keyboard_mock = MagicMock()
self.gamepad_mock = MagicMock()
self.input_mock = MagicMock()
self.settings_mock = MagicMock()
self.hid_mock = MagicMock()
self.device_mock = MagicMock()
# Set up the mocks to return appropriate objects
self.omni_mock.appwindow.get_default_app_window.return_value = self.appwindow_mock
self.appwindow_mock.get_keyboard.return_value = self.keyboard_mock
self.appwindow_mock.get_gamepad.return_value = self.gamepad_mock
self.carb_mock.input.acquire_input_interface.return_value = self.input_mock
self.carb_mock.settings.get_settings.return_value = self.settings_mock
# Mock keyboard event types
self.carb_mock.input.KeyboardEventType.KEY_PRESS = 1
self.carb_mock.input.KeyboardEventType.KEY_RELEASE = 2
# Mock the SpaceMouse
self.hid_mock.enumerate.return_value = [
{"product_string": "SpaceMouse Compact", "vendor_id": 123, "product_id": 456}
]
self.hid_mock.device.return_value = self.device_mock
# Mock OpenXR
self.xr_core_mock = MagicMock()
self.message_bus_mock = MagicMock()
self.singleton_mock = MagicMock()
self.omni_mock.kit.xr.core.XRCore.get_singleton.return_value = self.singleton_mock
self.singleton_mock.get_message_bus.return_value = self.message_bus_mock
self.omni_mock.kit.xr.core.XRPoseValidityFlags.POSITION_VALID = 1
self.omni_mock.kit.xr.core.XRPoseValidityFlags.ORIENTATION_VALID = 2
def tearDown(self):
"""Clean up after tests."""
# Clean up mock objects if needed
pass
"""
Test keyboard devices.
"""
@patch.dict("sys.modules", {"carb": MagicMock(), "omni": MagicMock()})
def test_se2keyboard_constructors(self):
"""Test constructor for Se2Keyboard."""
# Test config-based constructor
config = Se2KeyboardCfg(
v_x_sensitivity=0.9,
v_y_sensitivity=0.5,
omega_z_sensitivity=1.2,
)
with patch("isaaclab.devices.keyboard.se2_keyboard.carb", self.carb_mock):
with patch("isaaclab.devices.keyboard.se2_keyboard.omni", self.omni_mock):
keyboard = Se2Keyboard(config)
# Verify configuration was applied correctly
self.assertEqual(keyboard.v_x_sensitivity, 0.9)
self.assertEqual(keyboard.v_y_sensitivity, 0.5)
self.assertEqual(keyboard.omega_z_sensitivity, 1.2)
# Test advance() returns expected type
result = keyboard.advance()
self.assertIsInstance(result, torch.Tensor)
self.assertEqual(result.shape, (3,)) # (v_x, v_y, omega_z)
@patch.dict("sys.modules", {"carb": MagicMock(), "omni": MagicMock()})
def test_se3keyboard_constructors(self):
"""Test constructor for Se3Keyboard."""
# Test config-based constructor
config = Se3KeyboardCfg(
pos_sensitivity=0.5,
rot_sensitivity=0.9,
)
with patch("isaaclab.devices.keyboard.se3_keyboard.carb", self.carb_mock):
with patch("isaaclab.devices.keyboard.se3_keyboard.omni", self.omni_mock):
keyboard = Se3Keyboard(config)
# Verify configuration was applied correctly
self.assertEqual(keyboard.pos_sensitivity, 0.5)
self.assertEqual(keyboard.rot_sensitivity, 0.9)
# Test advance() returns expected type
result = keyboard.advance()
self.assertIsInstance(result, torch.Tensor)
self.assertEqual(result.shape, (7,)) # (pos_x, pos_y, pos_z, rot_x, rot_y, rot_z, gripper)
"""
Test gamepad devices.
"""
@patch.dict("sys.modules", {"carb": MagicMock(), "omni": MagicMock()})
def test_se2gamepad_constructors(self):
"""Test constructor for Se2Gamepad."""
# Test config-based constructor
config = Se2GamepadCfg(
v_x_sensitivity=1.1,
v_y_sensitivity=0.6,
omega_z_sensitivity=1.2,
dead_zone=0.02,
)
with patch("isaaclab.devices.gamepad.se2_gamepad.carb", self.carb_mock):
with patch("isaaclab.devices.gamepad.se2_gamepad.omni", self.omni_mock):
gamepad = Se2Gamepad(config)
# Verify configuration was applied correctly
self.assertEqual(gamepad.v_x_sensitivity, 1.1)
self.assertEqual(gamepad.v_y_sensitivity, 0.6)
self.assertEqual(gamepad.omega_z_sensitivity, 1.2)
self.assertEqual(gamepad.dead_zone, 0.02)
# Test advance() returns expected type
result = gamepad.advance()
self.assertIsInstance(result, torch.Tensor)
self.assertEqual(result.shape, (3,)) # (v_x, v_y, omega_z)
@patch.dict("sys.modules", {"carb": MagicMock(), "omni": MagicMock()})
def test_se3gamepad_constructors(self):
"""Test constructor for Se3Gamepad."""
# Test config-based constructor
config = Se3GamepadCfg(
pos_sensitivity=1.1,
rot_sensitivity=1.7,
dead_zone=0.02,
)
with patch("isaaclab.devices.gamepad.se3_gamepad.carb", self.carb_mock):
with patch("isaaclab.devices.gamepad.se3_gamepad.omni", self.omni_mock):
gamepad = Se3Gamepad(config)
# Verify configuration was applied correctly
self.assertEqual(gamepad.pos_sensitivity, 1.1)
self.assertEqual(gamepad.rot_sensitivity, 1.7)
self.assertEqual(gamepad.dead_zone, 0.02)
# Test advance() returns expected type
result = gamepad.advance()
self.assertIsInstance(result, torch.Tensor)
self.assertEqual(result.shape, (7,)) # (pos_x, pos_y, pos_z, rot_x, rot_y, rot_z, gripper)
"""
Test spacemouse devices.
"""
@patch.dict("sys.modules", {"hid": MagicMock()})
def test_se2spacemouse_constructors(self):
"""Test constructor for Se2SpaceMouse."""
# Test config-based constructor
config = Se2SpaceMouseCfg(
v_x_sensitivity=0.9,
v_y_sensitivity=0.5,
omega_z_sensitivity=1.2,
)
with patch("isaaclab.devices.spacemouse.se2_spacemouse.hid", self.hid_mock):
spacemouse = Se2SpaceMouse(config)
# Verify configuration was applied correctly
self.assertEqual(spacemouse.v_x_sensitivity, 0.9)
self.assertEqual(spacemouse.v_y_sensitivity, 0.5)
self.assertEqual(spacemouse.omega_z_sensitivity, 1.2)
# Test advance() returns expected type
self.device_mock.read.return_value = [1, 0, 0, 0, 0]
result = spacemouse.advance()
self.assertIsInstance(result, torch.Tensor)
self.assertEqual(result.shape, (3,)) # (v_x, v_y, omega_z)
@patch.dict("sys.modules", {"hid": MagicMock()})
def test_se3spacemouse_constructors(self):
"""Test constructor for Se3SpaceMouse."""
# Test config-based constructor
config = Se3SpaceMouseCfg(
pos_sensitivity=0.5,
rot_sensitivity=0.9,
)
with patch("isaaclab.devices.spacemouse.se3_spacemouse.hid", self.hid_mock):
spacemouse = Se3SpaceMouse(config)
# Verify configuration was applied correctly
self.assertEqual(spacemouse.pos_sensitivity, 0.5)
self.assertEqual(spacemouse.rot_sensitivity, 0.9)
# Test advance() returns expected type
self.device_mock.read.return_value = [1, 0, 0, 0, 0, 0, 0]
result = spacemouse.advance()
self.assertIsInstance(result, torch.Tensor)
self.assertEqual(result.shape, (7,)) # (pos_x, pos_y, pos_z, rot_x, rot_y, rot_z, gripper)
"""
Test OpenXR devices.
"""
def test_openxr_constructors(self):
"""Test constructor for OpenXRDevice."""
# Test config-based constructor with custom XrCfg
xr_cfg = XrCfg(
anchor_pos=(1.0, 2.0, 3.0),
anchor_rot=(0.0, 0.1, 0.2), # Using 3-tuple for rotation based on type hint
near_plane=0.2,
)
config = OpenXRDeviceCfg(xr_cfg=xr_cfg)
# Create mock retargeters
mock_controller_retargeter = MagicMock()
mock_head_retargeter = MagicMock()
retargeters = [mock_controller_retargeter, mock_head_retargeter]
with patch.dict(
"sys.modules",
{
"carb": self.carb_mock,
"omni.kit.xr.core": self.omni_mock.kit.xr.core,
"isaacsim.core.prims": MagicMock(),
},
):
with patch("isaaclab.devices.openxr.openxr_device.XRCore", self.omni_mock.kit.xr.core.XRCore):
with patch(
"isaaclab.devices.openxr.openxr_device.XRPoseValidityFlags",
self.omni_mock.kit.xr.core.XRPoseValidityFlags,
):
with patch("isaaclab.devices.openxr.openxr_device.SingleXFormPrim") as mock_single_xform:
# Configure the mock to return a string for prim_path
mock_instance = mock_single_xform.return_value
mock_instance.prim_path = "/XRAnchor"
# Create the device using the factory
device = OpenXRDevice(config)
# Verify the device was created successfully
self.assertEqual(device._xr_cfg, xr_cfg)
# Test with retargeters
device = OpenXRDevice(cfg=config, retargeters=retargeters)
# Verify retargeters were correctly assigned as a list
self.assertEqual(device._retargeters, retargeters)
# Test with config and retargeters
device = OpenXRDevice(cfg=config, retargeters=retargeters)
# Verify both config and retargeters were correctly assigned
self.assertEqual(device._xr_cfg, xr_cfg)
self.assertEqual(device._retargeters, retargeters)
# Test reset functionality
device.reset()
"""
Test teleop device factory.
"""
@patch.dict("sys.modules", {"carb": MagicMock(), "omni": MagicMock()})
def test_create_teleop_device_basic(self):
"""Test creating devices using the teleop device factory."""
# Create device configuration
keyboard_cfg = Se3KeyboardCfg(pos_sensitivity=0.8, rot_sensitivity=1.2)
# Create devices configuration dictionary
devices_cfg = {"test_keyboard": keyboard_cfg}
# Mock Se3Keyboard class
with patch("isaaclab.devices.keyboard.se3_keyboard.carb", self.carb_mock):
with patch("isaaclab.devices.keyboard.se3_keyboard.omni", self.omni_mock):
# Create the device using the factory
device = create_teleop_device("test_keyboard", devices_cfg)
# Verify the device was created correctly
self.assertIsInstance(device, Se3Keyboard)
self.assertEqual(device.pos_sensitivity, 0.8)
self.assertEqual(device.rot_sensitivity, 1.2)
@patch.dict("sys.modules", {"carb": MagicMock(), "omni": MagicMock()})
def test_create_teleop_device_with_callbacks(self):
"""Test creating device with callbacks."""
# Create device configuration
xr_cfg = XrCfg(anchor_pos=(0.0, 0.0, 0.0), anchor_rot=(1.0, 0.0, 0.0, 0.0), near_plane=0.15)
openxr_cfg = OpenXRDeviceCfg(xr_cfg=xr_cfg)
# Create devices configuration dictionary
devices_cfg = {"test_xr": openxr_cfg}
# Create mock callbacks
button_a_callback = MagicMock()
button_b_callback = MagicMock()
callbacks = {"button_a": button_a_callback, "button_b": button_b_callback}
# Mock OpenXRDevice class and dependencies
with patch.dict(
"sys.modules",
{
"carb": self.carb_mock,
"omni.kit.xr.core": self.omni_mock.kit.xr.core,
"isaacsim.core.prims": MagicMock(),
},
):
with patch("isaaclab.devices.openxr.openxr_device.XRCore", self.omni_mock.kit.xr.core.XRCore):
with patch(
"isaaclab.devices.openxr.openxr_device.XRPoseValidityFlags",
self.omni_mock.kit.xr.core.XRPoseValidityFlags,
):
with patch("isaaclab.devices.openxr.openxr_device.SingleXFormPrim") as mock_single_xform:
# Configure the mock to return a string for prim_path
mock_instance = mock_single_xform.return_value
mock_instance.prim_path = "/XRAnchor"
# Create the device using the factory
device = create_teleop_device("test_xr", devices_cfg, callbacks)
# Verify the device was created correctly
self.assertIsInstance(device, OpenXRDevice)
# Verify callbacks were registered
device.add_callback("button_a", button_a_callback)
device.add_callback("button_b", button_b_callback)
self.assertEqual(len(device._additional_callbacks), 2)
@patch.dict("sys.modules", {"carb": MagicMock(), "omni": MagicMock()})
def test_create_teleop_device_with_retargeters(self):
"""Test creating device with retargeters."""
# Create retargeter configurations
retargeter_cfg1 = Se3AbsRetargeterCfg()
retargeter_cfg2 = GripperRetargeterCfg()
# Create device configuration with retargeters
xr_cfg = XrCfg()
device_cfg = OpenXRDeviceCfg(xr_cfg=xr_cfg, retargeters=[retargeter_cfg1, retargeter_cfg2])
# Create devices configuration dictionary
devices_cfg = {"test_xr": device_cfg}
# Mock OpenXRDevice class and dependencies
with patch.dict(
"sys.modules",
{
"carb": self.carb_mock,
"omni.kit.xr.core": self.omni_mock.kit.xr.core,
"isaacsim.core.prims": MagicMock(),
},
):
with patch("isaaclab.devices.openxr.openxr_device.XRCore", self.omni_mock.kit.xr.core.XRCore):
with patch(
"isaaclab.devices.openxr.openxr_device.XRPoseValidityFlags",
self.omni_mock.kit.xr.core.XRPoseValidityFlags,
):
with patch("isaaclab.devices.openxr.openxr_device.SingleXFormPrim") as mock_single_xform:
# Mock retargeter classes
with patch("isaaclab.devices.openxr.retargeters.Se3AbsRetargeter"):
with patch("isaaclab.devices.openxr.retargeters.GripperRetargeter"):
# Configure the mock to return a string for prim_path
mock_instance = mock_single_xform.return_value
mock_instance.prim_path = "/XRAnchor"
# Create the device using the factory
device = create_teleop_device("test_xr", devices_cfg)
# Verify retargeters were created
self.assertEqual(len(device._retargeters), 2)
def test_create_teleop_device_device_not_found(self):
"""Test error when device name is not found in configuration."""
# Create devices configuration dictionary
devices_cfg = {"keyboard": Se3KeyboardCfg()}
# Try to create a non-existent device
with self.assertRaises(ValueError) as context:
create_teleop_device("gamepad", devices_cfg)
# Verify the error message
self.assertIn("Device 'gamepad' not found", str(context.exception))
def test_create_teleop_device_unsupported_config(self):
"""Test error when device configuration type is not supported."""
# Create a custom unsupported configuration class
class UnsupportedCfg:
pass
# Create devices configuration dictionary with unsupported config
devices_cfg = {"unsupported": UnsupportedCfg()}
# Try to create a device with unsupported configuration
with self.assertRaises(ValueError) as context:
create_teleop_device("unsupported", devices_cfg)
# Verify the error message
self.assertIn("Unsupported device configuration type", str(context.exception))
if __name__ == "__main__":
run_tests()
......@@ -7,6 +7,9 @@ from pink.tasks import FrameTask
import isaaclab.controllers.utils as ControllerUtils
from isaaclab.controllers.pink_ik_cfg import PinkIKControllerCfg
from isaaclab.devices import DevicesCfg
from isaaclab.devices.openxr import OpenXRDeviceCfg
from isaaclab.devices.openxr.retargeters import GR1T2RetargeterCfg
from isaaclab.envs.mdp.actions.pink_actions_cfg import PinkInverseKinematicsActionCfg
from isaaclab.utils import configclass
......@@ -152,3 +155,21 @@ class ExhaustPipeGR1T2PinkIKEnvCfg(ExhaustPipeGR1T2BaseEnvCfg):
# Set the URDF and mesh paths for the IK controller
self.actions.gr1_action.controller.urdf_path = temp_urdf_output_path
self.actions.gr1_action.controller.mesh_path = temp_urdf_meshes_output_path
self.teleop_devices = DevicesCfg(
devices={
"handtracking": OpenXRDeviceCfg(
retargeters=[
GR1T2RetargeterCfg(
enable_visualization=True,
# OpenXR hand tracking has 26 joints per hand
num_open_xr_hand_joints=2 * 26,
sim_device=self.sim.device,
hand_joint_names=self.actions.gr1_action.hand_joint_names,
),
],
sim_device=self.sim.device,
xr_cfg=self.xr,
),
}
)
......@@ -7,6 +7,9 @@ from pink.tasks import FrameTask
import isaaclab.controllers.utils as ControllerUtils
from isaaclab.controllers.pink_ik_cfg import PinkIKControllerCfg
from isaaclab.devices import DevicesCfg
from isaaclab.devices.openxr import OpenXRDeviceCfg
from isaaclab.devices.openxr.retargeters import GR1T2RetargeterCfg
from isaaclab.envs.mdp.actions.pink_actions_cfg import PinkInverseKinematicsActionCfg
from isaaclab.utils import configclass
......@@ -150,3 +153,21 @@ class NutPourGR1T2PinkIKEnvCfg(NutPourGR1T2BaseEnvCfg):
# Set the URDF and mesh paths for the IK controller
self.actions.gr1_action.controller.urdf_path = temp_urdf_output_path
self.actions.gr1_action.controller.mesh_path = temp_urdf_meshes_output_path
self.teleop_devices = DevicesCfg(
devices={
"handtracking": OpenXRDeviceCfg(
retargeters=[
GR1T2RetargeterCfg(
enable_visualization=True,
# OpenXR hand tracking has 26 joints per hand
num_open_xr_hand_joints=2 * 26,
sim_device=self.sim.device,
hand_joint_names=self.actions.gr1_action.hand_joint_names,
),
],
sim_device=self.sim.device,
xr_cfg=self.xr,
),
}
)
......@@ -18,7 +18,9 @@ import isaaclab.envs.mdp as base_mdp
import isaaclab.sim as sim_utils
from isaaclab.assets import ArticulationCfg, AssetBaseCfg, RigidObjectCfg
from isaaclab.controllers.pink_ik_cfg import PinkIKControllerCfg
from isaaclab.devices.openxr import XrCfg
from isaaclab.devices.device_base import DevicesCfg
from isaaclab.devices.openxr import OpenXRDeviceCfg, XrCfg
from isaaclab.devices.openxr.retargeters.humanoid.fourier.gr1t2_retargeter import GR1T2RetargeterCfg
from isaaclab.envs import ManagerBasedRLEnvCfg
from isaaclab.envs.mdp.actions.pink_actions_cfg import PinkInverseKinematicsActionCfg
from isaaclab.managers import EventTermCfg as EventTerm
......@@ -410,3 +412,21 @@ class PickPlaceGR1T2EnvCfg(ManagerBasedRLEnvCfg):
# Set the URDF and mesh paths for the IK controller
self.actions.pink_ik_cfg.controller.urdf_path = temp_urdf_output_path
self.actions.pink_ik_cfg.controller.mesh_path = temp_urdf_meshes_output_path
self.teleop_devices = DevicesCfg(
devices={
"handtracking": OpenXRDeviceCfg(
retargeters=[
GR1T2RetargeterCfg(
enable_visualization=True,
# OpenXR hand tracking has 26 joints per hand
num_open_xr_hand_joints=2 * 26,
sim_device=self.sim.device,
hand_joint_names=self.actions.pink_ik_cfg.hand_joint_names,
),
],
sim_device=self.sim.device,
xr_cfg=self.xr,
),
}
)
......@@ -4,6 +4,10 @@
# SPDX-License-Identifier: BSD-3-Clause
from isaaclab.controllers.differential_ik_cfg import DifferentialIKControllerCfg
from isaaclab.devices.device_base import DevicesCfg
from isaaclab.devices.openxr.openxr_device import OpenXRDevice, OpenXRDeviceCfg
from isaaclab.devices.openxr.retargeters.manipulator.gripper_retargeter import GripperRetargeterCfg
from isaaclab.devices.openxr.retargeters.manipulator.se3_abs_retargeter import Se3AbsRetargeterCfg
from isaaclab.envs.mdp.actions.actions_cfg import DifferentialInverseKinematicsActionCfg
from isaaclab.utils import configclass
......@@ -32,3 +36,24 @@ class FrankaCubeStackEnvCfg(stack_joint_pos_env_cfg.FrankaCubeStackEnvCfg):
body_name="panda_hand",
controller=DifferentialIKControllerCfg(command_type="pose", use_relative_mode=False, ik_method="dls"),
)
self.teleop_devices = DevicesCfg(
devices={
"handtracking": OpenXRDeviceCfg(
retargeters=[
Se3AbsRetargeterCfg(
bound_hand=OpenXRDevice.TrackingTarget.HAND_RIGHT,
zero_out_xy_rotation=True,
use_wrist_rotation=False,
use_wrist_position=True,
sim_device=self.sim.device,
),
GripperRetargeterCfg(
bound_hand=OpenXRDevice.TrackingTarget.HAND_RIGHT, sim_device=self.sim.device
),
],
sim_device=self.sim.device,
xr_cfg=self.xr,
),
}
)
......@@ -4,6 +4,11 @@
# SPDX-License-Identifier: BSD-3-Clause
from isaaclab.controllers.differential_ik_cfg import DifferentialIKControllerCfg
from isaaclab.devices.device_base import DevicesCfg
from isaaclab.devices.keyboard import Se3KeyboardCfg
from isaaclab.devices.openxr.openxr_device import OpenXRDevice, OpenXRDeviceCfg
from isaaclab.devices.openxr.retargeters.manipulator.gripper_retargeter import GripperRetargeterCfg
from isaaclab.devices.openxr.retargeters.manipulator.se3_rel_retargeter import Se3RelRetargeterCfg
from isaaclab.envs.mdp.actions.actions_cfg import DifferentialInverseKinematicsActionCfg
from isaaclab.utils import configclass
......@@ -34,3 +39,31 @@ class FrankaCubeStackEnvCfg(stack_joint_pos_env_cfg.FrankaCubeStackEnvCfg):
scale=0.5,
body_offset=DifferentialInverseKinematicsActionCfg.OffsetCfg(pos=[0.0, 0.0, 0.107]),
)
self.teleop_devices = DevicesCfg(
devices={
"handtracking": OpenXRDeviceCfg(
retargeters=[
Se3RelRetargeterCfg(
bound_hand=OpenXRDevice.TrackingTarget.HAND_RIGHT,
zero_out_xy_rotation=True,
use_wrist_rotation=False,
use_wrist_position=True,
delta_pos_scale_factor=10.0,
delta_rot_scale_factor=10.0,
sim_device=self.sim.device,
),
GripperRetargeterCfg(
bound_hand=OpenXRDevice.TrackingTarget.HAND_RIGHT, sim_device=self.sim.device
),
],
sim_device=self.sim.device,
xr_cfg=self.xr,
),
"keyboard": Se3KeyboardCfg(
pos_sensitivity=0.05,
rot_sensitivity=0.05,
sim_device=self.sim.device,
),
}
)
......@@ -33,6 +33,7 @@ PER_TEST_TIMEOUTS = {
"test_sb3_wrapper.py": 200,
"test_skrl_wrapper.py": 200,
"test_operational_space.py": 300,
"test_device_constructors.py": 200,
"test_terrain_importer.py": 200,
"test_environments_training.py": 5000,
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment