Unverified Commit e4a23f3d authored by mingxueg's avatar mingxueg Committed by GitHub

Adds Haply device API with force feedback and teleoperation demo (#3873)

# Description
Add Haply haptic device teleoperation support for robotic manipulation
with force feedback.

## Type of change

- New feature (non-breaking change which adds functionality)

1.  Add haply devices API for teleoperation with force feedback
2. Real-time teleoperation with force feedback demo via Haply Inverse3
handle

[Haply device documentation](https://docs.haply.co/docs/quick-start) and
[usage](https://docs.haply.co/inverseSDK/)

**Usage (make sure your Haply device is connected):**
```bash
./isaaclab.sh -p scripts/demos/haply_teleoperation.py --websocket_uri ws://localhost:10001 --pos_sensitivity 1.65
```

## Screenshots

![haply-frankal](https://github.com/user-attachments/assets/f2c0572e-83d6-4c10-b95d-6ea083b90f62)

## Checklist

- [x] I have read and understood the [contribution
guidelines](https://isaac-sim.github.io/IsaacLab/main/source/refs/contributing.html)
- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there

---------
Signed-off-by: 's avatarmingxueg-nv <mingxueg@nvidia.com>
Signed-off-by: 's avatarmingxueg <mingxueg@nvidia.com>
Co-authored-by: 's avatargreptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Co-authored-by: 's avatarKelly Guo <kellyg@nvidia.com>
parent 87383c85
......@@ -100,6 +100,7 @@ Guidelines for modifications:
* Michael Noseworthy
* Michael Lin
* Miguel Alonso Jr
* Mingxue Gu
* Mingyu Lee
* Muhong Guo
* Narendra Dahile
......
......@@ -15,6 +15,7 @@
Se3Keyboard
Se2SpaceMouse
Se3SpaceMouse
HaplyDevice
OpenXRDevice
ManusVive
isaaclab.devices.openxr.retargeters.GripperRetargeter
......@@ -79,6 +80,14 @@ Space Mouse
:inherited-members:
:show-inheritance:
Haply
-----
.. autoclass:: HaplyDevice
:members:
:inherited-members:
:show-inheritance:
OpenXR
------
......
.. _haply-teleoperation:
Setting up Haply Teleoperation
===============================
.. currentmodule:: isaaclab
`Haply Devices`_ provides haptic devices that enable intuitive robot teleoperation with
directional force feedback. The Haply Inverse3 paired with the VerseGrip creates an
end-effector control system with force feedback capabilities.
Isaac Lab supports Haply devices for teleoperation workflows that require precise spatial
control with haptic feedback. This enables operators to feel contact forces during manipulation
tasks, improving control quality and task performance.
This guide explains how to set up and use Haply devices with Isaac Lab for robot teleoperation.
.. _Haply Devices: https://haply.co/
Overview
--------
Using Haply with Isaac Lab involves the following components:
* **Isaac Lab** simulates the robot environment and streams contact forces back to the operator
* **Haply Inverse3** provides 3-DOF position tracking and force feedback in the operator's workspace
* **Haply VerseGrip** adds orientation sensing and button inputs for gripper control
* **Haply SDK** manages WebSocket communication between Isaac Lab and the Haply hardware
This guide will walk you through:
* :ref:`haply-system-requirements`
* :ref:`haply-installation`
* :ref:`haply-device-setup`
* :ref:`haply-running-demo`
* :ref:`haply-troubleshooting`
.. _haply-system-requirements:
System Requirements
-------------------
Hardware Requirements
~~~~~~~~~~~~~~~~~~~~~
* **Isaac Lab Workstation**
* Ubuntu 22.04 or Ubuntu 24.04
* Hardware requirements for 200Hz physics simulation:
* CPU: 8-Core Intel Core i7 or AMD Ryzen 7 (or higher)
* Memory: 32GB RAM (64GB recommended)
* GPU: RTX 3090 or higher
* Network: Same local network as Haply devices for WebSocket communication
* **Haply Devices**
* Haply Inverse3 - Haptic device for position tracking and force feedback
* Haply VerseGrip - Wireless controller for orientation and button inputs
* Both devices must be powered on and connected to the Haply SDK
Software Requirements
~~~~~~~~~~~~~~~~~~~~~
* Isaac Lab (follow the :ref:`installation guide <isaaclab-installation-root>`)
* Haply SDK (provided by Haply Robotics)
* Python 3.10+
* ``websockets`` Python package (automatically installed with Isaac Lab)
.. _haply-installation:
Installation
------------
1. Install Isaac Lab
~~~~~~~~~~~~~~~~~~~~
Follow the Isaac Lab :ref:`installation guide <isaaclab-installation-root>` to set up your environment.
The ``websockets`` dependency is automatically included in Isaac Lab's requirements.
2. Install Haply SDK
~~~~~~~~~~~~~~~~~~~~
Download the Haply SDK from the `Haply Devices`_ website.
Install the SDK software and configure the devices.
3. Verify Installation
~~~~~~~~~~~~~~~~~~~~~~
Test that your Haply devices are detected by the Haply Device Manager.
You should see both Inverse3 and VerseGrip as connected.
.. _haply-device-setup:
Device Setup
------------
1. Physical Setup
~~~~~~~~~~~~~~~~~
* Place the Haply Inverse3 on a stable surface
* Ensure the VerseGrip is charged and paired
* Position yourself comfortably to reach the Inverse3 workspace
* Keep the workspace clear of obstacles
2. Start Haply SDK
~~~~~~~~~~~~~~~~~~
Launch the Haply SDK according to Haply's documentation. The SDK typically:
* Runs a WebSocket server on ``localhost:10001``
* Streams device data at 200Hz
* Displays connection status for both devices
3. Test Communication
~~~~~~~~~~~~~~~~~~~~~
You can test the WebSocket connection using the following Python script:
.. code:: python
import asyncio
import websockets
import json
async def test_haply():
uri = "ws://localhost:10001"
async with websockets.connect(uri) as ws:
response = await ws.recv()
data = json.loads(response)
print("Inverse3:", data.get("inverse3", []))
print("VerseGrip:", data.get("wireless_verse_grip", []))
asyncio.run(test_haply())
You should see device data streaming from both Inverse3 and VerseGrip.
.. _haply-running-demo:
Running the Demo
----------------
The Haply teleoperation demo showcases robot manipulation with force feedback using
a Franka Panda arm.
Basic Usage
~~~~~~~~~~~
.. tab-set::
:sync-group: os
.. tab-item:: :icon:`fa-brands fa-linux` Linux
:sync: linux
.. code:: bash
# Ensure Haply SDK is running
./isaaclab.sh -p scripts/demos/haply_teleoperation.py --websocket_uri ws://localhost:10001 --pos_sensitivity 1.65
.. tab-item:: :icon:`fa-brands fa-windows` Windows
:sync: windows
.. code:: batch
REM Ensure Haply SDK is running
isaaclab.bat -p scripts\demos\haply_teleoperation.py --websocket_uri ws://localhost:10001 --pos_sensitivity 1.65
The demo will:
1. Connect to the Haply devices via WebSocket
2. Spawn a Franka Panda robot and a cube in simulation
3. Map Haply position to robot end-effector position
4. Stream contact forces back to the Inverse3 for haptic feedback
Controls
~~~~~~~~
* **Move Inverse3**: Controls the robot end-effector position
* **VerseGrip Button A**: Open gripper
* **VerseGrip Button B**: Close gripper
* **VerseGrip Button C**: Rotate end-effector by 60°
Advanced Options
~~~~~~~~~~~~~~~~
Customize the demo with command-line arguments:
.. code:: bash
# Use custom WebSocket URI
./isaaclab.sh -p scripts/demos/haply_teleoperation.py \
--websocket_uri ws://192.168.1.100:10001
# Adjust position sensitivity (default: 1.0)
./isaaclab.sh -p scripts/demos/haply_teleoperation.py \
--websocket_uri ws://localhost:10001 \
--pos_sensitivity 2.0
Demo Features
~~~~~~~~~~~~~
* **Workspace Mapping**: Haply workspace is mapped to robot reachable space with safety limits
* **Inverse Kinematics**: Inverse Kinematics (IK) computes joint positions for desired end-effector pose
* **Force Feedback**: Contact forces from end-effector sensors are sent to Inverse3 for haptic feedback
.. _haply-troubleshooting:
Troubleshooting
---------------
No Haptic Feedback
~~~~~~~~~~~~~~~~~~
**Problem**: No haptic feedback felt on Inverse3
Solutions:
* Verify Inverse3 is the active device in Haply SDK
* Check contact forces are non-zero in simulation (try grasping the cube)
* Ensure ``limit_force`` is not set too low (default: 2.0N)
Next Steps
----------
* **Customize the demo**: Modify the workspace mapping or add custom button behaviors
* **Implement your own controller**: Use :class:`~isaaclab.devices.HaplyDevice` in your own scripts
For more information on device APIs, see :class:`~isaaclab.devices.HaplyDevice` in the API documentation.
......@@ -149,6 +149,18 @@ teleoperation in Isaac Lab.
cloudxr_teleoperation
Setting up Haply Teleoperation
------------------------------
This guide explains how to use Haply Inverse3 and VerseGrip devices for robot teleoperation
with directional force feedback in Isaac Lab.
.. toctree::
:maxdepth: 1
haply_teleoperation
Understanding Simulation Performance
------------------------------------
......
......@@ -228,6 +228,41 @@ A few quick showroom scripts to run and checkout:
- Teleoperate a Franka Panda robot using Haply haptic device with force feedback:
.. tab-set::
:sync-group: os
.. tab-item:: :icon:`fa-brands fa-linux` Linux
:sync: linux
.. code:: bash
./isaaclab.sh -p scripts/demos/haply_teleoperation.py --websocket_uri ws://localhost:10001 --pos_sensitivity 1.65
.. tab-item:: :icon:`fa-brands fa-windows` Windows
:sync: windows
.. code:: batch
isaaclab.bat -p scripts\demos\haply_teleoperation.py --websocket_uri ws://localhost:10001 --pos_sensitivity 1.65
.. image:: ../_static/demos/haply_teleop_franka.jpg
:width: 100%
:alt: Haply teleoperation with force feedback
This demo requires Haply Inverse3 and VerseGrip devices.
The goal of this demo is to pick up the cube or touch it with the end-effector.
The Haply devices provide:
* 3 dimensional position tracking for end-effector control
* Directional force feedback for contact sensing
* Button inputs for gripper and end-effector rotation control
See :ref:`haply-teleoperation` for detailed setup instructions.
- Create and spawn procedurally generated terrains with different configurations:
.. tab-set::
......
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Demonstration of Haply device teleoperation with a robotic arm.
This script demonstrates how to use a Haply device (Inverse3 + VerseGrip) to
teleoperate a robotic arm in Isaac Lab. The Haply provides:
- Position tracking from the Inverse3 device
- Orientation and button inputs from the VerseGrip device
- Force feedback
.. code-block:: bash
# Usage
./isaaclab.sh -p scripts/demos/haply_teleoperation.py
# With custom WebSocket URI
./isaaclab.sh -p scripts/demos/haply_teleoperation.py --websocket_uri ws://localhost:10001
# With sensitivity adjustment
./isaaclab.sh -p scripts/demos/haply_teleoperation.py --pos_sensitivity 2.0
Prerequisites:
1. Install websockets package: pip install websockets
2. Have Haply SDK running and accessible via WebSocket
3. Connect Inverse3 and VerseGrip devices
"""
"""Launch Isaac Sim Simulator first."""
import argparse
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Demonstration of Haply device teleoperation with Isaac Lab.")
parser.add_argument("--num_envs", type=int, default=1, help="Number of environments to spawn.")
parser.add_argument(
"--websocket_uri",
type=str,
default="ws://localhost:10001",
help="WebSocket URI for Haply SDK connection.",
)
parser.add_argument(
"--pos_sensitivity",
type=float,
default=1.0,
help="Position sensitivity scaling factor.",
)
AppLauncher.add_app_launcher_args(parser)
args_cli = parser.parse_args()
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
import numpy as np
import torch
import isaaclab.sim as sim_utils
from isaaclab.assets import Articulation, AssetBaseCfg, RigidObject, RigidObjectCfg
from isaaclab.controllers import DifferentialIKController, DifferentialIKControllerCfg
from isaaclab.devices import HaplyDevice, HaplyDeviceCfg
from isaaclab.scene import InteractiveScene, InteractiveSceneCfg
from isaaclab.sensors import ContactSensor, ContactSensorCfg
from isaaclab.utils import configclass
from isaaclab.utils.assets import ISAAC_NUCLEUS_DIR
from isaaclab_assets import FRANKA_PANDA_HIGH_PD_CFG # isort: skip
# Workspace mapping constants
HAPLY_Z_OFFSET = 0.35
WORKSPACE_LIMITS = {
"x": (0.1, 0.9),
"y": (-0.50, 0.50),
"z": (1.05, 1.85),
}
def apply_haply_to_robot_mapping(
haply_pos: np.ndarray | torch.Tensor,
haply_initial_pos: np.ndarray | list,
robot_initial_pos: np.ndarray | torch.Tensor,
) -> np.ndarray:
"""Apply coordinate mapping from Haply workspace to Franka Panda end-effector.
Uses absolute position control: robot position = robot_initial_pos + haply_pos (transformed)
Args:
haply_pos: Current Haply absolute position [x, y, z] in meters
haply_initial_pos: Haply's zero reference position [x, y, z]
robot_initial_pos: Base offset for robot end-effector
Returns:
robot_pos: Target position for robot EE in world frame [x, y, z]
"""
# Convert to numpy
if isinstance(haply_pos, torch.Tensor):
haply_pos = haply_pos.cpu().numpy()
if isinstance(robot_initial_pos, torch.Tensor):
robot_initial_pos = robot_initial_pos.cpu().numpy()
haply_delta = haply_pos - haply_initial_pos
# Coordinate system mapping: Haply (X, Y, Z) -> Robot (-Y, X, Z-offset)
robot_offset = np.array([-haply_delta[1], haply_delta[0], haply_delta[2] - HAPLY_Z_OFFSET])
robot_pos = robot_initial_pos + robot_offset
# Apply workspace limits for safety
robot_pos[0] = np.clip(robot_pos[0], WORKSPACE_LIMITS["x"][0], WORKSPACE_LIMITS["x"][1])
robot_pos[1] = np.clip(robot_pos[1], WORKSPACE_LIMITS["y"][0], WORKSPACE_LIMITS["y"][1])
robot_pos[2] = np.clip(robot_pos[2], WORKSPACE_LIMITS["z"][0], WORKSPACE_LIMITS["z"][1])
return robot_pos
@configclass
class FrankaHaplySceneCfg(InteractiveSceneCfg):
"""Configuration for Franka scene with Haply teleoperation and contact sensors."""
ground = AssetBaseCfg(
prim_path="/World/defaultGroundPlane",
spawn=sim_utils.GroundPlaneCfg(),
)
dome_light = AssetBaseCfg(
prim_path="/World/Light",
spawn=sim_utils.DomeLightCfg(intensity=3000.0, color=(0.75, 0.75, 0.75)),
)
table = AssetBaseCfg(
prim_path="{ENV_REGEX_NS}/Table",
spawn=sim_utils.UsdFileCfg(
usd_path=f"{ISAAC_NUCLEUS_DIR}/Props/Mounts/SeattleLabTable/table_instanceable.usd",
scale=(1.0, 1.0, 1.0),
),
init_state=AssetBaseCfg.InitialStateCfg(pos=(0.50, 0.0, 1.05), rot=(0.707, 0, 0, 0.707)),
)
robot: Articulation = FRANKA_PANDA_HIGH_PD_CFG.replace(prim_path="{ENV_REGEX_NS}/Robot")
robot.init_state.pos = (-0.02, 0.0, 1.05)
robot.spawn.activate_contact_sensors = True
cube = RigidObjectCfg(
prim_path="{ENV_REGEX_NS}/Cube",
spawn=sim_utils.CuboidCfg(
size=(0.06, 0.06, 0.06),
rigid_props=sim_utils.RigidBodyPropertiesCfg(),
mass_props=sim_utils.MassPropertiesCfg(mass=0.5),
collision_props=sim_utils.CollisionPropertiesCfg(),
physics_material=sim_utils.RigidBodyMaterialCfg(static_friction=0.5, dynamic_friction=0.5),
visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(0.2, 0.8, 0.2), metallic=0.2),
),
init_state=RigidObjectCfg.InitialStateCfg(pos=(0.60, 0.00, 1.15)),
)
left_finger_contact_sensor = ContactSensorCfg(
prim_path="{ENV_REGEX_NS}/Robot/panda_leftfinger",
update_period=0.0,
history_length=3,
debug_vis=True,
track_pose=True,
)
right_finger_contact_sensor = ContactSensorCfg(
prim_path="{ENV_REGEX_NS}/Robot/panda_rightfinger",
update_period=0.0,
history_length=3,
debug_vis=True,
track_pose=True,
)
def run_simulator(
sim: sim_utils.SimulationContext,
scene: InteractiveScene,
haply_device: HaplyDevice,
):
"""Runs the simulation loop with Haply teleoperation."""
sim_dt = sim.get_physics_dt()
count = 1
robot: Articulation = scene["robot"]
cube: RigidObject = scene["cube"]
left_finger_sensor: ContactSensor = scene["left_finger_contact_sensor"]
right_finger_sensor: ContactSensor = scene["right_finger_contact_sensor"]
ee_body_name = "panda_hand"
ee_body_idx = robot.body_names.index(ee_body_name)
joint_pos = robot.data.default_joint_pos.clone()
joint_pos[0, :7] = torch.tensor([0.0, -0.569, 0.0, -2.81, 0.0, 3.037, 0.741], device=robot.device)
joint_vel = robot.data.default_joint_vel.clone()
robot.write_joint_state_to_sim(joint_pos, joint_vel)
for _ in range(10):
scene.write_data_to_sim()
sim.step()
scene.update(sim_dt)
# Initialize the position of franka
robot_initial_pos = robot.data.body_pos_w[0, ee_body_idx].cpu().numpy()
haply_initial_pos = np.array([0.0, 0.0, 0.0], dtype=np.float32)
ik_controller_cfg = DifferentialIKControllerCfg(
command_type="position",
use_relative_mode=False,
ik_method="dls",
ik_params={"lambda_val": 0.05},
)
# IK joints control arms, buttons control ee rotation and gripper open/close
arm_joint_names = [
"panda_joint1",
"panda_joint2",
"panda_joint3",
"panda_joint4",
"panda_joint5",
"panda_joint6",
]
arm_joint_indices = [robot.joint_names.index(name) for name in arm_joint_names]
# Initialize IK controller
ik_controller = DifferentialIKController(cfg=ik_controller_cfg, num_envs=scene.num_envs, device=sim.device)
initial_ee_quat = robot.data.body_quat_w[:, ee_body_idx]
ik_controller.set_command(command=torch.zeros(scene.num_envs, 3, device=sim.device), ee_quat=initial_ee_quat)
prev_button_a = False
prev_button_b = False
prev_button_c = False
gripper_target = 0.04
# Initialize the rotation of franka end-effector
ee_rotation_angle = robot.data.joint_pos[0, 6].item()
rotation_step = np.pi / 3
print("\n[INFO] Teleoperation ready!")
print(" Move handler: Control pose of the end-effector")
print(" Button A: Open | Button B: Close | Button C: Rotate EE (60°)\n")
while simulation_app.is_running():
if count % 10000 == 0:
count = 1
root_state = robot.data.default_root_state.clone()
root_state[:, :3] += scene.env_origins
robot.write_root_pose_to_sim(root_state[:, :7])
robot.write_root_velocity_to_sim(root_state[:, 7:])
joint_pos = robot.data.default_joint_pos.clone()
joint_pos[0, :7] = torch.tensor([0.0, -0.569, 0.0, -2.81, 0.0, 3.037, 0.741], device=robot.device)
joint_vel = robot.data.default_joint_vel.clone()
robot.write_joint_state_to_sim(joint_pos, joint_vel)
cube_state = cube.data.default_root_state.clone()
cube_state[:, :3] += scene.env_origins
cube.write_root_pose_to_sim(cube_state[:, :7])
cube.write_root_velocity_to_sim(cube_state[:, 7:])
scene.reset()
haply_device.reset()
ik_controller.reset()
print("[INFO]: Resetting robot state...")
# Get the data from Haply device
haply_data = haply_device.advance()
haply_pos = haply_data[:3]
button_a = haply_data[7].item() > 0.5
button_b = haply_data[8].item() > 0.5
button_c = haply_data[9].item() > 0.5
if button_a and not prev_button_a:
gripper_target = 0.04 # Open gripper
if button_b and not prev_button_b:
gripper_target = 0.0 # Close gripper
if button_c and not prev_button_c:
joint_7_limit = 3.0
ee_rotation_angle += rotation_step
if ee_rotation_angle > joint_7_limit:
ee_rotation_angle = -joint_7_limit + (ee_rotation_angle - joint_7_limit)
elif ee_rotation_angle < -joint_7_limit:
ee_rotation_angle = joint_7_limit + (ee_rotation_angle + joint_7_limit)
prev_button_a = button_a
prev_button_b = button_b
prev_button_c = button_c
# Compute IK
target_pos = apply_haply_to_robot_mapping(
haply_pos,
haply_initial_pos,
robot_initial_pos,
)
target_pos_tensor = torch.tensor(target_pos, dtype=torch.float32, device=sim.device).unsqueeze(0)
current_joint_pos = robot.data.joint_pos[:, arm_joint_indices]
ee_pos_w = robot.data.body_pos_w[:, ee_body_idx]
ee_quat_w = robot.data.body_quat_w[:, ee_body_idx]
# get jacobian to IK controller
jacobian = robot.root_physx_view.get_jacobians()[:, ee_body_idx, :, arm_joint_indices]
ik_controller.set_command(command=target_pos_tensor, ee_quat=ee_quat_w)
joint_pos_des = ik_controller.compute(ee_pos_w, ee_quat_w, jacobian, current_joint_pos)
joint_pos_target = robot.data.joint_pos[0].clone()
# Update joints: 6 from IK + 1 from button control (correct by design)
joint_pos_target[arm_joint_indices] = joint_pos_des[0] # panda_joint1-6 from IK
joint_pos_target[6] = ee_rotation_angle # panda_joint7 - end-effector rotation (button C)
joint_pos_target[[-2, -1]] = gripper_target # gripper
robot.set_joint_position_target(joint_pos_target.unsqueeze(0))
for _ in range(5):
scene.write_data_to_sim()
sim.step()
scene.update(sim_dt)
count += 1
# get contact forces and apply force feedback
left_finger_forces = left_finger_sensor.data.net_forces_w[0, 0]
right_finger_forces = right_finger_sensor.data.net_forces_w[0, 0]
total_contact_force = (left_finger_forces + right_finger_forces) * 0.5
haply_device.push_force(forces=total_contact_force.unsqueeze(0), position=torch.tensor([0]))
def main():
"""Main function to set up and run the Haply teleoperation demo."""
sim_cfg = sim_utils.SimulationCfg(device=args_cli.device, dt=1 / 200)
sim = sim_utils.SimulationContext(sim_cfg)
# set the simulation view
sim.set_camera_view([1.6, 1.0, 1.70], [0.4, 0.0, 1.0])
scene_cfg = FrankaHaplySceneCfg(num_envs=args_cli.num_envs, env_spacing=2.0)
scene = InteractiveScene(scene_cfg)
# Create Haply device
haply_cfg = HaplyDeviceCfg(
websocket_uri=args_cli.websocket_uri,
pos_sensitivity=args_cli.pos_sensitivity,
sim_device=args_cli.device,
limit_force=2.0,
)
haply_device = HaplyDevice(cfg=haply_cfg)
print(f"[INFO] Haply connected: {args_cli.websocket_uri}")
sim.reset()
run_simulator(sim, scene, haply_device)
if __name__ == "__main__":
main()
simulation_app.close()
......@@ -18,7 +18,8 @@ requirements = [
"toml",
"hidapi",
"gymnasium==0.29.0",
"trimesh"
"trimesh",
"websockets"
]
modules = [
......@@ -27,7 +28,8 @@ modules = [
"toml",
"hid",
"gymnasium",
"trimesh"
"trimesh",
"websockets"
]
use_online_index=true
......
Changelog
---------
0.48.1 (2025-11-10)
~~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added :class:`~isaaclab.devices.haply.HaplyDevice` class for SE(3) teleoperation with dual Haply Inverse3 and Versegrip devices,
supporting robot manipulation with haptic feedback.
* Added demo script ``scripts/demos/haply_teleoperation.py`` and documentation guide in
``docs/source/how-to/haply_teleoperation.rst`` for Haply-based robot teleoperation.
0.48.0 (2025-11-03)
~~~~~~~~~~~~~~~~~~~
......
......@@ -11,6 +11,7 @@ Currently, the following categories of devices are supported:
* **Spacemouse**: 3D mouse with 6 degrees of freedom.
* **Gamepad**: Gamepad with 2D two joysticks and buttons. Example: Xbox controller.
* **OpenXR**: Uses hand tracking of index/thumb tip avg to drive the target pose. Gripping is done with pinching.
* **Haply**: Haptic device (Inverse3 + VerseGrip) with position, orientation tracking and force feedback.
All device interfaces inherit from the :class:`DeviceBase` class, which provides a
common interface for all devices. The device interface reads the input data when
......@@ -21,6 +22,7 @@ the peripheral device.
from .device_base import DeviceBase, DeviceCfg, DevicesCfg
from .gamepad import Se2Gamepad, Se2GamepadCfg, Se3Gamepad, Se3GamepadCfg
from .haply import HaplyDevice, HaplyDeviceCfg
from .keyboard import Se2Keyboard, Se2KeyboardCfg, Se3Keyboard, Se3KeyboardCfg
from .openxr import ManusVive, ManusViveCfg, OpenXRDevice, OpenXRDeviceCfg
from .retargeter_base import RetargeterBase, RetargeterCfg
......
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Haply device interface for teleoperation."""
from .se3_haply import HaplyDevice, HaplyDeviceCfg
__all__ = ["HaplyDevice", "HaplyDeviceCfg"]
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Haply device controller for SE3 control with force feedback."""
import asyncio
import json
import numpy as np
import threading
import time
import torch
from collections.abc import Callable
from dataclasses import dataclass
try:
import websockets
WEBSOCKETS_AVAILABLE = True
except ImportError:
WEBSOCKETS_AVAILABLE = False
from ..device_base import DeviceBase, DeviceCfg
from ..retargeter_base import RetargeterBase
@dataclass
class HaplyDeviceCfg(DeviceCfg):
"""Configuration for Haply device.
Attributes:
websocket_uri: WebSocket URI for Haply SDK connection
pos_sensitivity: Position sensitivity scaling factor
data_rate: Data exchange rate in Hz
limit_force: Maximum force magnitude in Newtons (safety limit)
"""
websocket_uri: str = "ws://localhost:10001"
pos_sensitivity: float = 1.0
data_rate: float = 200.0
limit_force: float = 2.0
class HaplyDevice(DeviceBase):
"""A Haply device controller for sending SE(3) commands with force feedback.
This class provides an interface to Haply robotic devices (Inverse3 + VerseGrip)
for teleoperation. It communicates via WebSocket and supports:
- Position tracking from Inverse3 device
- Orientation and button inputs from VerseGrip device
- Directional force feedback to Inverse3
- Real-time data streaming at configurable rates
The device provides raw data:
* Position: 3D position (x, y, z) in meters from Inverse3
* Orientation: Quaternion (x, y, z, w) from VerseGrip
* Buttons: Three buttons (a, b, c) from VerseGrip with state (pressed/not pressed)
Note: All button logic (e.g., gripper control, reset, mode switching) should be
implemented in the application layer using the raw button states from advance().
Note:
Requires the Haply SDK to be running and accessible via WebSocket.
Install dependencies: pip install websockets
"""
def __init__(self, cfg: HaplyDeviceCfg, retargeters: list[RetargeterBase] | None = None):
"""Initialize the Haply device interface.
Args:
cfg: Configuration object for Haply device settings.
retargeters: Optional list of retargeting components that transform device data
into robot commands. If None or empty, the device outputs its native data format.
Raises:
ImportError: If websockets module is not installed.
RuntimeError: If connection to Haply device fails.
"""
super().__init__(retargeters)
if not WEBSOCKETS_AVAILABLE:
raise ImportError("websockets module is required for Haply device. Install with: pip install websockets")
# Store configuration
self.websocket_uri = cfg.websocket_uri
self.pos_sensitivity = cfg.pos_sensitivity
self.data_rate = cfg.data_rate
self._sim_device = cfg.sim_device
self.limit_force = cfg.limit_force
# Device status (True only when both Inverse3 and VerseGrip are connected)
self.connected = False
self._connected_lock = threading.Lock()
# Device IDs (will be set after first message)
self.inverse3_device_id = None
self.verse_grip_device_id = None
# Current data cache
self.cached_data = {
"position": np.zeros(3, dtype=np.float32),
"quaternion": np.array([0.0, 0.0, 0.0, 1.0], dtype=np.float32),
"buttons": {"a": False, "b": False, "c": False},
"inverse3_connected": False,
"versegrip_connected": False,
}
self.data_lock = threading.Lock()
# Force feedback
self.feedback_force = {"x": 0.0, "y": 0.0, "z": 0.0}
self.force_lock = threading.Lock()
self._additional_callbacks = dict()
# Button state tracking
self._prev_buttons = {"a": False, "b": False, "c": False}
# Connection monitoring
self.consecutive_timeouts = 0
self.max_consecutive_timeouts = 10 # ~10 seconds at 1s timeout
self.timeout_warning_issued = False
# Start WebSocket connection
self.running = True
self._websocket_thread = None
self._start_websocket_thread()
# Wait for both devices to connect
timeout = 5.0
start_time = time.time()
while (time.time() - start_time) < timeout:
with self._connected_lock:
if self.connected:
break
time.sleep(0.1)
with self._connected_lock:
if not self.connected:
raise RuntimeError(f"Failed to connect both Inverse3 and VerseGrip devices within {timeout}s. ")
def __del__(self):
"""Cleanup on deletion: shutdown WebSocket connection and background thread."""
if not hasattr(self, "running") or not self.running:
return
self.running = False
# Reset force feedback before closing
if hasattr(self, "force_lock") and hasattr(self, "feedback_force"):
with self.force_lock:
self.feedback_force = {"x": 0.0, "y": 0.0, "z": 0.0}
# Explicitly wait for WebSocket thread to finish
if hasattr(self, "_websocket_thread") and self._websocket_thread is not None:
if self._websocket_thread.is_alive():
self._websocket_thread.join(timeout=2.0)
if self._websocket_thread.is_alive():
self._websocket_thread.daemon = True
def __str__(self) -> str:
"""Returns: A string containing the information of the device."""
msg = f"Haply Device Controller: {self.__class__.__name__}\n"
msg += f"\tWebSocket URI: {self.websocket_uri}\n"
msg += f"\tInverse3 ID: {self.inverse3_device_id}\n"
msg += f"\tVerseGrip ID: {self.verse_grip_device_id}\n"
msg += "\t----------------------------------------------\n"
msg += "\tOutput: [x, y, z, qx, qy, qz, qw, btn_a, btn_b, btn_c]\n"
msg += "\tInverse3: Provides position (x, y, z) and force feedback\n"
msg += "\tVerseGrip: Provides orientation (quaternion) and buttons (a, b, c)"
return msg
def reset(self):
"""Reset the device internal state."""
with self.force_lock:
self.feedback_force = {"x": 0.0, "y": 0.0, "z": 0.0}
# Reset button state tracking
self._prev_buttons = {"a": False, "b": False, "c": False}
def add_callback(self, key: str, func: Callable):
"""Add additional functions to bind to button events.
Args:
key: The button to check against. Valid values are "a", "b", "c".
func: The function to call when button is pressed. The callback function should not
take any arguments.
"""
if key not in ["a", "b", "c"]:
raise ValueError(f"Invalid button key: {key}. Valid keys are 'a', 'b', 'c'.")
self._additional_callbacks[key] = func
def advance(self) -> torch.Tensor:
"""Provides the result from Haply device state.
Returns:
torch.Tensor: A tensor containing the raw device data:
- 10 elements: [x, y, z, qx, qy, qz, qw, button_a, button_b, button_c]
where (x, y, z) is position, (qx, qy, qz, qw) is quaternion orientation,
and buttons are 1.0 (pressed) or 0.0 (not pressed)
"""
with self.data_lock:
if not (self.cached_data["inverse3_connected"] and self.cached_data["versegrip_connected"]):
raise RuntimeError("Haply devices not connected. Both Inverse3 and VerseGrip must be connected.")
# Safe copy within lock
position = self.cached_data["position"].copy() * self.pos_sensitivity
quaternion = self.cached_data["quaternion"].copy()
button_a = self.cached_data["buttons"].get("a", False)
button_b = self.cached_data["buttons"].get("b", False)
button_c = self.cached_data["buttons"].get("c", False)
# Button callbacks execute OUTSIDE lock to prevent deadlock
for button_key, current_state in [("a", button_a), ("b", button_b), ("c", button_c)]:
prev_state = self._prev_buttons.get(button_key, False)
if current_state and not prev_state:
if button_key in self._additional_callbacks:
self._additional_callbacks[button_key]()
self._prev_buttons[button_key] = current_state
button_states = np.array(
[
1.0 if button_a else 0.0,
1.0 if button_b else 0.0,
1.0 if button_c else 0.0,
],
dtype=np.float32,
)
# Construct command tensor: [position(3), quaternion(4), buttons(3)]
command = np.concatenate([position, quaternion, button_states])
return torch.tensor(command, dtype=torch.float32, device=self._sim_device)
def push_force(self, forces: torch.Tensor, position: torch.Tensor) -> None:
"""Push force vector to Haply Inverse3 device.
Overrides DeviceBase.push_force() to provide force feedback for Haply Inverse3.
Forces are clipped to [-limit_force, limit_force] range for safety.
Args:
forces: Tensor of shape (N, 3) with forces [fx, fy, fz].
position: Tensor of shape (N) with indices specifying which forces to use.
"""
# Check if forces is empty
if forces.shape[0] == 0:
raise ValueError("No forces provided")
# Select forces using position indices
selected_forces = forces[position] if position.ndim > 0 else forces[position].unsqueeze(0)
force = selected_forces.sum(dim=0)
force = force.cpu().numpy() if force.is_cuda else force.numpy()
fx = np.clip(force[0], -self.limit_force, self.limit_force)
fy = np.clip(force[1], -self.limit_force, self.limit_force)
fz = np.clip(force[2], -self.limit_force, self.limit_force)
with self.force_lock:
self.feedback_force = {"x": float(fx), "y": float(fy), "z": float(fz)}
def _start_websocket_thread(self):
"""Start WebSocket connection thread."""
def websocket_thread():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._websocket_loop())
self._websocket_thread = threading.Thread(target=websocket_thread, daemon=False)
self._websocket_thread.start()
async def _websocket_loop(self):
"""WebSocket data reading and writing loop."""
while self.running:
try:
async with websockets.connect(self.websocket_uri, ping_interval=None, ping_timeout=None) as ws:
first_message = True
while self.running:
try:
response = await asyncio.wait_for(ws.recv(), timeout=1.0)
data = json.loads(response)
self.consecutive_timeouts = 0
if self.timeout_warning_issued:
self.timeout_warning_issued = False
# Safe array access - no IndexError risk with ternary operator
inverse3_list = data.get("inverse3", [])
verse_grip_list = data.get("wireless_verse_grip", [])
inverse3_data = inverse3_list[0] if inverse3_list else {}
verse_grip_data = verse_grip_list[0] if verse_grip_list else {}
if first_message:
first_message = False
if inverse3_data:
self.inverse3_device_id = inverse3_data.get("device_id")
if verse_grip_data:
self.verse_grip_device_id = verse_grip_data.get("device_id")
with self.data_lock:
inverse3_connected = False
versegrip_connected = False
if inverse3_data and "state" in inverse3_data:
cursor_pos = inverse3_data["state"].get("cursor_position", {})
if cursor_pos:
self.cached_data["position"] = np.array(
[cursor_pos.get(k, 0.0) for k in ("x", "y", "z")], dtype=np.float32
)
inverse3_connected = True
if verse_grip_data and "state" in verse_grip_data:
state = verse_grip_data["state"]
self.cached_data["buttons"] = {
k: state.get("buttons", {}).get(k, False) for k in ("a", "b", "c")
}
orientation = state.get("orientation", {})
if orientation:
self.cached_data["quaternion"] = np.array(
[
orientation.get(k, 1.0 if k == "w" else 0.0)
for k in ("x", "y", "z", "w")
],
dtype=np.float32,
)
versegrip_connected = True
self.cached_data["inverse3_connected"] = inverse3_connected
self.cached_data["versegrip_connected"] = versegrip_connected
# Both devices required (AND logic): Inverse3 for position/force,
both_connected = inverse3_connected and versegrip_connected
with self._connected_lock:
self.connected = both_connected
# Send force feedback
if self.inverse3_device_id:
with self.force_lock:
current_force = self.feedback_force.copy()
request_msg = {
"inverse3": [{
"device_id": self.inverse3_device_id,
"commands": {"set_cursor_force": {"values": current_force}},
}]
}
await ws.send(json.dumps(request_msg))
await asyncio.sleep(1.0 / self.data_rate)
except asyncio.TimeoutError:
self.consecutive_timeouts += 1
# Check if timeout
if (
self.consecutive_timeouts >= self.max_consecutive_timeouts
and not self.timeout_warning_issued
):
self.timeout_warning_issued = True
with self.data_lock:
self.cached_data["inverse3_connected"] = False
self.cached_data["versegrip_connected"] = False
with self._connected_lock:
self.connected = False
continue
except Exception as e:
print(f"[ERROR] Error in WebSocket receive loop: {e}")
break
except Exception:
with self.data_lock:
self.cached_data["inverse3_connected"] = False
self.cached_data["versegrip_connected"] = False
with self._connected_lock:
self.connected = False
self.consecutive_timeouts = 0
self.timeout_warning_issued = False
if self.running:
await asyncio.sleep(2.0)
else:
break
......@@ -12,6 +12,7 @@ from collections.abc import Callable
from isaaclab.devices import DeviceBase, DeviceCfg
from isaaclab.devices.gamepad import Se2Gamepad, Se2GamepadCfg, Se3Gamepad, Se3GamepadCfg
from isaaclab.devices.haply import HaplyDevice, HaplyDeviceCfg
from isaaclab.devices.keyboard import Se2Keyboard, Se2KeyboardCfg, Se3Keyboard, Se3KeyboardCfg
from isaaclab.devices.openxr.retargeters import (
G1LowerBodyStandingRetargeter,
......@@ -47,6 +48,7 @@ DEVICE_MAP: dict[type[DeviceCfg], type[DeviceBase]] = {
Se2KeyboardCfg: Se2Keyboard,
Se2GamepadCfg: Se2Gamepad,
Se2SpaceMouseCfg: Se2SpaceMouse,
HaplyDeviceCfg: HaplyDevice,
OpenXRDeviceCfg: OpenXRDevice,
ManusViveCfg: ManusVive,
}
......
......@@ -13,12 +13,15 @@ simulation_app = AppLauncher(headless=True).app
"""Rest everything follows."""
import importlib
import json
import torch
import pytest
# Import device classes to test
from isaaclab.devices import (
HaplyDevice,
HaplyDeviceCfg,
OpenXRDevice,
OpenXRDeviceCfg,
Se2Gamepad,
......@@ -79,6 +82,11 @@ def mock_environment(mocker):
omni_mock.kit.xr.core.XRPoseValidityFlags.POSITION_VALID = 1
omni_mock.kit.xr.core.XRPoseValidityFlags.ORIENTATION_VALID = 2
# Mock Haply WebSocket
websockets_mock = mocker.MagicMock()
websocket_mock = mocker.MagicMock()
websockets_mock.connect.return_value.__aenter__.return_value = websocket_mock
return {
"carb": carb_mock,
"omni": omni_mock,
......@@ -89,6 +97,8 @@ def mock_environment(mocker):
"settings": settings_mock,
"hid": hid_mock,
"device": device_mock,
"websockets": websockets_mock,
"websocket": websocket_mock,
}
......@@ -321,6 +331,141 @@ def test_openxr_constructors(mock_environment, mocker):
device.reset()
"""
Test Haply devices.
"""
def test_haply_constructors(mock_environment, mocker):
"""Test constructor for HaplyDevice."""
# Test config-based constructor
config = HaplyDeviceCfg(
websocket_uri="ws://localhost:10001",
pos_sensitivity=1.5,
data_rate=250.0,
)
# Mock the websockets module and asyncio
device_mod = importlib.import_module("isaaclab.devices.haply.se3_haply")
mocker.patch.dict("sys.modules", {"websockets": mock_environment["websockets"]})
mocker.patch.object(device_mod, "websockets", mock_environment["websockets"])
# Mock asyncio to prevent actual async operations
asyncio_mock = mocker.MagicMock()
mocker.patch.object(device_mod, "asyncio", asyncio_mock)
# Mock threading to prevent actual thread creation
threading_mock = mocker.MagicMock()
thread_instance = mocker.MagicMock()
threading_mock.Thread.return_value = thread_instance
thread_instance.is_alive.return_value = False
mocker.patch.object(device_mod, "threading", threading_mock)
# Mock time.time() for connection timeout simulation
time_mock = mocker.MagicMock()
time_mock.time.side_effect = [0.0, 0.1, 0.2, 0.3, 6.0] # Will timeout
mocker.patch.object(device_mod, "time", time_mock)
# Create sample WebSocket response data
ws_response = {
"inverse3": [{
"device_id": "test_inverse3_123",
"state": {"cursor_position": {"x": 0.1, "y": 0.2, "z": 0.3}},
}],
"wireless_verse_grip": [{
"device_id": "test_versegrip_456",
"state": {
"orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0},
"buttons": {"a": False, "b": False, "c": False},
},
}],
}
# Configure websocket mock to return JSON data
mock_environment["websocket"].recv = mocker.AsyncMock(return_value=json.dumps(ws_response))
mock_environment["websocket"].send = mocker.AsyncMock()
# The constructor will raise RuntimeError due to timeout, which is expected in test
with pytest.raises(RuntimeError, match="Failed to connect both Inverse3 and VerseGrip devices"):
haply = HaplyDevice(config)
# Now test successful connection by mocking time to not timeout
time_mock.time.side_effect = [0.0, 0.1, 0.2, 0.3, 0.4] # Won't timeout
# Mock the connection status
mocker.patch.object(device_mod.HaplyDevice, "_start_websocket_thread")
haply = device_mod.HaplyDevice.__new__(device_mod.HaplyDevice)
haply._sim_device = config.sim_device
haply.websocket_uri = config.websocket_uri
haply.pos_sensitivity = config.pos_sensitivity
haply.data_rate = config.data_rate
haply.limit_force = config.limit_force
haply.connected = True
haply.inverse3_device_id = "test_inverse3_123"
haply.verse_grip_device_id = "test_versegrip_456"
haply.data_lock = threading_mock.Lock()
haply.force_lock = threading_mock.Lock()
haply._connected_lock = threading_mock.Lock()
haply._additional_callbacks = {}
haply._prev_buttons = {"a": False, "b": False, "c": False}
haply._websocket_thread = None # Initialize to prevent AttributeError in __del__
haply.running = True
haply.cached_data = {
"position": torch.tensor([0.1, 0.2, 0.3], dtype=torch.float32).numpy(),
"quaternion": torch.tensor([0.0, 0.0, 0.0, 1.0], dtype=torch.float32).numpy(),
"buttons": {"a": False, "b": False, "c": False},
"inverse3_connected": True,
"versegrip_connected": True,
}
haply.feedback_force = {"x": 0.0, "y": 0.0, "z": 0.0}
# Verify configuration was applied correctly
assert haply.websocket_uri == "ws://localhost:10001"
assert haply.pos_sensitivity == 1.5
assert haply.data_rate == 250.0
# Test advance() returns expected type
result = haply.advance()
assert isinstance(result, torch.Tensor)
assert result.shape == (10,) # (pos_x, pos_y, pos_z, qx, qy, qz, qw, btn_a, btn_b, btn_c)
# Test push_force with tensor (single force vector)
forces_within = torch.tensor([[1.0, 1.5, -0.5]], dtype=torch.float32)
position_zero = torch.tensor([0], dtype=torch.long)
haply.push_force(forces_within, position_zero)
assert haply.feedback_force["x"] == pytest.approx(1.0)
assert haply.feedback_force["y"] == pytest.approx(1.5)
assert haply.feedback_force["z"] == pytest.approx(-0.5)
# Test push_force with tensor (force limiting, default limit is 2.0 N)
forces_exceed = torch.tensor([[5.0, -10.0, 1.5]], dtype=torch.float32)
haply.push_force(forces_exceed, position_zero)
assert haply.feedback_force["x"] == pytest.approx(2.0)
assert haply.feedback_force["y"] == pytest.approx(-2.0)
assert haply.feedback_force["z"] == pytest.approx(1.5)
# Test push_force with position tensor (single index)
forces_multi = torch.tensor([[1.0, 2.0, 3.0], [0.5, 0.8, -0.3], [0.1, 0.2, 0.3]], dtype=torch.float32)
position_single = torch.tensor([1], dtype=torch.long)
haply.push_force(forces_multi, position=position_single)
assert haply.feedback_force["x"] == pytest.approx(0.5)
assert haply.feedback_force["y"] == pytest.approx(0.8)
assert haply.feedback_force["z"] == pytest.approx(-0.3)
# Test push_force with position tensor (multiple indices)
position_multi = torch.tensor([0, 2], dtype=torch.long)
haply.push_force(forces_multi, position=position_multi)
# Should sum forces[0] and forces[2]: [1.0+0.1, 2.0+0.2, 3.0+0.3] = [1.1, 2.2, 3.3]
# But clipped to [-2.0, 2.0]: [1.1, 2.0, 2.0]
assert haply.feedback_force["x"] == pytest.approx(1.1)
assert haply.feedback_force["y"] == pytest.approx(2.0)
assert haply.feedback_force["z"] == pytest.approx(2.0)
# Test reset functionality
haply.reset()
assert haply.feedback_force == {"x": 0.0, "y": 0.0, "z": 0.0}
"""
Test teleop device factory.
"""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment