Unverified Commit 8fa76ddc authored by peterd-NV's avatar peterd-NV Committed by GitHub

Updates Mimic test cases to pytest format (#550)

# Description

<!--
Thank you for your interest in sending a pull request. Please make sure
to check the contribution guidelines.

Link:
https://isaac-sim.github.io/IsaacLab/main/source/refs/contributing.html
-->

Updates the following tests to pytest format:

- test_pink_ik.py
- test_selection_strategy.py
- test_generate_dataset.py

Updates test_generate_dataset.py to check that the expected number of
annotations are successfully generated in the HDF5 during the annotation
phase. If not, then test returns failure. This ensures that physics is
behaving correctly and that the correct annotated demos are being
generated. Previously, if a physics issue is introduced, the test would
timeout instead of failing.

The annotate_demos.py script was updated to return the number of
successfully annotated demos in order to support the above test.

## Type of change

<!-- As you go through the list, delete the ones that are not
applicable. -->

- New feature (non-breaking change which adds functionality)

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [ ] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there

<!--
As you go through the checklist above, you can mark something as done by
putting an x character in it

For example,
- [x] I have done this task
- [ ] I have not done this task
-->
parent a958ac56
...@@ -159,7 +159,7 @@ def main(): ...@@ -159,7 +159,7 @@ def main():
if episode_count == 0: if episode_count == 0:
print("No episodes found in the dataset.") print("No episodes found in the dataset.")
exit() return 0
# get output directory path and file name (without extension) from cli arguments # get output directory path and file name (without extension) from cli arguments
output_dir = os.path.dirname(args_cli.output_file) output_dir = os.path.dirname(args_cli.output_file)
...@@ -236,6 +236,7 @@ def main(): ...@@ -236,6 +236,7 @@ def main():
# simulate environment -- run everything in inference mode # simulate environment -- run everything in inference mode
exported_episode_count = 0 exported_episode_count = 0
processed_episode_count = 0 processed_episode_count = 0
successful_task_count = 0 # Counter for successful task completions
with contextlib.suppress(KeyboardInterrupt) and torch.inference_mode(): with contextlib.suppress(KeyboardInterrupt) and torch.inference_mode():
while simulation_app.is_running() and not simulation_app.is_exiting(): while simulation_app.is_running() and not simulation_app.is_exiting():
# Iterate over the episodes in the loaded dataset file # Iterate over the episodes in the loaded dataset file
...@@ -259,6 +260,7 @@ def main(): ...@@ -259,6 +260,7 @@ def main():
) )
env.recorder_manager.export_episodes() env.recorder_manager.export_episodes()
exported_episode_count += 1 exported_episode_count += 1
successful_task_count += 1 # Increment successful task counter
print("\tExported the annotated episode.") print("\tExported the annotated episode.")
else: else:
print("\tSkipped exporting the episode due to incomplete subtask annotations.") print("\tSkipped exporting the episode due to incomplete subtask annotations.")
...@@ -268,11 +270,16 @@ def main(): ...@@ -268,11 +270,16 @@ def main():
f"\nExported {exported_episode_count} (out of {processed_episode_count}) annotated" f"\nExported {exported_episode_count} (out of {processed_episode_count}) annotated"
f" episode{'s' if exported_episode_count > 1 else ''}." f" episode{'s' if exported_episode_count > 1 else ''}."
) )
print(
f"Successful task completions: {successful_task_count}"
) # This line is used by the dataset generation test case to check if the expected number of demos were annotated
print("Exiting the app.") print("Exiting the app.")
# Close environment after annotation is complete # Close environment after annotation is complete
env.close() env.close()
return successful_task_count
def replay_episode( def replay_episode(
env: ManagerBasedRLMimicEnv, env: ManagerBasedRLMimicEnv,
...@@ -440,6 +447,8 @@ def annotate_episode_in_manual_mode( ...@@ -440,6 +447,8 @@ def annotate_episode_in_manual_mode(
if __name__ == "__main__": if __name__ == "__main__":
# run the main function # run the main function
main() successful_task_count = main()
# close sim app # close sim app
simulation_app.close() simulation_app.close()
# exit with the number of successful task completions as return code
exit(successful_task_count)
[package] [package]
# Note: Semantic Versioning is used: https://semver.org/ # Note: Semantic Versioning is used: https://semver.org/
version = "0.42.24" version = "0.42.25"
# Description # Description
title = "Isaac Lab framework for Robot Learning" title = "Isaac Lab framework for Robot Learning"
......
Changelog Changelog
--------- ---------
0.42.25 (2025-07-17)
~~~~~~~~~~~~~~~~~~~~
Changed
^^^^^^^
* Updated test_pink_ik.py test case to pytest format.
0.42.24 (2025-06-25) 0.42.24 (2025-06-25)
~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~
......
...@@ -21,7 +21,8 @@ simulation_app = AppLauncher(headless=True).app ...@@ -21,7 +21,8 @@ simulation_app = AppLauncher(headless=True).app
import contextlib import contextlib
import gymnasium as gym import gymnasium as gym
import torch import torch
import unittest
import pytest
from isaaclab.utils.math import axis_angle_from_quat, matrix_from_quat, quat_from_matrix, quat_inv from isaaclab.utils.math import axis_angle_from_quat, matrix_from_quat, quat_from_matrix, quat_inv
...@@ -30,171 +31,179 @@ import isaaclab_tasks.manager_based.manipulation.pick_place # noqa: F401 ...@@ -30,171 +31,179 @@ import isaaclab_tasks.manager_based.manipulation.pick_place # noqa: F401
from isaaclab_tasks.utils.parse_cfg import parse_env_cfg from isaaclab_tasks.utils.parse_cfg import parse_env_cfg
class TestPinkIKController(unittest.TestCase): @pytest.fixture
"""Test fixture for the Pink IK controller with the GR1T2 humanoid robot. def pink_ik_test_config():
"""Test configuration for Pink IK controller tests."""
# End effector position mean square error tolerance in meters
pos_tolerance = 0.03 # 3 cm
# End effector orientation mean square error tolerance in radians
rot_tolerance = 0.17 # 10 degrees
This test validates that the Pink IK controller can accurately track commanded # Number of environments
end-effector poses for a humanoid robot. It specifically: num_envs = 1
1. Creates a GR1T2 humanoid robot with the Pink IK controller # Number of joints in the 2 robot hands
2. Sends target pose commands to the left and right hand roll links num_joints_in_robot_hands = 22
3. Checks that the observed poses of the links match the target poses within tolerance
4. Tests adaptability by moving the hands up and down multiple times
The test succeeds when the controller can accurately converge to each new target # Number of steps to wait for controller convergence
position, demonstrating both accuracy and adaptability to changing targets. num_steps_controller_convergence = 25
"""
def setUp(self): num_times_to_move_hands_up = 3
num_times_to_move_hands_down = 3
# End effector position mean square error tolerance in meters # Create starting setpoints with respect to the env origin frame
self.pos_tolerance = 0.03 # 2 cm # These are the setpoints for the forward kinematics result of the
# End effector orientation mean square error tolerance in radians # InitialStateCfg specified in `PickPlaceGR1T2EnvCfg`
self.rot_tolerance = 0.17 # 10 degrees y_axis_z_axis_90_rot_quaternion = [0.5, 0.5, -0.5, 0.5]
left_hand_roll_link_pos = [-0.23, 0.28, 1.1]
left_hand_roll_link_pose = left_hand_roll_link_pos + y_axis_z_axis_90_rot_quaternion
right_hand_roll_link_pos = [0.23, 0.28, 1.1]
right_hand_roll_link_pose = right_hand_roll_link_pos + y_axis_z_axis_90_rot_quaternion
# Number of environments return {
self.num_envs = 1 "pos_tolerance": pos_tolerance,
"rot_tolerance": rot_tolerance,
"num_envs": num_envs,
"num_joints_in_robot_hands": num_joints_in_robot_hands,
"num_steps_controller_convergence": num_steps_controller_convergence,
"num_times_to_move_hands_up": num_times_to_move_hands_up,
"num_times_to_move_hands_down": num_times_to_move_hands_down,
"left_hand_roll_link_pose": left_hand_roll_link_pose,
"right_hand_roll_link_pose": right_hand_roll_link_pose,
}
# Number of joints in the 2 robot hands
self.num_joints_in_robot_hands = 22
# Number of steps to wait for controller convergence def test_gr1t2_ik_pose_abs(pink_ik_test_config):
self.num_steps_controller_convergence = 25 """Test IK controller for GR1T2 humanoid.
self.num_times_to_move_hands_up = 3 This test validates that the Pink IK controller can accurately track commanded
self.num_times_to_move_hands_down = 3 end-effector poses for a humanoid robot. It specifically:
# Create starting setpoints with respect to the env origin frame 1. Creates a GR1T2 humanoid robot with the Pink IK controller
# These are the setpoints for the forward kinematics result of the 2. Sends target pose commands to the left and right hand roll links
# InitialStateCfg specified in `PickPlaceGR1T2EnvCfg` 3. Checks that the observed poses of the links match the target poses within tolerance
y_axis_z_axis_90_rot_quaternion = [0.5, 0.5, -0.5, 0.5] 4. Tests adaptability by moving the hands up and down multiple times
left_hand_roll_link_pos = [-0.23, 0.28, 1.1]
self.left_hand_roll_link_pose = left_hand_roll_link_pos + y_axis_z_axis_90_rot_quaternion
right_hand_roll_link_pos = [0.23, 0.28, 1.1]
self.right_hand_roll_link_pose = right_hand_roll_link_pos + y_axis_z_axis_90_rot_quaternion
The test succeeds when the controller can accurately converge to each new target
position, demonstrating both accuracy and adaptability to changing targets.
""" """
Test fixtures.
"""
def test_gr1t2_ik_pose_abs(self):
"""Test IK controller for GR1T2 humanoid."""
env_name = "Isaac-PickPlace-GR1T2-Abs-v0"
device = "cuda:0"
env_cfg = parse_env_cfg(env_name, device=device, num_envs=self.num_envs)
# create environment from loaded config
env = gym.make(env_name, cfg=env_cfg).unwrapped
# reset before starting
obs, _ = env.reset()
num_runs = 0 # Counter for the number of runs
move_hands_up = True env_name = "Isaac-PickPlace-GR1T2-Abs-v0"
test_counter = 0 device = "cuda:0"
env_cfg = parse_env_cfg(env_name, device=device, num_envs=pink_ik_test_config["num_envs"])
# simulate environment -- run everything in inference mode
with contextlib.suppress(KeyboardInterrupt) and torch.inference_mode(): # create environment from loaded config
while simulation_app.is_running() and not simulation_app.is_exiting(): env = gym.make(env_name, cfg=env_cfg).unwrapped
num_runs += 1 # reset before starting
setpoint_poses = self.left_hand_roll_link_pose + self.right_hand_roll_link_pose obs, _ = env.reset()
actions = setpoint_poses + [0.0] * self.num_joints_in_robot_hands
actions = torch.tensor(actions, device=device) num_runs = 0 # Counter for the number of runs
actions = torch.stack([actions for _ in range(env.num_envs)])
move_hands_up = True
obs, _, _, _, _ = env.step(actions) test_counter = 0
left_hand_roll_link_pose_obs = obs["policy"]["robot_links_state"][ # Get poses from config
:, env.scene["robot"].data.body_names.index("left_hand_roll_link"), :7 left_hand_roll_link_pose = pink_ik_test_config["left_hand_roll_link_pose"].copy()
] right_hand_roll_link_pose = pink_ik_test_config["right_hand_roll_link_pose"].copy()
right_hand_roll_link_pose_obs = obs["policy"]["robot_links_state"][
:, env.scene["robot"].data.body_names.index("right_hand_roll_link"), :7 # simulate environment -- run everything in inference mode
] with contextlib.suppress(KeyboardInterrupt) and torch.inference_mode():
while simulation_app.is_running() and not simulation_app.is_exiting():
# The setpoints are wrt the env origin frame
# The observations are also wrt the env origin frame num_runs += 1
left_hand_roll_link_feedback = left_hand_roll_link_pose_obs setpoint_poses = left_hand_roll_link_pose + right_hand_roll_link_pose
left_hand_roll_link_setpoint = ( actions = setpoint_poses + [0.0] * pink_ik_test_config["num_joints_in_robot_hands"]
torch.tensor(self.left_hand_roll_link_pose, device=device).unsqueeze(0).repeat(env.num_envs, 1) actions = torch.tensor(actions, device=device)
actions = torch.stack([actions for _ in range(env.num_envs)])
obs, _, _, _, _ = env.step(actions)
left_hand_roll_link_pose_obs = obs["policy"]["robot_links_state"][
:, env.scene["robot"].data.body_names.index("left_hand_roll_link"), :7
]
right_hand_roll_link_pose_obs = obs["policy"]["robot_links_state"][
:, env.scene["robot"].data.body_names.index("right_hand_roll_link"), :7
]
# The setpoints are wrt the env origin frame
# The observations are also wrt the env origin frame
left_hand_roll_link_feedback = left_hand_roll_link_pose_obs
left_hand_roll_link_setpoint = (
torch.tensor(left_hand_roll_link_pose, device=device).unsqueeze(0).repeat(env.num_envs, 1)
)
left_hand_roll_link_pos_error = left_hand_roll_link_setpoint[:, :3] - left_hand_roll_link_feedback[:, :3]
left_hand_roll_link_rot_error = axis_angle_from_quat(
quat_from_matrix(
matrix_from_quat(left_hand_roll_link_setpoint[:, 3:])
* matrix_from_quat(quat_inv(left_hand_roll_link_feedback[:, 3:]))
) )
left_hand_roll_link_pos_error = ( )
left_hand_roll_link_setpoint[:, :3] - left_hand_roll_link_feedback[:, :3]
right_hand_roll_link_feedback = right_hand_roll_link_pose_obs
right_hand_roll_link_setpoint = (
torch.tensor(right_hand_roll_link_pose, device=device).unsqueeze(0).repeat(env.num_envs, 1)
)
right_hand_roll_link_pos_error = right_hand_roll_link_setpoint[:, :3] - right_hand_roll_link_feedback[:, :3]
right_hand_roll_link_rot_error = axis_angle_from_quat(
quat_from_matrix(
matrix_from_quat(right_hand_roll_link_setpoint[:, 3:])
* matrix_from_quat(quat_inv(right_hand_roll_link_feedback[:, 3:]))
) )
left_hand_roll_link_rot_error = axis_angle_from_quat( )
quat_from_matrix(
matrix_from_quat(left_hand_roll_link_setpoint[:, 3:]) if num_runs % pink_ik_test_config["num_steps_controller_convergence"] == 0:
* matrix_from_quat(quat_inv(left_hand_roll_link_feedback[:, 3:])) # Check if the left hand roll link is at the target position
) torch.testing.assert_close(
torch.mean(torch.abs(left_hand_roll_link_pos_error), dim=1),
torch.zeros(env.num_envs, device="cuda:0"),
rtol=0.0,
atol=pink_ik_test_config["pos_tolerance"],
) )
right_hand_roll_link_feedback = right_hand_roll_link_pose_obs # Check if the right hand roll link is at the target position
right_hand_roll_link_setpoint = ( torch.testing.assert_close(
torch.tensor(self.right_hand_roll_link_pose, device=device).unsqueeze(0).repeat(env.num_envs, 1) torch.mean(torch.abs(right_hand_roll_link_pos_error), dim=1),
torch.zeros(env.num_envs, device="cuda:0"),
rtol=0.0,
atol=pink_ik_test_config["pos_tolerance"],
) )
right_hand_roll_link_pos_error = (
right_hand_roll_link_setpoint[:, :3] - right_hand_roll_link_feedback[:, :3] # Check if the left hand roll link is at the target orientation
torch.testing.assert_close(
torch.mean(torch.abs(left_hand_roll_link_rot_error), dim=1),
torch.zeros(env.num_envs, device="cuda:0"),
rtol=0.0,
atol=pink_ik_test_config["rot_tolerance"],
) )
right_hand_roll_link_rot_error = axis_angle_from_quat(
quat_from_matrix( # Check if the right hand roll link is at the target orientation
matrix_from_quat(right_hand_roll_link_setpoint[:, 3:]) torch.testing.assert_close(
* matrix_from_quat(quat_inv(right_hand_roll_link_feedback[:, 3:])) torch.mean(torch.abs(right_hand_roll_link_rot_error), dim=1),
) torch.zeros(env.num_envs, device="cuda:0"),
rtol=0.0,
atol=pink_ik_test_config["rot_tolerance"],
) )
if num_runs % self.num_steps_controller_convergence == 0: # Change the setpoints to move the hands up and down as per the counter
# Check if the left hand roll link is at the target position test_counter += 1
torch.testing.assert_close( if move_hands_up and test_counter > pink_ik_test_config["num_times_to_move_hands_up"]:
torch.mean(torch.abs(left_hand_roll_link_pos_error), dim=1), move_hands_up = False
torch.zeros(env.num_envs, device="cuda:0"), elif not move_hands_up and test_counter > (
rtol=0.0, pink_ik_test_config["num_times_to_move_hands_down"]
atol=self.pos_tolerance, + pink_ik_test_config["num_times_to_move_hands_up"]
) ):
# Test is done after moving the hands up and down
# Check if the right hand roll link is at the target position break
torch.testing.assert_close( if move_hands_up:
torch.mean(torch.abs(right_hand_roll_link_pos_error), dim=1), left_hand_roll_link_pose[1] += 0.05
torch.zeros(env.num_envs, device="cuda:0"), left_hand_roll_link_pose[2] += 0.05
rtol=0.0, right_hand_roll_link_pose[1] += 0.05
atol=self.pos_tolerance, right_hand_roll_link_pose[2] += 0.05
) else:
left_hand_roll_link_pose[1] -= 0.05
# Check if the left hand roll link is at the target orientation left_hand_roll_link_pose[2] -= 0.05
torch.testing.assert_close( right_hand_roll_link_pose[1] -= 0.05
torch.mean(torch.abs(left_hand_roll_link_rot_error), dim=1), right_hand_roll_link_pose[2] -= 0.05
torch.zeros(env.num_envs, device="cuda:0"),
rtol=0.0, env.close()
atol=self.rot_tolerance,
)
# Check if the right hand roll link is at the target orientation
torch.testing.assert_close(
torch.mean(torch.abs(right_hand_roll_link_rot_error), dim=1),
torch.zeros(env.num_envs, device="cuda:0"),
rtol=0.0,
atol=self.rot_tolerance,
)
# Change the setpoints to move the hands up and down as per the counter
test_counter += 1
if move_hands_up and test_counter > self.num_times_to_move_hands_up:
move_hands_up = False
elif not move_hands_up and test_counter > (
self.num_times_to_move_hands_down + self.num_times_to_move_hands_up
):
# Test is done after moving the hands up and down
break
if move_hands_up:
self.left_hand_roll_link_pose[1] += 0.05
self.left_hand_roll_link_pose[2] += 0.05
self.right_hand_roll_link_pose[1] += 0.05
self.right_hand_roll_link_pose[2] += 0.05
else:
self.left_hand_roll_link_pose[1] -= 0.05
self.left_hand_roll_link_pose[2] -= 0.05
self.right_hand_roll_link_pose[1] -= 0.05
self.right_hand_roll_link_pose[2] -= 0.05
env.close()
[package] [package]
# Semantic Versioning is used: https://semver.org/ # Semantic Versioning is used: https://semver.org/
version = "1.0.10" version = "1.0.11"
# Description # Description
category = "isaaclab" category = "isaaclab"
......
Changelog Changelog
--------- ---------
1.0.11 (2025-07-17)
~~~~~~~~~~~~~~~~~~
Changed
^^^^^^^
* Updated test_selection_strategy.py and test_generate_dataset.py test cases to pytest format.
* Updated annotate_demos.py script to return the number of successful task completions as the exit code to support check in test_generate_dataset.py test case.
1.0.10 (2025-07-08) 1.0.10 (2025-07-08)
~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~
......
...@@ -13,110 +13,129 @@ simulation_app = AppLauncher(headless=True).app ...@@ -13,110 +13,129 @@ simulation_app = AppLauncher(headless=True).app
import os import os
import subprocess import subprocess
import tempfile import tempfile
import unittest
import pytest
from isaaclab.utils.assets import ISAACLAB_NUCLEUS_DIR, retrieve_file_path from isaaclab.utils.assets import ISAACLAB_NUCLEUS_DIR, retrieve_file_path
DATASETS_DOWNLOAD_DIR = tempfile.mkdtemp(suffix="_Isaac-Stack-Cube-Franka-IK-Rel-Mimic-v0") DATASETS_DOWNLOAD_DIR = tempfile.mkdtemp(suffix="_Isaac-Stack-Cube-Franka-IK-Rel-Mimic-v0")
NUCLEUS_DATASET_PATH = os.path.join(ISAACLAB_NUCLEUS_DIR, "Tests", "Mimic", "dataset.hdf5") NUCLEUS_DATASET_PATH = os.path.join(ISAACLAB_NUCLEUS_DIR, "Tests", "Mimic", "dataset.hdf5")
EXPECTED_SUCCESSFUL_ANNOTATIONS = 10
class TestGenerateDataset(unittest.TestCase):
"""Test the dataset generation behavior of the Isaac Lab Mimic workflow.""" @pytest.fixture
def setup_test_environment():
def setUp(self): """Set up the environment for testing."""
"""Set up the environment for testing.""" # Create the datasets directory if it does not exist
# Create the datasets directory if it does not exist if not os.path.exists(DATASETS_DOWNLOAD_DIR):
if not os.path.exists(DATASETS_DOWNLOAD_DIR): print("Creating directory : ", DATASETS_DOWNLOAD_DIR)
print("Creating directory : ", DATASETS_DOWNLOAD_DIR) os.makedirs(DATASETS_DOWNLOAD_DIR)
os.makedirs(DATASETS_DOWNLOAD_DIR)
# Try to download the dataset from Nucleus # Try to download the dataset from Nucleus
try: try:
retrieve_file_path(NUCLEUS_DATASET_PATH, DATASETS_DOWNLOAD_DIR) retrieve_file_path(NUCLEUS_DATASET_PATH, DATASETS_DOWNLOAD_DIR)
except Exception as e: except Exception as e:
print(e) print(e)
print("Could not download dataset from Nucleus") print("Could not download dataset from Nucleus")
self.fail( pytest.fail(
"The dataset required for this test is currently unavailable. Dataset path: " + NUCLEUS_DATASET_PATH "The dataset required for this test is currently unavailable. Dataset path: " + NUCLEUS_DATASET_PATH
) )
# Set the environment variable PYTHONUNBUFFERED to 1 to get all text outputs in result.stdout # Set the environment variable PYTHONUNBUFFERED to 1 to get all text outputs in result.stdout
self.pythonunbuffered_env_var_ = os.environ.get("PYTHONUNBUFFERED") pythonunbuffered_env_var_ = os.environ.get("PYTHONUNBUFFERED")
os.environ["PYTHONUNBUFFERED"] = "1" os.environ["PYTHONUNBUFFERED"] = "1"
# Automatically detect the workflow root (backtrack from current file location) # Automatically detect the workflow root (backtrack from current file location)
current_dir = os.path.dirname(os.path.abspath(__file__)) current_dir = os.path.dirname(os.path.abspath(__file__))
workflow_root = os.path.abspath(os.path.join(current_dir, "../../..")) workflow_root = os.path.abspath(os.path.join(current_dir, "../../.."))
# Run the command to generate core configs # Run the command to generate core configs
config_command = [ config_command = [
workflow_root + "/isaaclab.sh", workflow_root + "/isaaclab.sh",
"-p", "-p",
os.path.join(workflow_root, "scripts/imitation_learning/isaaclab_mimic/annotate_demos.py"), os.path.join(workflow_root, "scripts/imitation_learning/isaaclab_mimic/annotate_demos.py"),
"--task", "--task",
"Isaac-Stack-Cube-Franka-IK-Rel-Mimic-v0", "Isaac-Stack-Cube-Franka-IK-Rel-Mimic-v0",
"--input_file", "--input_file",
DATASETS_DOWNLOAD_DIR + "/dataset.hdf5", DATASETS_DOWNLOAD_DIR + "/dataset.hdf5",
"--output_file", "--output_file",
DATASETS_DOWNLOAD_DIR + "/annotated_dataset.hdf5", DATASETS_DOWNLOAD_DIR + "/annotated_dataset.hdf5",
"--auto", "--auto",
"--headless", "--headless",
] ]
print(config_command) print(config_command)
# Execute the command and capture the result # Execute the command and capture the result
result = subprocess.run(config_command, capture_output=True, text=True) result = subprocess.run(config_command, capture_output=True, text=True)
# Print the result for debugging purposes print(f"Annotate demos result: {result.returncode}\n\n\n\n\n\n\n\n\n\n\n\n")
print("Config generation result:")
print(result.stdout) # Print standard output from the command # Print the result for debugging purposes
print(result.stderr) # Print standard error from the command print("Config generation result:")
print(result.stdout) # Print standard output from the command
# Check if the config generation was successful print(result.stderr) # Print standard error from the command
self.assertEqual(result.returncode, 0, msg=result.stderr)
# Check if the config generation was successful
def tearDown(self): assert result.returncode == 0, result.stderr
"""Clean up after tests."""
if self.pythonunbuffered_env_var_: # Check that at least one task was completed successfully by parsing stdout
os.environ["PYTHONUNBUFFERED"] = self.pythonunbuffered_env_var_ # Look for the line that reports successful task completions
else: success_line = None
del os.environ["PYTHONUNBUFFERED"] for line in result.stdout.split("\n"):
if "Successful task completions:" in line:
def test_generate_dataset(self): success_line = line
"""Test the dataset generation script.""" break
# Automatically detect the workflow root (backtrack from current file location)
current_dir = os.path.dirname(os.path.abspath(__file__)) assert success_line is not None, "Could not find 'Successful task completions:' in output"
workflow_root = os.path.abspath(os.path.join(current_dir, "../../.."))
# Extract the number from the line
# Define the command to run the dataset generation script try:
command = [ successful_count = int(success_line.split(":")[-1].strip())
workflow_root + "/isaaclab.sh", assert (
"-p", successful_count == EXPECTED_SUCCESSFUL_ANNOTATIONS
os.path.join(workflow_root, "scripts/imitation_learning/isaaclab_mimic/generate_dataset.py"), ), f"Expected 10 successful annotations but got {successful_count}"
"--input_file", except (ValueError, IndexError) as e:
DATASETS_DOWNLOAD_DIR + "/annotated_dataset.hdf5", pytest.fail(f"Could not parse successful task count from line: '{success_line}'. Error: {e}")
"--output_file",
DATASETS_DOWNLOAD_DIR + "/generated_dataset.hdf5", # Yield the workflow root for use in tests
"--generation_num_trials", yield workflow_root
"1",
"--headless", # Cleanup: restore the original environment variable
] if pythonunbuffered_env_var_:
os.environ["PYTHONUNBUFFERED"] = pythonunbuffered_env_var_
# Call the script and capture output else:
result = subprocess.run(command, capture_output=True, text=True) del os.environ["PYTHONUNBUFFERED"]
# Print the result for debugging purposes
print("Dataset generation result:") def test_generate_dataset(setup_test_environment):
print(result.stdout) # Print standard output from the command """Test the dataset generation script."""
print(result.stderr) # Print standard error from the command workflow_root = setup_test_environment
# Check if the script executed successfully # Define the command to run the dataset generation script
self.assertEqual(result.returncode, 0, msg=result.stderr) command = [
workflow_root + "/isaaclab.sh",
# Check for specific output "-p",
expected_output = "successes/attempts. Exiting" os.path.join(workflow_root, "scripts/imitation_learning/isaaclab_mimic/generate_dataset.py"),
self.assertIn(expected_output, result.stdout) "--input_file",
DATASETS_DOWNLOAD_DIR + "/annotated_dataset.hdf5",
"--output_file",
if __name__ == "__main__": DATASETS_DOWNLOAD_DIR + "/generated_dataset.hdf5",
unittest.main() "--generation_num_trials",
"1",
"--headless",
]
# Call the script and capture output
result = subprocess.run(command, capture_output=True, text=True)
# Print the result for debugging purposes
print("Dataset generation result:")
print(result.stdout) # Print standard output from the command
print(result.stderr) # Print standard error from the command
# Check if the script executed successfully
assert result.returncode == 0, result.stderr
# Check for specific output
expected_output = "successes/attempts. Exiting"
assert expected_output in result.stdout
...@@ -10,7 +10,8 @@ simulation_app = AppLauncher(headless=True).app ...@@ -10,7 +10,8 @@ simulation_app = AppLauncher(headless=True).app
import numpy as np import numpy as np
import torch import torch
import unittest
import pytest
import isaaclab.utils.math as PoseUtils import isaaclab.utils.math as PoseUtils
...@@ -26,272 +27,251 @@ from isaaclab_mimic.datagen.selection_strategy import ( ...@@ -26,272 +27,251 @@ from isaaclab_mimic.datagen.selection_strategy import (
NUM_ITERS = 1000 NUM_ITERS = 1000
class TestNearestNeighborObjectStrategy(unittest.TestCase): @pytest.fixture
"""Test the NearestNeighborObjectStrategy class.""" def nearest_neighbor_object_strategy():
"""Fixture for NearestNeighborObjectStrategy."""
def setUp(self): return NearestNeighborObjectStrategy()
"""Set up test cases for the NearestNeighborObjectStrategy."""
# Initialize the strategy object for selecting nearest neighbors
self.strategy = NearestNeighborObjectStrategy() @pytest.fixture
def nearest_neighbor_robot_distance_strategy():
def test_select_source_demo_identity_orientations(self): """Fixture for NearestNeighborRobotDistanceStrategy."""
"""Test the selection of source demonstrations using two distinct object_pose clusters. return NearestNeighborRobotDistanceStrategy()
This method generates two clusters of object poses and randomly adjusts the current object pose within
specified deviations. It then simulates multiple selections to verify that when the current pose is close def test_select_source_demo_identity_orientations_object_strategy(nearest_neighbor_object_strategy):
to cluster 1, all selected indices correspond to that cluster, and that the same holds true for cluster 2. """Test the selection of source demonstrations using two distinct object_pose clusters.
"""
This method generates two clusters of object poses and randomly adjusts the current object pose within
# Define ranges for two clusters of object poses specified deviations. It then simulates multiple selections to verify that when the current pose is close
cluster_1_range_min = 0 to cluster 1, all selected indices correspond to that cluster, and that the same holds true for cluster 2.
cluster_1_range_max = 4 """
cluster_2_range_min = 25
cluster_2_range_max = 35 # Define ranges for two clusters of object poses
cluster_1_range_min = 0
# Generate object poses for cluster 1 with varying translations cluster_1_range_max = 4
src_object_poses_in_world_cluster_1 = [ cluster_2_range_min = 25
torch.eye(4) cluster_2_range_max = 35
+ torch.tensor([[0.0, 0.0, 0.0, i], [0.0, 0.0, 0.0, i], [0.0, 0.0, 0.0, i], [0.0, 0.0, 0.0, -1.0]])
for i in range(cluster_1_range_min, cluster_1_range_max) # Generate object poses for cluster 1 with varying translations
] src_object_poses_in_world_cluster_1 = [
torch.eye(4) + torch.tensor([[0.0, 0.0, 0.0, i], [0.0, 0.0, 0.0, i], [0.0, 0.0, 0.0, i], [0.0, 0.0, 0.0, -1.0]])
# Generate object poses for cluster 2 similarly for i in range(cluster_1_range_min, cluster_1_range_max)
src_object_poses_in_world_cluster_2 = [ ]
torch.eye(4)
+ torch.tensor([[0.0, 0.0, 0.0, i], [0.0, 0.0, 0.0, i], [0.0, 0.0, 0.0, i], [0.0, 0.0, 0.0, -1.0]]) # Generate object poses for cluster 2 similarly
for i in range(cluster_2_range_min, cluster_2_range_max) src_object_poses_in_world_cluster_2 = [
] torch.eye(4) + torch.tensor([[0.0, 0.0, 0.0, i], [0.0, 0.0, 0.0, i], [0.0, 0.0, 0.0, i], [0.0, 0.0, 0.0, -1.0]])
for i in range(cluster_2_range_min, cluster_2_range_max)
# Combine the poses from both clusters into a single list ]
src_object_poses_in_world = src_object_poses_in_world_cluster_1 + src_object_poses_in_world_cluster_2
# Combine the poses from both clusters into a single list
# Create DatagenInfo instances for these positions src_object_poses_in_world = src_object_poses_in_world_cluster_1 + src_object_poses_in_world_cluster_2
src_subtask_datagen_infos = [
DatagenInfo(object_poses={0: object_pose.unsqueeze(0)}) for object_pose in src_object_poses_in_world # Create DatagenInfo instances for these positions
] src_subtask_datagen_infos = [
DatagenInfo(object_poses={0: object_pose.unsqueeze(0)}) for object_pose in src_object_poses_in_world
# Define the end-effector pose (not used in the nearest neighbor selection) ]
eef_pose = torch.eye(4)
# Define the end-effector pose (not used in the nearest neighbor selection)
# Test 1: eef_pose = torch.eye(4)
# Set the current object pose to the first value of cluster 1 and add some noise
# Check that the nearest neighbor is always part of cluster 1 # Test 1:
max_deviation = 3 # Define a maximum deviation for the current pose # Set the current object pose to the first value of cluster 1 and add some noise
# Randomly select an index from cluster 1 # Check that the nearest neighbor is always part of cluster 1
random_index_cluster_1 = np.random.randint(0, len(src_object_poses_in_world_cluster_1)) max_deviation = 3 # Define a maximum deviation for the current pose
cluster_1_curr_object_pose = src_object_poses_in_world_cluster_1[ # Randomly select an index from cluster 1
random_index_cluster_1 random_index_cluster_1 = np.random.randint(0, len(src_object_poses_in_world_cluster_1))
].clone() # Use clone to avoid reference issues cluster_1_curr_object_pose = src_object_poses_in_world_cluster_1[
# Randomly adjust the current pose within the maximum deviation random_index_cluster_1
cluster_1_curr_object_pose[0, 3] += torch.rand(1).item() * max_deviation ].clone() # Use clone to avoid reference issues
cluster_1_curr_object_pose[1, 3] += torch.rand(1).item() * max_deviation # Randomly adjust the current pose within the maximum deviation
cluster_1_curr_object_pose[2, 3] += torch.rand(1).item() * max_deviation cluster_1_curr_object_pose[0, 3] += torch.rand(1).item() * max_deviation
cluster_1_curr_object_pose[1, 3] += torch.rand(1).item() * max_deviation
# Select source demonstrations multiple times to check randomness cluster_1_curr_object_pose[2, 3] += torch.rand(1).item() * max_deviation
selected_indices = [
self.strategy.select_source_demo( # Select source demonstrations multiple times to check randomness
eef_pose, selected_indices = [
cluster_1_curr_object_pose, nearest_neighbor_object_strategy.select_source_demo(
src_subtask_datagen_infos, eef_pose,
pos_weight=1.0, cluster_1_curr_object_pose,
rot_weight=1.0, src_subtask_datagen_infos,
nn_k=3, # Check among the top 3 nearest neighbors pos_weight=1.0,
) rot_weight=1.0,
for _ in range(NUM_ITERS) nn_k=3, # Check among the top 3 nearest neighbors
]
# Assert that all selected indices are valid indices within cluster 1
self.assertTrue(
np.all(np.array(selected_indices) < len(src_object_poses_in_world_cluster_1)),
"Some selected indices are not part of cluster 1.",
)
# Test 2:
# Set the current object pose to the first value of cluster 2 and add some noise
# Check that the nearest neighbor is always part of cluster 2
max_deviation = 5 # Define a maximum deviation for the current pose in cluster 2
# Randomly select an index from cluster 2
random_index_cluster_2 = np.random.randint(0, len(src_object_poses_in_world_cluster_2))
cluster_2_curr_object_pose = src_object_poses_in_world_cluster_2[
random_index_cluster_2
].clone() # Use clone to avoid reference issues
# Randomly adjust the current pose within the maximum deviation
cluster_2_curr_object_pose[0, 3] += torch.rand(1).item() * max_deviation
cluster_2_curr_object_pose[1, 3] += torch.rand(1).item() * max_deviation
cluster_2_curr_object_pose[2, 3] += torch.rand(1).item() * max_deviation
# Select source demonstrations multiple times to check randomness
selected_indices = [
self.strategy.select_source_demo(
eef_pose,
cluster_2_curr_object_pose,
src_subtask_datagen_infos,
pos_weight=1.0,
rot_weight=1.0,
nn_k=6, # Check among the top 6 nearest neighbors
)
for _ in range(20)
]
# Assert that all selected indices are valid indices within cluster 2
self.assertTrue(
np.all(np.array(selected_indices) < len(src_object_poses_in_world)),
"Some selected indices are not part of cluster 2.",
)
self.assertTrue(
np.all(np.array(selected_indices) > (len(src_object_poses_in_world_cluster_1) - 1)),
"Some selected indices are not part of cluster 2.",
)
class TestNearestNeighborRobotDistanceStrategy(unittest.TestCase):
"""Test the NearestNeighborRobotDistanceStrategy class."""
def setUp(self):
"""Set up test cases for the NearestNeighborRobotDistanceStrategy."""
# Initialize the strategy object for selecting nearest neighbors
self.strategy = NearestNeighborRobotDistanceStrategy()
def test_select_source_demo_identity_orientations(self):
"""Test the selection of source demonstrations based on identity-oriented poses with varying positions.
This method generates two clusters of object poses and randomly adjusts the current object pose within
specified deviations. It then simulates multiple selections to verify that when the current pose is close
to cluster 1, all selected indices correspond to that cluster, and that the same holds true for cluster 2.
"""
# Define ranges for two clusters of object poses
cluster_1_range_min = 0
cluster_1_range_max = 4
cluster_2_range_min = 25
cluster_2_range_max = 35
# Generate random transformed object poses for cluster 1 with varying translations
# This represents the first object pose for the transformed subtask segment for each source demo
transformed_eef_pose_cluster_1 = [
torch.eye(4) + torch.tensor([[0, 0, 0, i], [0, 0, 0, i], [0, 0, 0, i], [0, 0, 0, -1]])
for i in range(cluster_1_range_min, cluster_1_range_max)
]
# Generate object poses for cluster 2 similarly
transformed_eef_pose_cluster_2 = [
torch.eye(4) + torch.tensor([[0, 0, 0, i], [0, 0, 0, i], [0, 0, 0, i], [0, 0, 0, -1]])
for i in range(cluster_2_range_min, cluster_2_range_max)
]
# Combine the poses from both clusters into a single list
# This represents the first end effector pose for the transformed subtask segment for each source demo
transformed_eef_in_world_poses_tensor = torch.stack(
transformed_eef_pose_cluster_1 + transformed_eef_pose_cluster_2
) )
for _ in range(NUM_ITERS)
# Create transformation matrices corresponding to each source object pose ]
src_obj_in_world_poses = torch.stack([
PoseUtils.generate_random_transformation_matrix(pos_boundary=10, rot_boundary=(2 * np.pi)) # Assert that all selected indices are valid indices within cluster 1
for _ in range(transformed_eef_in_world_poses_tensor.shape[0]) assert np.all(
]) np.array(selected_indices) < len(src_object_poses_in_world_cluster_1)
), "Some selected indices are not part of cluster 1."
# Calculate the src_eef poses from the transformed eef poses, src_obj_in_world and curr_obj_pose_in_world
# This is the inverse of the transformation of the eef pose done in NearestNeighborRobotDistanceStrategy # Test 2:
# Refer to NearestNeighborRobotDistanceStrategy.select_source_demo for more details # Set the current object pose to the first value of cluster 2 and add some noise
curr_object_in_world_pose = PoseUtils.generate_random_transformation_matrix( # Check that the nearest neighbor is always part of cluster 2
pos_boundary=10, rot_boundary=(2 * np.pi) max_deviation = 5 # Define a maximum deviation for the current pose in cluster 2
# Randomly select an index from cluster 2
random_index_cluster_2 = np.random.randint(0, len(src_object_poses_in_world_cluster_2))
cluster_2_curr_object_pose = src_object_poses_in_world_cluster_2[
random_index_cluster_2
].clone() # Use clone to avoid reference issues
# Randomly adjust the current pose within the maximum deviation
cluster_2_curr_object_pose[0, 3] += torch.rand(1).item() * max_deviation
cluster_2_curr_object_pose[1, 3] += torch.rand(1).item() * max_deviation
cluster_2_curr_object_pose[2, 3] += torch.rand(1).item() * max_deviation
# Select source demonstrations multiple times to check randomness
selected_indices = [
nearest_neighbor_object_strategy.select_source_demo(
eef_pose,
cluster_2_curr_object_pose,
src_subtask_datagen_infos,
pos_weight=1.0,
rot_weight=1.0,
nn_k=6, # Check among the top 6 nearest neighbors
) )
world_in_curr_obj_pose = PoseUtils.pose_inv(curr_object_in_world_pose) for _ in range(20)
]
src_eef_in_src_obj_poses = PoseUtils.pose_in_A_to_pose_in_B(
pose_in_A=transformed_eef_in_world_poses_tensor, # Assert that all selected indices are valid indices within cluster 2
pose_A_in_B=world_in_curr_obj_pose, assert np.all(
np.array(selected_indices) < len(src_object_poses_in_world)
), "Some selected indices are not part of cluster 2."
assert np.all(
np.array(selected_indices) > (len(src_object_poses_in_world_cluster_1) - 1)
), "Some selected indices are not part of cluster 2."
def test_select_source_demo_identity_orientations_robot_distance_strategy(nearest_neighbor_robot_distance_strategy):
"""Test the selection of source demonstrations based on identity-oriented poses with varying positions.
This method generates two clusters of object poses and randomly adjusts the current object pose within
specified deviations. It then simulates multiple selections to verify that when the current pose is close
to cluster 1, all selected indices correspond to that cluster, and that the same holds true for cluster 2.
"""
# Define ranges for two clusters of object poses
cluster_1_range_min = 0
cluster_1_range_max = 4
cluster_2_range_min = 25
cluster_2_range_max = 35
# Generate random transformed object poses for cluster 1 with varying translations
# This represents the first object pose for the transformed subtask segment for each source demo
transformed_eef_pose_cluster_1 = [
torch.eye(4) + torch.tensor([[0, 0, 0, i], [0, 0, 0, i], [0, 0, 0, i], [0, 0, 0, -1]])
for i in range(cluster_1_range_min, cluster_1_range_max)
]
# Generate object poses for cluster 2 similarly
transformed_eef_pose_cluster_2 = [
torch.eye(4) + torch.tensor([[0, 0, 0, i], [0, 0, 0, i], [0, 0, 0, i], [0, 0, 0, -1]])
for i in range(cluster_2_range_min, cluster_2_range_max)
]
# Combine the poses from both clusters into a single list
# This represents the first end effector pose for the transformed subtask segment for each source demo
transformed_eef_in_world_poses_tensor = torch.stack(transformed_eef_pose_cluster_1 + transformed_eef_pose_cluster_2)
# Create transformation matrices corresponding to each source object pose
src_obj_in_world_poses = torch.stack([
PoseUtils.generate_random_transformation_matrix(pos_boundary=10, rot_boundary=(2 * np.pi))
for _ in range(transformed_eef_in_world_poses_tensor.shape[0])
])
# Calculate the src_eef poses from the transformed eef poses, src_obj_in_world and curr_obj_pose_in_world
# This is the inverse of the transformation of the eef pose done in NearestNeighborRobotDistanceStrategy
# Refer to NearestNeighborRobotDistanceStrategy.select_source_demo for more details
curr_object_in_world_pose = PoseUtils.generate_random_transformation_matrix(
pos_boundary=10, rot_boundary=(2 * np.pi)
)
world_in_curr_obj_pose = PoseUtils.pose_inv(curr_object_in_world_pose)
src_eef_in_src_obj_poses = PoseUtils.pose_in_A_to_pose_in_B(
pose_in_A=transformed_eef_in_world_poses_tensor,
pose_A_in_B=world_in_curr_obj_pose,
)
src_eef_in_world_poses = PoseUtils.pose_in_A_to_pose_in_B(
pose_in_A=src_eef_in_src_obj_poses,
pose_A_in_B=src_obj_in_world_poses,
)
# Check that both lists have the same length
assert src_obj_in_world_poses.shape[0] == src_eef_in_world_poses.shape[0], (
"Source object poses and end effector poses does not have the same length. "
"This is a bug in the test code and not the source code."
)
# Create DatagenInfo instances for these positions
src_subtask_datagen_infos = [
DatagenInfo(eef_pose=src_eef_in_world_pose.unsqueeze(0), object_poses={0: src_obj_in_world_pose.unsqueeze(0)})
for src_obj_in_world_pose, src_eef_in_world_pose in zip(src_obj_in_world_poses, src_eef_in_world_poses)
]
# Test 1: Ensure the nearest neighbor is always part of cluster 1
max_deviation = 3 # Define a maximum deviation for the current pose
# Define the end-effector pose
# Set the current object pose to the first value of cluster 1 and add some noise
random_index_cluster_1 = np.random.randint(0, len(transformed_eef_pose_cluster_1))
curr_eef_in_world_pose = transformed_eef_pose_cluster_1[
random_index_cluster_1
].clone() # Use clone to avoid reference issues
# Randomly adjust the current pose within the maximum deviation
curr_eef_in_world_pose[0, 3] += torch.rand(1).item() * max_deviation
curr_eef_in_world_pose[1, 3] += torch.rand(1).item() * max_deviation
curr_eef_in_world_pose[2, 3] += torch.rand(1).item() * max_deviation
# Select source demonstrations multiple times to check randomness
selected_indices = [
nearest_neighbor_robot_distance_strategy.select_source_demo(
curr_eef_in_world_pose,
curr_object_in_world_pose,
src_subtask_datagen_infos,
pos_weight=1.0,
rot_weight=1.0,
nn_k=3, # Check among the top 3 nearest neighbors
) )
for _ in range(20)
src_eef_in_world_poses = PoseUtils.pose_in_A_to_pose_in_B( ]
pose_in_A=src_eef_in_src_obj_poses,
pose_A_in_B=src_obj_in_world_poses, # Assert that all selected indices are valid indices within cluster 1
assert np.all(
np.array(selected_indices) < len(transformed_eef_pose_cluster_1)
), "Some selected indices are not part of cluster 1."
# Test 2: Ensure the nearest neighbor is always part of cluster 2
max_deviation = 3 # Define a maximum deviation for the current pose
# Define the end-effector pose
# Set the current object pose to the first value of cluster 2 and add some noise
random_index_cluster_2 = np.random.randint(0, len(transformed_eef_pose_cluster_2))
curr_eef_in_world_pose = transformed_eef_pose_cluster_2[
random_index_cluster_2
].clone() # Use clone to avoid reference issues
# Randomly adjust the current pose within the maximum deviation
curr_eef_in_world_pose[0, 3] += torch.rand(1).item() * max_deviation
curr_eef_in_world_pose[1, 3] += torch.rand(1).item() * max_deviation
curr_eef_in_world_pose[2, 3] += torch.rand(1).item() * max_deviation
# Select source demonstrations multiple times to check randomness
selected_indices = [
nearest_neighbor_robot_distance_strategy.select_source_demo(
curr_eef_in_world_pose,
curr_object_in_world_pose,
src_subtask_datagen_infos,
pos_weight=1.0,
rot_weight=1.0,
nn_k=3, # Check among the top 3 nearest neighbors
) )
for _ in range(20)
# Check that both lists have the same length ]
self.assertTrue(
src_obj_in_world_poses.shape[0] == src_eef_in_world_poses.shape[0], # Assert that all selected indices are valid indices within cluster 2
"Source object poses and end effector poses does not have the same length." assert np.all(
"This is a bug in the test code and not the source code.", np.array(selected_indices) < transformed_eef_in_world_poses_tensor.shape[0]
) ), "Some selected indices are not part of cluster 2."
assert np.all(
# Create DatagenInfo instances for these positions np.array(selected_indices) > (len(transformed_eef_pose_cluster_1) - 1)
src_subtask_datagen_infos = [ ), "Some selected indices are not part of cluster 2."
DatagenInfo(
eef_pose=src_eef_in_world_pose.unsqueeze(0), object_poses={0: src_obj_in_world_pose.unsqueeze(0)}
)
for src_obj_in_world_pose, src_eef_in_world_pose in zip(src_obj_in_world_poses, src_eef_in_world_poses)
]
# Test 1: Ensure the nearest neighbor is always part of cluster 1
max_deviation = 3 # Define a maximum deviation for the current pose
# Define the end-effector pose
# Set the current object pose to the first value of cluster 1 and add some noise
random_index_cluster_1 = np.random.randint(0, len(transformed_eef_pose_cluster_1))
curr_eef_in_world_pose = transformed_eef_pose_cluster_1[
random_index_cluster_1
].clone() # Use clone to avoid reference issues
# Randomly adjust the current pose within the maximum deviation
curr_eef_in_world_pose[0, 3] += torch.rand(1).item() * max_deviation
curr_eef_in_world_pose[1, 3] += torch.rand(1).item() * max_deviation
curr_eef_in_world_pose[2, 3] += torch.rand(1).item() * max_deviation
# Select source demonstrations multiple times to check randomness
selected_indices = [
self.strategy.select_source_demo(
curr_eef_in_world_pose,
curr_object_in_world_pose,
src_subtask_datagen_infos,
pos_weight=1.0,
rot_weight=1.0,
nn_k=3, # Check among the top 3 nearest neighbors
)
for _ in range(20)
]
# Assert that all selected indices are valid indices within cluster 1
self.assertTrue(
np.all(np.array(selected_indices) < len(transformed_eef_pose_cluster_1)),
"Some selected indices are not part of cluster 1.",
)
# Test 2: Ensure the nearest neighbor is always part of cluster 2
max_deviation = 3 # Define a maximum deviation for the current pose
# Define the end-effector pose
# Set the current object pose to the first value of cluster 2 and add some noise
random_index_cluster_2 = np.random.randint(0, len(transformed_eef_pose_cluster_2))
curr_eef_in_world_pose = transformed_eef_pose_cluster_2[
random_index_cluster_2
].clone() # Use clone to avoid reference issues
# Randomly adjust the current pose within the maximum deviation
curr_eef_in_world_pose[0, 3] += torch.rand(1).item() * max_deviation
curr_eef_in_world_pose[1, 3] += torch.rand(1).item() * max_deviation
curr_eef_in_world_pose[2, 3] += torch.rand(1).item() * max_deviation
# Select source demonstrations multiple times to check randomness
selected_indices = [
self.strategy.select_source_demo(
curr_eef_in_world_pose,
curr_object_in_world_pose,
src_subtask_datagen_infos,
pos_weight=1.0,
rot_weight=1.0,
nn_k=3, # Check among the top 3 nearest neighbors
)
for _ in range(20)
]
# Assert that all selected indices are valid indices within cluster 2
self.assertTrue(
np.all(np.array(selected_indices) < transformed_eef_in_world_poses_tensor.shape[0]),
"Some selected indices are not part of cluster 2.",
)
self.assertTrue(
np.all(np.array(selected_indices) > (len(transformed_eef_pose_cluster_1) - 1)),
"Some selected indices are not part of cluster 2.",
)
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment