Unverified Commit a4d3e0c4 authored by Kelly Guo's avatar Kelly Guo Committed by GitHub

Switches unittest to pytest for testing (#2459)

# Description

Switches our unit testing framework from unittest to pytest to enable
better CI logging and improve developer experience when checking for
test failures.

## Type of change

- Bug fix (non-breaking change which fixes an issue)
- New feature (non-breaking change which adds functionality)
- Breaking change (fix or feature that would cause existing
functionality to not work as expected)

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [ ] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there

<!--
As you go through the checklist above, you can mark something as done by
putting an x character in it

For example,
- [x] I have done this task
- [ ] I have not done this task
-->

---------
Co-authored-by: 's avatarPascal Roth <roth.pascal@outlook.de>
Co-authored-by: 's avatarPascal Roth <57946385+pascal-roth@users.noreply.github.com>
parent 93fd2120
This diff is collapsed.
...@@ -63,3 +63,6 @@ _build ...@@ -63,3 +63,6 @@ _build
# Teleop Recorded Dataset # Teleop Recorded Dataset
datasets datasets
# Tests
tests/
...@@ -33,6 +33,25 @@ ...@@ -33,6 +33,25 @@
"args" : ["--task", "Isaac-Reach-Franka-v0", "--num_envs", "32"], "args" : ["--task", "Isaac-Reach-Franka-v0", "--num_envs", "32"],
"program": "${workspaceFolder}/scripts/reinforcement_learning/rsl_rl/play.py", "program": "${workspaceFolder}/scripts/reinforcement_learning/rsl_rl/play.py",
"console": "integratedTerminal" "console": "integratedTerminal"
},
{
"name": "Python: SinglePytest",
"type": "python",
"request": "launch",
"module": "pytest",
"args": [
"${file}"
],
"console": "integratedTerminal"
},
{
"name": "Python: ALL Pytest",
"type": "python",
"request": "launch",
"module": "pytest",
"args": ["source/isaaclab/test"],
"console": "integratedTerminal",
"justMyCode": false
} }
] ]
} }
...@@ -511,7 +511,7 @@ if "%arg%"=="-i" ( ...@@ -511,7 +511,7 @@ if "%arg%"=="-i" (
set "skip=1" set "skip=1"
) )
) )
!python_exe! tools\run_all_tests.py !allArgs! !python_exe! -m pytest tools !allArgs!
goto :end goto :end
) else if "%arg%"=="--test" ( ) else if "%arg%"=="--test" (
rem run the python provided by Isaac Sim rem run the python provided by Isaac Sim
...@@ -525,7 +525,7 @@ if "%arg%"=="-i" ( ...@@ -525,7 +525,7 @@ if "%arg%"=="-i" (
set "skip=1" set "skip=1"
) )
) )
!python_exe! tools\run_all_tests.py !allArgs! !python_exe! -m pytest tools !allArgs!
goto :end goto :end
) else if "%arg%"=="-v" ( ) else if "%arg%"=="-v" (
rem update the vscode settings rem update the vscode settings
......
...@@ -390,7 +390,7 @@ while [[ $# -gt 0 ]]; do ...@@ -390,7 +390,7 @@ while [[ $# -gt 0 ]]; do
# run the python provided by isaacsim # run the python provided by isaacsim
python_exe=$(extract_python_exe) python_exe=$(extract_python_exe)
shift # past argument shift # past argument
${python_exe} ${ISAACLAB_PATH}/tools/run_all_tests.py $@ ${python_exe} -m pytest ${ISAACLAB_PATH}/tools $@
# exit neatly # exit neatly
break break
;; ;;
......
...@@ -13,4 +13,3 @@ These include: ...@@ -13,4 +13,3 @@ These include:
""" """
from .app_launcher import AppLauncher # noqa: F401, F403 from .app_launcher import AppLauncher # noqa: F401, F403
from .runners import run_tests # noqa: F401, F403
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Sub-module with runners to simplify running main via unittest."""
import unittest
def run_tests(verbosity: int = 2, **kwargs):
"""Wrapper for running tests via ``unittest``.
Args:
verbosity: Verbosity level for the test runner.
**kwargs: Additional arguments to pass to the `unittest.main` function.
"""
# run main
unittest.main(verbosity=verbosity, exit=True, **kwargs)
...@@ -39,6 +39,10 @@ INSTALL_REQUIRES = [ ...@@ -39,6 +39,10 @@ INSTALL_REQUIRES = [
"pillow==11.0.0", "pillow==11.0.0",
# livestream # livestream
"starlette==0.46.0", "starlette==0.46.0",
# testing
"pytest",
"pytest-mock",
"junitparser",
"flatdict==4.0.1", "flatdict==4.0.1",
] ]
......
...@@ -4,25 +4,24 @@ ...@@ -4,25 +4,24 @@
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
import argparse import argparse
import unittest
from unittest import mock
from isaaclab.app import AppLauncher, run_tests import pytest
from isaaclab.app import AppLauncher
class TestAppLauncher(unittest.TestCase):
"""Test launching of the simulation app using AppLauncher."""
@mock.patch("argparse.ArgumentParser.parse_args", return_value=argparse.Namespace(livestream=1)) @pytest.mark.usefixtures("mocker")
def test_livestream_launch_with_argparser(self, mock_args): def test_livestream_launch_with_argparser(mocker):
"""Test launching with argparser arguments.""" """Test launching with argparser arguments."""
# Mock the parse_args method
mocker.patch("argparse.ArgumentParser.parse_args", return_value=argparse.Namespace(livestream=1, headless=True))
# create argparser # create argparser
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
# add app launcher arguments # add app launcher arguments
AppLauncher.add_app_launcher_args(parser) AppLauncher.add_app_launcher_args(parser)
# check that argparser has the mandatory arguments # check that argparser has the mandatory arguments
for name in AppLauncher._APPLAUNCHER_CFG_INFO: for name in AppLauncher._APPLAUNCHER_CFG_INFO:
self.assertTrue(parser._option_string_actions[f"--{name}"]) assert parser._option_string_actions[f"--{name}"]
# parse args # parse args
mock_args = parser.parse_args() mock_args = parser.parse_args()
# everything defaults to None # everything defaults to None
...@@ -35,13 +34,9 @@ class TestAppLauncher(unittest.TestCase): ...@@ -35,13 +34,9 @@ class TestAppLauncher(unittest.TestCase):
carb_settings_iface = carb.settings.get_settings() carb_settings_iface = carb.settings.get_settings()
# check settings # check settings
# -- no-gui mode # -- no-gui mode
self.assertEqual(carb_settings_iface.get("/app/window/enabled"), False) assert carb_settings_iface.get("/app/window/enabled") is False
# -- livestream # -- livestream
self.assertEqual(carb_settings_iface.get("/app/livestream/enabled"), True) assert carb_settings_iface.get("/app/livestream/enabled") is True
# close the app on exit # close the app on exit
app.close() app.close()
if __name__ == "__main__":
run_tests()
...@@ -4,18 +4,17 @@ ...@@ -4,18 +4,17 @@
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
import os import os
import unittest
from isaaclab.app import AppLauncher, run_tests import pytest
from isaaclab.app import AppLauncher
class TestAppLauncher(unittest.TestCase):
"""Test launching of the simulation app using AppLauncher."""
def test_livestream_launch_with_env_var(self): @pytest.mark.usefixtures("mocker")
"""Test launching with no-keyword args but environment variables.""" def test_livestream_launch_with_env_vars(mocker):
# manually set the settings as well to make sure they are set correctly """Test launching with environment variables."""
os.environ["LIVESTREAM"] = "1" # Mock the environment variables
mocker.patch.dict(os.environ, {"LIVESTREAM": "1", "HEADLESS": "1"})
# everything defaults to None # everything defaults to None
app = AppLauncher().app app = AppLauncher().app
...@@ -26,13 +25,9 @@ class TestAppLauncher(unittest.TestCase): ...@@ -26,13 +25,9 @@ class TestAppLauncher(unittest.TestCase):
carb_settings_iface = carb.settings.get_settings() carb_settings_iface = carb.settings.get_settings()
# check settings # check settings
# -- no-gui mode # -- no-gui mode
self.assertEqual(carb_settings_iface.get("/app/window/enabled"), False) assert carb_settings_iface.get("/app/window/enabled") is False
# -- livestream # -- livestream
self.assertEqual(carb_settings_iface.get("/app/livestream/enabled"), True) assert carb_settings_iface.get("/app/livestream/enabled") is True
# close the app on exit # close the app on exit
app.close() app.close()
if __name__ == "__main__":
run_tests()
...@@ -3,16 +3,14 @@ ...@@ -3,16 +3,14 @@
# #
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
import unittest import pytest
from isaaclab.app import AppLauncher, run_tests from isaaclab.app import AppLauncher
class TestAppLauncher(unittest.TestCase): @pytest.mark.usefixtures("mocker")
"""Test launching of the simulation app using AppLauncher.""" def test_livestream_launch_with_kwargs(mocker):
"""Test launching with keyword arguments."""
def test_livestream_launch_with_kwarg(self):
"""Test launching with headless and livestreaming arguments."""
# everything defaults to None # everything defaults to None
app = AppLauncher(headless=True, livestream=1).app app = AppLauncher(headless=True, livestream=1).app
...@@ -23,13 +21,9 @@ class TestAppLauncher(unittest.TestCase): ...@@ -23,13 +21,9 @@ class TestAppLauncher(unittest.TestCase):
carb_settings_iface = carb.settings.get_settings() carb_settings_iface = carb.settings.get_settings()
# check settings # check settings
# -- no-gui mode # -- no-gui mode
self.assertEqual(carb_settings_iface.get("/app/window/enabled"), False) assert carb_settings_iface.get("/app/window/enabled") is False
# -- livestream # -- livestream
self.assertEqual(carb_settings_iface.get("/app/livestream/enabled"), True) assert carb_settings_iface.get("/app/livestream/enabled") is True
# close the app on exit # close the app on exit
app.close() app.close()
if __name__ == "__main__":
run_tests()
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
"""Launch Isaac Sim Simulator first.""" """Launch Isaac Sim Simulator first."""
from isaaclab.app import AppLauncher, run_tests from isaaclab.app import AppLauncher
# launch omniverse app # launch omniverse app
simulation_app = AppLauncher(headless=True).app simulation_app = AppLauncher(headless=True).app
...@@ -13,10 +13,10 @@ simulation_app = AppLauncher(headless=True).app ...@@ -13,10 +13,10 @@ simulation_app = AppLauncher(headless=True).app
"""Rest everything follows.""" """Rest everything follows."""
import torch import torch
import unittest
import isaacsim.core.utils.prims as prim_utils import isaacsim.core.utils.prims as prim_utils
import isaacsim.core.utils.stage as stage_utils import isaacsim.core.utils.stage as stage_utils
import pytest
from isaacsim.core.cloner import GridCloner from isaacsim.core.cloner import GridCloner
import isaaclab.sim as sim_utils import isaaclab.sim as sim_utils
...@@ -37,20 +37,18 @@ from isaaclab.utils.math import ( # isort:skip ...@@ -37,20 +37,18 @@ from isaaclab.utils.math import ( # isort:skip
from isaaclab_assets import FRANKA_PANDA_HIGH_PD_CFG, UR10_CFG # isort:skip from isaaclab_assets import FRANKA_PANDA_HIGH_PD_CFG, UR10_CFG # isort:skip
class TestDifferentialIKController(unittest.TestCase): @pytest.fixture
"""Test fixture for checking that differential IK controller tracks commands properly.""" def sim():
"""Create a simulation context for testing."""
def setUp(self):
"""Create a blank new stage for each test."""
# Wait for spawning # Wait for spawning
stage_utils.create_new_stage() stage_utils.create_new_stage()
# Constants # Constants
self.num_envs = 128 num_envs = 128
# Load kit helper # Load kit helper
sim_cfg = sim_utils.SimulationCfg(dt=0.01) sim_cfg = sim_utils.SimulationCfg(dt=0.01)
self.sim = sim_utils.SimulationContext(sim_cfg) sim = sim_utils.SimulationContext(sim_cfg)
# TODO: Remove this once we have a better way to handle this. # TODO: Remove this once we have a better way to handle this.
self.sim._app_control_on_stop_handle = None sim._app_control_on_stop_handle = None
# Create a ground plane # Create a ground plane
cfg = sim_utils.GroundPlaneCfg() cfg = sim_utils.GroundPlaneCfg()
...@@ -59,13 +57,13 @@ class TestDifferentialIKController(unittest.TestCase): ...@@ -59,13 +57,13 @@ class TestDifferentialIKController(unittest.TestCase):
# Create interface to clone the scene # Create interface to clone the scene
cloner = GridCloner(spacing=2.0) cloner = GridCloner(spacing=2.0)
cloner.define_base_env("/World/envs") cloner.define_base_env("/World/envs")
self.env_prim_paths = cloner.generate_paths("/World/envs/env", self.num_envs) env_prim_paths = cloner.generate_paths("/World/envs/env", num_envs)
# create source prim # create source prim
prim_utils.define_prim(self.env_prim_paths[0], "Xform") prim_utils.define_prim(env_prim_paths[0], "Xform")
# clone the env xform # clone the env xform
self.env_origins = cloner.clone( cloner.clone(
source_prim_path=self.env_prim_paths[0], source_prim_path=env_prim_paths[0],
prim_paths=self.env_prim_paths, prim_paths=env_prim_paths,
replicate_physics=True, replicate_physics=True,
) )
...@@ -75,35 +73,39 @@ class TestDifferentialIKController(unittest.TestCase): ...@@ -75,35 +73,39 @@ class TestDifferentialIKController(unittest.TestCase):
[0.5, -0.4, 0.6, 0.707, 0.707, 0.0, 0.0], [0.5, -0.4, 0.6, 0.707, 0.707, 0.0, 0.0],
[0.5, 0, 0.5, 0.0, 1.0, 0.0, 0.0], [0.5, 0, 0.5, 0.0, 1.0, 0.0, 0.0],
] ]
self.ee_pose_b_des_set = torch.tensor(ee_goals_set, device=self.sim.device) ee_pose_b_des_set = torch.tensor(ee_goals_set, device=sim.device)
def tearDown(self): yield sim, num_envs, ee_pose_b_des_set
"""Stops simulator after each test."""
# stop simulation # Cleanup
self.sim.stop() sim.stop()
self.sim.clear() sim.clear()
self.sim.clear_all_callbacks() sim.clear_all_callbacks()
self.sim.clear_instance() sim.clear_instance()
"""
Test fixtures.
"""
def test_franka_ik_pose_abs(self): def test_franka_ik_pose_abs(sim):
"""Test IK controller for Franka arm with Franka hand.""" """Test IK controller for Franka arm with Franka hand."""
sim_context, num_envs, ee_pose_b_des_set = sim
# Create robot instance # Create robot instance
robot_cfg = FRANKA_PANDA_HIGH_PD_CFG.replace(prim_path="/World/envs/env_.*/Robot") robot_cfg = FRANKA_PANDA_HIGH_PD_CFG.replace(prim_path="/World/envs/env_.*/Robot")
robot = Articulation(cfg=robot_cfg) robot = Articulation(cfg=robot_cfg)
# Create IK controller # Create IK controller
diff_ik_cfg = DifferentialIKControllerCfg(command_type="pose", use_relative_mode=False, ik_method="dls") diff_ik_cfg = DifferentialIKControllerCfg(command_type="pose", use_relative_mode=False, ik_method="dls")
diff_ik_controller = DifferentialIKController(diff_ik_cfg, num_envs=self.num_envs, device=self.sim.device) diff_ik_controller = DifferentialIKController(diff_ik_cfg, num_envs=num_envs, device=sim_context.device)
# Run the controller and check that it converges to the goal # Run the controller and check that it converges to the goal
self._run_ik_controller(robot, diff_ik_controller, "panda_hand", ["panda_joint.*"]) _run_ik_controller(
robot, diff_ik_controller, "panda_hand", ["panda_joint.*"], sim_context, num_envs, ee_pose_b_des_set
)
def test_ur10_ik_pose_abs(self): def test_ur10_ik_pose_abs(sim):
"""Test IK controller for UR10 arm.""" """Test IK controller for UR10 arm."""
sim_context, num_envs, ee_pose_b_des_set = sim
# Create robot instance # Create robot instance
robot_cfg = UR10_CFG.replace(prim_path="/World/envs/env_.*/Robot") robot_cfg = UR10_CFG.replace(prim_path="/World/envs/env_.*/Robot")
robot_cfg.spawn.rigid_props.disable_gravity = True robot_cfg.spawn.rigid_props.disable_gravity = True
...@@ -111,26 +113,36 @@ class TestDifferentialIKController(unittest.TestCase): ...@@ -111,26 +113,36 @@ class TestDifferentialIKController(unittest.TestCase):
# Create IK controller # Create IK controller
diff_ik_cfg = DifferentialIKControllerCfg(command_type="pose", use_relative_mode=False, ik_method="dls") diff_ik_cfg = DifferentialIKControllerCfg(command_type="pose", use_relative_mode=False, ik_method="dls")
diff_ik_controller = DifferentialIKController(diff_ik_cfg, num_envs=self.num_envs, device=self.sim.device) diff_ik_controller = DifferentialIKController(diff_ik_cfg, num_envs=num_envs, device=sim_context.device)
# Run the controller and check that it converges to the goal # Run the controller and check that it converges to the goal
self._run_ik_controller(robot, diff_ik_controller, "ee_link", [".*"]) _run_ik_controller(robot, diff_ik_controller, "ee_link", [".*"], sim_context, num_envs, ee_pose_b_des_set)
"""
Helper functions.
"""
def _run_ik_controller( def _run_ik_controller(
self,
robot: Articulation, robot: Articulation,
diff_ik_controller: DifferentialIKController, diff_ik_controller: DifferentialIKController,
ee_frame_name: str, ee_frame_name: str,
arm_joint_names: list[str], arm_joint_names: list[str],
): sim: sim_utils.SimulationContext,
num_envs: int,
ee_pose_b_des_set: torch.Tensor,
):
"""Run the IK controller with the given parameters.
Args:
robot (Articulation): The robot to control.
diff_ik_controller (DifferentialIKController): The differential IK controller.
ee_frame_name (str): The name of the end-effector frame.
arm_joint_names (list[str]): The names of the arm joints.
sim (sim_utils.SimulationContext): The simulation context.
num_envs (int): The number of environments.
ee_pose_b_des_set (torch.Tensor): The set of desired end-effector poses.
"""
# Define simulation stepping # Define simulation stepping
sim_dt = self.sim.get_physics_dt() sim_dt = sim.get_physics_dt()
# Play the simulator # Play the simulator
self.sim.reset() sim.reset()
# Obtain the frame index of the end-effector # Obtain the frame index of the end-effector
ee_frame_idx = robot.find_bodies(ee_frame_name)[0][0] ee_frame_idx = robot.find_bodies(ee_frame_name)[0][0]
...@@ -144,8 +156,8 @@ class TestDifferentialIKController(unittest.TestCase): ...@@ -144,8 +156,8 @@ class TestDifferentialIKController(unittest.TestCase):
# Track the given command # Track the given command
current_goal_idx = 0 current_goal_idx = 0
# Current goal for the arm # Current goal for the arm
ee_pose_b_des = torch.zeros(self.num_envs, diff_ik_controller.action_dim, device=self.sim.device) ee_pose_b_des = torch.zeros(num_envs, diff_ik_controller.action_dim, device=sim.device)
ee_pose_b_des[:] = self.ee_pose_b_des_set[current_goal_idx] ee_pose_b_des[:] = ee_pose_b_des_set[current_goal_idx]
# Compute current pose of the end-effector # Compute current pose of the end-effector
ee_pose_w = robot.data.body_state_w[:, ee_frame_idx, 0:7] ee_pose_w = robot.data.body_state_w[:, ee_frame_idx, 0:7]
root_pose_w = robot.data.root_state_w[:, 0:7] root_pose_w = robot.data.root_state_w[:, 0:7]
...@@ -178,15 +190,15 @@ class TestDifferentialIKController(unittest.TestCase): ...@@ -178,15 +190,15 @@ class TestDifferentialIKController(unittest.TestCase):
robot.write_data_to_sim() robot.write_data_to_sim()
# randomize root state yaw, ik should work regardless base rotation # randomize root state yaw, ik should work regardless base rotation
root_state = robot.data.root_state_w.clone() root_state = robot.data.root_state_w.clone()
root_state[:, 3:7] = random_yaw_orientation(self.num_envs, self.sim.device) root_state[:, 3:7] = random_yaw_orientation(num_envs, sim.device)
robot.write_root_pose_to_sim(root_state[:, :7]) robot.write_root_pose_to_sim(root_state[:, :7])
robot.write_root_velocity_to_sim(root_state[:, 7:]) robot.write_root_velocity_to_sim(root_state[:, 7:])
robot.reset() robot.reset()
# reset actions # reset actions
ee_pose_b_des[:] = self.ee_pose_b_des_set[current_goal_idx] ee_pose_b_des[:] = ee_pose_b_des_set[current_goal_idx]
joint_pos_des = joint_pos[:, arm_joint_ids].clone() joint_pos_des = joint_pos[:, arm_joint_ids].clone()
# update goal for next iteration # update goal for next iteration
current_goal_idx = (current_goal_idx + 1) % len(self.ee_pose_b_des_set) current_goal_idx = (current_goal_idx + 1) % len(ee_pose_b_des_set)
# set the controller commands # set the controller commands
diff_ik_controller.reset() diff_ik_controller.reset()
diff_ik_controller.set_command(ee_pose_b_des) diff_ik_controller.set_command(ee_pose_b_des)
...@@ -213,10 +225,6 @@ class TestDifferentialIKController(unittest.TestCase): ...@@ -213,10 +225,6 @@ class TestDifferentialIKController(unittest.TestCase):
robot.set_joint_position_target(joint_pos_des, arm_joint_ids) robot.set_joint_position_target(joint_pos_des, arm_joint_ids)
robot.write_data_to_sim() robot.write_data_to_sim()
# perform step # perform step
self.sim.step(render=False) sim.step(render=False)
# update buffers # update buffers
robot.update(sim_dt) robot.update(sim_dt)
if __name__ == "__main__":
run_tests()
...@@ -11,7 +11,7 @@ import sys ...@@ -11,7 +11,7 @@ import sys
if sys.platform != "win32": if sys.platform != "win32":
import pinocchio # noqa: F401 import pinocchio # noqa: F401
from isaaclab.app import AppLauncher, run_tests from isaaclab.app import AppLauncher
# launch omniverse app # launch omniverse app
simulation_app = AppLauncher(headless=True).app simulation_app = AppLauncher(headless=True).app
...@@ -198,7 +198,3 @@ class TestPinkIKController(unittest.TestCase): ...@@ -198,7 +198,3 @@ class TestPinkIKController(unittest.TestCase):
self.right_hand_roll_link_pose[2] -= 0.05 self.right_hand_roll_link_pose[2] -= 0.05
env.close() env.close()
if __name__ == "__main__":
run_tests()
...@@ -11,15 +11,9 @@ warnings.filterwarnings("ignore", category=DeprecationWarning) ...@@ -11,15 +11,9 @@ warnings.filterwarnings("ignore", category=DeprecationWarning)
import numpy as np import numpy as np
import scipy.interpolate as interpolate import scipy.interpolate as interpolate
import unittest
from isaaclab.app import run_tests
def test_interpolation():
class TestScipyOperations(unittest.TestCase):
"""Tests for assuring scipy related operations used in Isaac Lab."""
def test_interpolation(self):
"""Test scipy interpolation 2D method.""" """Test scipy interpolation 2D method."""
# parameters # parameters
size = (10.0, 12.0) size = (10.0, 12.0)
...@@ -51,25 +45,15 @@ class TestScipyOperations(unittest.TestCase): ...@@ -51,25 +45,15 @@ class TestScipyOperations(unittest.TestCase):
# interpolate the sampled heights to obtain the height field # interpolate the sampled heights to obtain the height field
x_upsampled = np.linspace(0, size[0] * horizontal_scale, width_pixels) x_upsampled = np.linspace(0, size[0] * horizontal_scale, width_pixels)
y_upsampled = np.linspace(0, size[1] * horizontal_scale, length_pixels) y_upsampled = np.linspace(0, size[1] * horizontal_scale, length_pixels)
# -- method 1: interp2d (this will be deprecated in the future 1.12 release) # -- method 1: RegularGridInterpolator (replacing deprecated interp2d)
func_interp2d = interpolate.interp2d(y, x, height_field_downsampled, kind="cubic") func_RegularGridInterpolator = interpolate.RegularGridInterpolator((x, y), height_field_downsampled, method="cubic")
z_upsampled_interp2d = func_interp2d(y_upsampled, x_upsampled) xx_upsampled, yy_upsampled = np.meshgrid(x_upsampled, y_upsampled, indexing="ij", sparse=True)
z_upsampled_RegularGridInterpolator = func_RegularGridInterpolator((xx_upsampled, yy_upsampled))
# -- method 2: RectBivariateSpline (alternate to interp2d) # -- method 2: RectBivariateSpline (alternate to interp2d)
func_RectBiVariate = interpolate.RectBivariateSpline(x, y, height_field_downsampled) func_RectBiVariate = interpolate.RectBivariateSpline(x, y, height_field_downsampled)
z_upsampled_RectBivariant = func_RectBiVariate(x_upsampled, y_upsampled) z_upsampled_RectBivariant = func_RectBiVariate(x_upsampled, y_upsampled)
# -- method 3: RegularGridInterpolator (recommended from scipy but slow!)
# Ref: https://github.com/scipy/scipy/issues/18010
func_RegularGridInterpolator = interpolate.RegularGridInterpolator(
(x, y), height_field_downsampled, method="cubic"
)
xx_upsampled, yy_upsampled = np.meshgrid(x_upsampled, y_upsampled, indexing="ij", sparse=True)
z_upsampled_RegularGridInterpolator = func_RegularGridInterpolator((xx_upsampled, yy_upsampled))
# check if the interpolated height field is the same as the sampled height field # check if the interpolated height field is the same as the sampled height field
np.testing.assert_allclose(z_upsampled_interp2d, z_upsampled_RectBivariant, atol=1e-14) np.testing.assert_allclose(z_upsampled_RegularGridInterpolator, z_upsampled_RectBivariant, atol=1e-14)
np.testing.assert_allclose(z_upsampled_RectBivariant, z_upsampled_RegularGridInterpolator, atol=1e-14) np.testing.assert_allclose(z_upsampled_RectBivariant, z_upsampled_RegularGridInterpolator, atol=1e-14)
np.testing.assert_allclose(z_upsampled_RegularGridInterpolator, z_upsampled_interp2d, atol=1e-14) np.testing.assert_allclose(z_upsampled_RegularGridInterpolator, z_upsampled_RegularGridInterpolator, atol=1e-14)
if __name__ == "__main__":
run_tests()
...@@ -5,36 +5,33 @@ ...@@ -5,36 +5,33 @@
import torch import torch
import torch.utils.benchmark as benchmark import torch.utils.benchmark as benchmark
import unittest
from isaaclab.app import run_tests import pytest
class TestTorchOperations(unittest.TestCase): def test_array_slicing():
"""Tests for assuring torch related operations used in Isaac Lab."""
def test_array_slicing(self):
"""Check that using ellipsis and slices work for torch tensors.""" """Check that using ellipsis and slices work for torch tensors."""
size = (400, 300, 5) size = (400, 300, 5)
my_tensor = torch.rand(size, device="cuda:0") my_tensor = torch.rand(size, device="cuda:0")
self.assertEqual(my_tensor[..., 0].shape, (400, 300)) assert my_tensor[..., 0].shape == (400, 300)
self.assertEqual(my_tensor[:, :, 0].shape, (400, 300)) assert my_tensor[:, :, 0].shape == (400, 300)
self.assertEqual(my_tensor[slice(None), slice(None), 0].shape, (400, 300)) assert my_tensor[slice(None), slice(None), 0].shape == (400, 300)
with self.assertRaises(IndexError): with pytest.raises(IndexError):
my_tensor[..., ..., 0] my_tensor[..., ..., 0]
self.assertEqual(my_tensor[0, ...].shape, (300, 5)) assert my_tensor[0, ...].shape == (300, 5)
self.assertEqual(my_tensor[0, :, :].shape, (300, 5)) assert my_tensor[0, :, :].shape == (300, 5)
self.assertEqual(my_tensor[0, slice(None), slice(None)].shape, (300, 5)) assert my_tensor[0, slice(None), slice(None)].shape == (300, 5)
self.assertEqual(my_tensor[0, ..., ...].shape, (300, 5)) assert my_tensor[0, ..., ...].shape == (300, 5)
assert my_tensor[..., 0, 0].shape == (400,)
assert my_tensor[slice(None), 0, 0].shape == (400,)
assert my_tensor[:, 0, 0].shape == (400,)
self.assertEqual(my_tensor[..., 0, 0].shape, (400,))
self.assertEqual(my_tensor[slice(None), 0, 0].shape, (400,))
self.assertEqual(my_tensor[:, 0, 0].shape, (400,))
def test_array_circular(self): def test_array_circular():
"""Check circular buffer implementation in torch.""" """Check circular buffer implementation in torch."""
size = (10, 30, 5) size = (10, 30, 5)
...@@ -46,8 +43,8 @@ class TestTorchOperations(unittest.TestCase): ...@@ -46,8 +43,8 @@ class TestTorchOperations(unittest.TestCase):
my_tensor_1[:, 0, :] = my_tensor[:, -1, :] my_tensor_1[:, 0, :] = my_tensor[:, -1, :]
# check that circular buffer works as expected # check that circular buffer works as expected
error = torch.max(torch.abs(my_tensor_1 - my_tensor.roll(1, dims=1))) error = torch.max(torch.abs(my_tensor_1 - my_tensor.roll(1, dims=1)))
self.assertNotEqual(error.item(), 0.0) assert error.item() != 0.0
self.assertFalse(torch.allclose(my_tensor_1, my_tensor.roll(1, dims=1))) assert not torch.allclose(my_tensor_1, my_tensor.roll(1, dims=1))
# roll up the tensor with cloning # roll up the tensor with cloning
my_tensor_2 = my_tensor.clone() my_tensor_2 = my_tensor.clone()
...@@ -55,8 +52,8 @@ class TestTorchOperations(unittest.TestCase): ...@@ -55,8 +52,8 @@ class TestTorchOperations(unittest.TestCase):
my_tensor_2[:, 0, :] = my_tensor[:, -1, :] my_tensor_2[:, 0, :] = my_tensor[:, -1, :]
# check that circular buffer works as expected # check that circular buffer works as expected
error = torch.max(torch.abs(my_tensor_2 - my_tensor.roll(1, dims=1))) error = torch.max(torch.abs(my_tensor_2 - my_tensor.roll(1, dims=1)))
self.assertEqual(error.item(), 0.0) assert error.item() == 0.0
self.assertTrue(torch.allclose(my_tensor_2, my_tensor.roll(1, dims=1))) assert torch.allclose(my_tensor_2, my_tensor.roll(1, dims=1))
# roll up the tensor with detach operation # roll up the tensor with detach operation
my_tensor_3 = my_tensor.clone() my_tensor_3 = my_tensor.clone()
...@@ -64,8 +61,8 @@ class TestTorchOperations(unittest.TestCase): ...@@ -64,8 +61,8 @@ class TestTorchOperations(unittest.TestCase):
my_tensor_3[:, 0, :] = my_tensor[:, -1, :] my_tensor_3[:, 0, :] = my_tensor[:, -1, :]
# check that circular buffer works as expected # check that circular buffer works as expected
error = torch.max(torch.abs(my_tensor_3 - my_tensor.roll(1, dims=1))) error = torch.max(torch.abs(my_tensor_3 - my_tensor.roll(1, dims=1)))
self.assertNotEqual(error.item(), 0.0) assert error.item() != 0.0
self.assertFalse(torch.allclose(my_tensor_3, my_tensor.roll(1, dims=1))) assert not torch.allclose(my_tensor_3, my_tensor.roll(1, dims=1))
# roll up the tensor with roll operation # roll up the tensor with roll operation
my_tensor_4 = my_tensor.clone() my_tensor_4 = my_tensor.clone()
...@@ -73,10 +70,11 @@ class TestTorchOperations(unittest.TestCase): ...@@ -73,10 +70,11 @@ class TestTorchOperations(unittest.TestCase):
my_tensor_4[:, 0, :] = my_tensor[:, -1, :] my_tensor_4[:, 0, :] = my_tensor[:, -1, :]
# check that circular buffer works as expected # check that circular buffer works as expected
error = torch.max(torch.abs(my_tensor_4 - my_tensor.roll(1, dims=1))) error = torch.max(torch.abs(my_tensor_4 - my_tensor.roll(1, dims=1)))
self.assertEqual(error.item(), 0.0) assert error.item() == 0.0
self.assertTrue(torch.allclose(my_tensor_4, my_tensor.roll(1, dims=1))) assert torch.allclose(my_tensor_4, my_tensor.roll(1, dims=1))
def test_array_circular_copy(self):
def test_array_circular_copy():
"""Check that circular buffer implementation in torch is copying data.""" """Check that circular buffer implementation in torch is copying data."""
size = (10, 30, 5) size = (10, 30, 5)
...@@ -90,20 +88,22 @@ class TestTorchOperations(unittest.TestCase): ...@@ -90,20 +88,22 @@ class TestTorchOperations(unittest.TestCase):
# change the source tensor # change the source tensor
my_tensor[:, 0, :] = 1000 my_tensor[:, 0, :] = 1000
# check that circular buffer works as expected # check that circular buffer works as expected
self.assertFalse(torch.allclose(my_tensor_1, my_tensor.roll(1, dims=1))) assert not torch.allclose(my_tensor_1, my_tensor.roll(1, dims=1))
self.assertTrue(torch.allclose(my_tensor_1, my_tensor_clone.roll(1, dims=1))) assert torch.allclose(my_tensor_1, my_tensor_clone.roll(1, dims=1))
def test_array_multi_indexing(self): def test_array_multi_indexing():
"""Check multi-indexing works for torch tensors.""" """Check multi-indexing works for torch tensors."""
size = (400, 300, 5) size = (400, 300, 5)
my_tensor = torch.rand(size, device="cuda:0") my_tensor = torch.rand(size, device="cuda:0")
# this fails since array indexing cannot be broadcasted!! # this fails since array indexing cannot be broadcasted!!
with self.assertRaises(IndexError): with pytest.raises(IndexError):
my_tensor[[0, 1, 2, 3], [0, 1, 2, 3, 4]] my_tensor[[0, 1, 2, 3], [0, 1, 2, 3, 4]]
def test_array_single_indexing(self):
def test_array_single_indexing():
"""Check how indexing effects the returned tensor.""" """Check how indexing effects the returned tensor."""
size = (400, 300, 5) size = (400, 300, 5)
...@@ -111,21 +111,22 @@ class TestTorchOperations(unittest.TestCase): ...@@ -111,21 +111,22 @@ class TestTorchOperations(unittest.TestCase):
# obtain a slice of the tensor # obtain a slice of the tensor
my_slice = my_tensor[0, ...] my_slice = my_tensor[0, ...]
self.assertEqual(my_slice.untyped_storage().data_ptr(), my_tensor.untyped_storage().data_ptr()) assert my_slice.untyped_storage().data_ptr() == my_tensor.untyped_storage().data_ptr()
# obtain a slice over ranges # obtain a slice over ranges
my_slice = my_tensor[0:2, ...] my_slice = my_tensor[0:2, ...]
self.assertEqual(my_slice.untyped_storage().data_ptr(), my_tensor.untyped_storage().data_ptr()) assert my_slice.untyped_storage().data_ptr() == my_tensor.untyped_storage().data_ptr()
# obtain a slice over list # obtain a slice over list
my_slice = my_tensor[[0, 1], ...] my_slice = my_tensor[[0, 1], ...]
self.assertNotEqual(my_slice.untyped_storage().data_ptr(), my_tensor.untyped_storage().data_ptr()) assert my_slice.untyped_storage().data_ptr() != my_tensor.untyped_storage().data_ptr()
# obtain a slice over tensor # obtain a slice over tensor
my_slice = my_tensor[torch.tensor([0, 1]), ...] my_slice = my_tensor[torch.tensor([0, 1]), ...]
self.assertNotEqual(my_slice.untyped_storage().data_ptr(), my_tensor.untyped_storage().data_ptr()) assert my_slice.untyped_storage().data_ptr() != my_tensor.untyped_storage().data_ptr()
def test_logical_or(self): def test_logical_or():
"""Test bitwise or operation.""" """Test bitwise or operation."""
size = (400, 300, 5) size = (400, 300, 5)
...@@ -147,8 +148,4 @@ class TestTorchOperations(unittest.TestCase): ...@@ -147,8 +148,4 @@ class TestTorchOperations(unittest.TestCase):
output_logical_or = torch.logical_or(my_tensor_1, my_tensor_2) output_logical_or = torch.logical_or(my_tensor_1, my_tensor_2)
output_bitwise_or = my_tensor_1 | my_tensor_2 output_bitwise_or = my_tensor_1 | my_tensor_2
self.assertTrue(torch.allclose(output_logical_or, output_bitwise_or)) assert torch.allclose(output_logical_or, output_bitwise_or)
if __name__ == "__main__":
run_tests()
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
from __future__ import annotations from __future__ import annotations
from isaaclab.app import AppLauncher, run_tests from isaaclab.app import AppLauncher
# Can set this to False to see the GUI for debugging. # Can set this to False to see the GUI for debugging.
HEADLESS = True HEADLESS = True
...@@ -138,7 +138,3 @@ class TestOpenXRDevice(unittest.TestCase): ...@@ -138,7 +138,3 @@ class TestOpenXRDevice(unittest.TestCase):
device_1.reset() device_1.reset()
device_2.reset() device_2.reset()
env.close() env.close()
if __name__ == "__main__":
run_tests()
...@@ -5,11 +5,10 @@ ...@@ -5,11 +5,10 @@
"""Launch Isaac Sim Simulator first.""" """Launch Isaac Sim Simulator first."""
from isaaclab.app import AppLauncher, run_tests from isaaclab.app import AppLauncher
# launch the simulator # launch the simulator
app_launcher = AppLauncher(headless=True) simulation_app = AppLauncher(headless=True).app
simulation_app = app_launcher.app
"""Rest everything follows.""" """Rest everything follows."""
...@@ -18,11 +17,11 @@ import gymnasium as gym ...@@ -18,11 +17,11 @@ import gymnasium as gym
import shutil import shutil
import tempfile import tempfile
import torch import torch
import unittest
import uuid import uuid
import carb import carb
import omni.usd import omni.usd
import pytest
from isaaclab.envs.mdp.recorders.recorders_cfg import ActionStateRecorderManagerCfg from isaaclab.envs.mdp.recorders.recorders_cfg import ActionStateRecorderManagerCfg
...@@ -30,68 +29,22 @@ import isaaclab_tasks # noqa: F401 ...@@ -30,68 +29,22 @@ import isaaclab_tasks # noqa: F401
from isaaclab_tasks.utils.parse_cfg import parse_env_cfg from isaaclab_tasks.utils.parse_cfg import parse_env_cfg
class TestActionStateRecorderManagerCfg(unittest.TestCase): @pytest.fixture(scope="session", autouse=True)
"""Test cases for ActionStateRecorderManagerCfg recorder terms.""" def setup_carb_settings():
"""Set up carb settings to prevent simulation getting stuck."""
@classmethod
def setUpClass(cls):
# this flag is necessary to prevent a bug where the simulation gets stuck randomly when running the
# test on many environments.
carb_settings_iface = carb.settings.get_settings() carb_settings_iface = carb.settings.get_settings()
carb_settings_iface.set_bool("/physics/cooking/ujitsoCollisionCooking", False) carb_settings_iface.set_bool("/physics/cooking/ujitsoCollisionCooking", False)
def setUp(self):
# create a temporary directory to store the test datasets
self.temp_dir = tempfile.mkdtemp()
def tearDown(self):
# delete the temporary directory after the test
shutil.rmtree(self.temp_dir)
def test_action_state_reocrder_terms(self):
"""Check action state recorder terms."""
for task_name in ["Isaac-Lift-Cube-Franka-v0"]:
for device in ["cuda:0", "cpu"]:
for num_envs in [1, 2]:
with self.subTest(task_name=task_name, device=device):
omni.usd.get_context().new_stage()
dummy_dataset_filename = f"{uuid.uuid4()}.hdf5"
# parse configuration
env_cfg = parse_env_cfg(task_name, device=device, num_envs=num_envs)
# set recorder configurations for this test
env_cfg.recorders: ActionStateRecorderManagerCfg = ActionStateRecorderManagerCfg()
env_cfg.recorders.dataset_export_dir_path = self.temp_dir
env_cfg.recorders.dataset_filename = dummy_dataset_filename
# create environment @pytest.fixture
env = gym.make(task_name, cfg=env_cfg) def temp_dir():
"""Create a temporary directory for test datasets."""
temp_dir = tempfile.mkdtemp()
yield temp_dir
shutil.rmtree(temp_dir)
# reset all environment instances to trigger post-reset recorder callbacks
env.reset()
self.check_initial_state_recorder_term(env)
# reset only one environment that is not the first one
env.unwrapped.reset(env_ids=torch.tensor([num_envs - 1], device=env.unwrapped.device))
self.check_initial_state_recorder_term(env)
# close the environment
env.close()
def check_initial_state_recorder_term(self, env):
"""Check values recorded by the initial state recorder terms.
Args:
env: Environment instance.
"""
current_state = env.unwrapped.scene.get_state(is_relative=True)
for env_id in range(env.unwrapped.num_envs):
recorded_initial_state = env.unwrapped.recorder_manager.get_episode(env_id).get_initial_state()
are_states_equal, output_log = self.compare_states(recorded_initial_state, current_state, env_id)
self.assertTrue(are_states_equal, msg=output_log)
def compare_states(self, compared_state, ground_truth_state, ground_truth_env_id) -> (bool, str): def compare_states(compared_state, ground_truth_state, ground_truth_env_id) -> tuple[bool, str]:
"""Compare a state with the given ground_truth. """Compare a state with the given ground_truth.
Args: Args:
...@@ -119,5 +72,45 @@ class TestActionStateRecorderManagerCfg(unittest.TestCase): ...@@ -119,5 +72,45 @@ class TestActionStateRecorderManagerCfg(unittest.TestCase):
return True, "" return True, ""
if __name__ == "__main__": def check_initial_state_recorder_term(env):
run_tests() """Check values recorded by the initial state recorder terms.
Args:
env: Environment instance.
"""
current_state = env.unwrapped.scene.get_state(is_relative=True)
for env_id in range(env.unwrapped.num_envs):
recorded_initial_state = env.unwrapped.recorder_manager.get_episode(env_id).get_initial_state()
are_states_equal, output_log = compare_states(recorded_initial_state, current_state, env_id)
assert are_states_equal, output_log
@pytest.mark.parametrize("task_name", ["Isaac-Lift-Cube-Franka-v0"])
@pytest.mark.parametrize("device", ["cuda:0", "cpu"])
@pytest.mark.parametrize("num_envs", [1, 2])
def test_action_state_recorder_terms(task_name, device, num_envs, temp_dir):
"""Check action state recorder terms."""
omni.usd.get_context().new_stage()
dummy_dataset_filename = f"{uuid.uuid4()}.hdf5"
# parse configuration
env_cfg = parse_env_cfg(task_name, device=device, num_envs=num_envs)
# set recorder configurations for this test
env_cfg.recorders = ActionStateRecorderManagerCfg()
env_cfg.recorders.dataset_export_dir_path = temp_dir
env_cfg.recorders.dataset_filename = dummy_dataset_filename
# create environment
env = gym.make(task_name, cfg=env_cfg)
# reset all environment instances to trigger post-reset recorder callbacks
env.reset()
check_initial_state_recorder_term(env)
# reset only one environment that is not the first one
env.unwrapped.reset(env_ids=torch.tensor([num_envs - 1], device=env.unwrapped.device))
check_initial_state_recorder_term(env)
# close the environment
env.close()
...@@ -10,20 +10,15 @@ from __future__ import annotations ...@@ -10,20 +10,15 @@ from __future__ import annotations
"""Launch Isaac Sim Simulator first.""" """Launch Isaac Sim Simulator first."""
from isaaclab.app import AppLauncher, run_tests from isaaclab.app import AppLauncher
# Can set this to False to see the GUI for debugging
HEADLESS = True
# launch omniverse app # launch omniverse app
app_launcher = AppLauncher(headless=HEADLESS) simulation_app = AppLauncher(headless=True).app
simulation_app = app_launcher.app
"""Rest everything follows.""" """Rest everything follows."""
import unittest
import omni.usd import omni.usd
import pytest
from isaaclab.envs import DirectMARLEnv, DirectMARLEnvCfg from isaaclab.envs import DirectMARLEnv, DirectMARLEnvCfg
from isaaclab.scene import InteractiveSceneCfg from isaaclab.scene import InteractiveSceneCfg
...@@ -57,16 +52,9 @@ def get_empty_base_env_cfg(device: str = "cuda:0", num_envs: int = 1, env_spacin ...@@ -57,16 +52,9 @@ def get_empty_base_env_cfg(device: str = "cuda:0", num_envs: int = 1, env_spacin
return EmptyEnvCfg() return EmptyEnvCfg()
class TestDirectMARLEnv(unittest.TestCase): @pytest.mark.parametrize("device", ["cuda:0", "cpu"])
"""Test for direct MARL env class""" def test_initialization(device):
"""Test initialization of DirectMARLEnv."""
"""
Tests
"""
def test_initialization(self):
for device in ("cuda:0", "cpu"):
with self.subTest(device=device):
# create a new stage # create a new stage
omni.usd.get_context().new_stage() omni.usd.get_context().new_stage()
try: try:
...@@ -78,18 +66,14 @@ class TestDirectMARLEnv(unittest.TestCase): ...@@ -78,18 +66,14 @@ class TestDirectMARLEnv(unittest.TestCase):
else: else:
if hasattr(e, "obj") and hasattr(e.obj, "_is_closed"): if hasattr(e, "obj") and hasattr(e.obj, "_is_closed"):
e.obj.close() e.obj.close()
self.fail(f"Failed to set-up the DirectMARLEnv environment. Error: {e}") pytest.fail(f"Failed to set-up the DirectMARLEnv environment. Error: {e}")
# check multi-agent config # check multi-agent config
self.assertEqual(env.num_agents, 2) assert env.num_agents == 2
self.assertEqual(env.max_num_agents, 2) assert env.max_num_agents == 2
# check spaces # check spaces
self.assertEqual(env.state_space.shape, (7,)) assert env.state_space.shape == (7,)
self.assertEqual(len(env.observation_spaces), 2) assert len(env.observation_spaces) == 2
self.assertEqual(len(env.action_spaces), 2) assert len(env.action_spaces) == 2
# close the environment # close the environment
env.close() env.close()
if __name__ == "__main__":
run_tests()
...@@ -5,19 +5,18 @@ ...@@ -5,19 +5,18 @@
"""Launch Isaac Sim Simulator first.""" """Launch Isaac Sim Simulator first."""
from isaaclab.app import AppLauncher, run_tests from isaaclab.app import AppLauncher
# launch omniverse app # launch omniverse app
# need to set "enable_cameras" true to be able to do rendering tests # need to set "enable_cameras" true to be able to do rendering tests
app_launcher = AppLauncher(headless=True, enable_cameras=True) simulation_app = AppLauncher(headless=True, enable_cameras=True).app
simulation_app = app_launcher.app
"""Rest everything follows.""" """Rest everything follows."""
import torch import torch
import unittest
import omni.usd import omni.usd
import pytest
from isaaclab.envs import ( from isaaclab.envs import (
DirectRLEnv, DirectRLEnv,
...@@ -110,29 +109,40 @@ def create_direct_rl_env(render_interval: int): ...@@ -110,29 +109,40 @@ def create_direct_rl_env(render_interval: int):
return Env(cfg=EnvCfg()) return Env(cfg=EnvCfg())
class TestEnvRenderingLogic(unittest.TestCase): @pytest.fixture
"""Test the rendering logic of the different environment workflows.""" def physics_callback():
"""Create a physics callback for tracking physics steps."""
physics_time = 0.0
num_physics_steps = 0
def callback(dt):
nonlocal physics_time, num_physics_steps
physics_time += dt
num_physics_steps += 1
return callback, lambda: (physics_time, num_physics_steps)
@pytest.fixture
def render_callback():
"""Create a render callback for tracking render steps."""
render_time = 0.0
num_render_steps = 0
def callback(event):
nonlocal render_time, num_render_steps
render_time += event.payload["dt"]
num_render_steps += 1
def _physics_callback(self, dt): return callback, lambda: (render_time, num_render_steps)
# called at every physics step
self.physics_time += dt
self.num_physics_steps += 1 @pytest.mark.parametrize("env_type", ["manager_based_env", "manager_based_rl_env", "direct_rl_env"])
@pytest.mark.parametrize("render_interval", [1, 2, 4, 8, 10])
def _render_callback(self, event): def test_env_rendering_logic(env_type, render_interval, physics_callback, render_callback):
# called at every render step """Test the rendering logic of the different environment workflows."""
self.render_time += event.payload["dt"] physics_cb, get_physics_stats = physics_callback
self.num_render_steps += 1 render_cb, get_render_stats = render_callback
def test_env_rendering_logic(self):
for env_type in ["manager_based_env", "manager_based_rl_env", "direct_rl_env"]:
for render_interval in [1, 2, 4, 8, 10]:
with self.subTest(env_type=env_type, render_interval=render_interval):
# time tracking variables
self.physics_time = 0.0
self.render_time = 0.0
# step tracking variables
self.num_physics_steps = 0
self.num_render_steps = 0
# create a new stage # create a new stage
omni.usd.get_context().new_stage() omni.usd.get_context().new_stage()
...@@ -150,7 +160,7 @@ class TestEnvRenderingLogic(unittest.TestCase): ...@@ -150,7 +160,7 @@ class TestEnvRenderingLogic(unittest.TestCase):
else: else:
if hasattr(e, "obj") and hasattr(e.obj, "_is_closed"): if hasattr(e, "obj") and hasattr(e.obj, "_is_closed"):
e.obj.close() e.obj.close()
self.fail(f"Failed to set-up the environment {env_type}. Error: {e}") pytest.fail(f"Failed to set-up the environment {env_type}. Error: {e}")
# enable the flag to render the environment # enable the flag to render the environment
# note: this is only done for the unit testing to "fake" camera rendering. # note: this is only done for the unit testing to "fake" camera rendering.
...@@ -164,11 +174,11 @@ class TestEnvRenderingLogic(unittest.TestCase): ...@@ -164,11 +174,11 @@ class TestEnvRenderingLogic(unittest.TestCase):
# check that we are in partial rendering mode for the environment # check that we are in partial rendering mode for the environment
# this is enabled due to app launcher setting "enable_cameras=True" # this is enabled due to app launcher setting "enable_cameras=True"
self.assertEqual(env.sim.render_mode, SimulationContext.RenderMode.PARTIAL_RENDERING) assert env.sim.render_mode == SimulationContext.RenderMode.PARTIAL_RENDERING
# add physics and render callbacks # add physics and render callbacks
env.sim.add_physics_callback("physics_step", self._physics_callback) env.sim.add_physics_callback("physics_step", physics_cb)
env.sim.add_render_callback("render_step", self._render_callback) env.sim.add_render_callback("render_step", render_cb)
# create a zero action tensor for stepping the environment # create a zero action tensor for stepping the environment
actions = torch.zeros((env.num_envs, 0), device=env.device) actions = torch.zeros((env.num_envs, 0), device=env.device)
...@@ -179,30 +189,20 @@ class TestEnvRenderingLogic(unittest.TestCase): ...@@ -179,30 +189,20 @@ class TestEnvRenderingLogic(unittest.TestCase):
env.step(action=actions) env.step(action=actions)
# check that we have completed the correct number of physics steps # check that we have completed the correct number of physics steps
self.assertEqual( _, num_physics_steps = get_physics_stats()
self.num_physics_steps, (i + 1) * env.cfg.decimation, msg="Physics steps mismatch" assert num_physics_steps == (i + 1) * env.cfg.decimation, "Physics steps mismatch"
)
# check that we have simulated physics for the correct amount of time # check that we have simulated physics for the correct amount of time
self.assertAlmostEqual( physics_time, _ = get_physics_stats()
self.physics_time, self.num_physics_steps * env.cfg.sim.dt, msg="Physics time mismatch" assert abs(physics_time - num_physics_steps * env.cfg.sim.dt) < 1e-6, "Physics time mismatch"
)
# check that we have completed the correct number of rendering steps # check that we have completed the correct number of rendering steps
self.assertEqual( _, num_render_steps = get_render_stats()
self.num_render_steps, assert num_render_steps == (i + 1) * env.cfg.decimation // env.cfg.sim.render_interval, "Render steps mismatch"
(i + 1) * env.cfg.decimation // env.cfg.sim.render_interval,
msg="Render steps mismatch",
)
# check that we have rendered for the correct amount of time # check that we have rendered for the correct amount of time
self.assertAlmostEqual( render_time, _ = get_render_stats()
self.render_time, assert (
self.num_render_steps * env.cfg.sim.dt * env.cfg.sim.render_interval, abs(render_time - num_render_steps * env.cfg.sim.dt * env.cfg.sim.render_interval) < 1e-6
msg="Render time mismatch", ), "Render time mismatch"
)
# close the environment # close the environment
env.close() env.close()
if __name__ == "__main__":
run_tests()
...@@ -10,21 +10,17 @@ from __future__ import annotations ...@@ -10,21 +10,17 @@ from __future__ import annotations
"""Launch Isaac Sim Simulator first.""" """Launch Isaac Sim Simulator first."""
from isaaclab.app import AppLauncher, run_tests from isaaclab.app import AppLauncher
# Can set this to False to see the GUI for debugging
HEADLESS = True
# launch omniverse app # launch omniverse app
app_launcher = AppLauncher(headless=HEADLESS) simulation_app = AppLauncher(headless=True).app
simulation_app = app_launcher.app
"""Rest everything follows.""" """Rest everything follows."""
import torch import torch
import unittest
import omni.usd import omni.usd
import pytest
from isaaclab.envs import ManagerBasedEnv, ManagerBasedEnvCfg from isaaclab.envs import ManagerBasedEnv, ManagerBasedEnvCfg
from isaaclab.scene import InteractiveSceneCfg from isaaclab.scene import InteractiveSceneCfg
...@@ -71,29 +67,22 @@ def get_empty_base_env_cfg(device: str = "cuda:0", num_envs: int = 1, env_spacin ...@@ -71,29 +67,22 @@ def get_empty_base_env_cfg(device: str = "cuda:0", num_envs: int = 1, env_spacin
return EmptyEnvCfg() return EmptyEnvCfg()
class TestManagerBasedEnv(unittest.TestCase): @pytest.mark.parametrize("device", ["cuda:0", "cpu"])
"""Test for manager-based env class""" def test_initialization(device):
"""Test initialization of ManagerBasedEnv."""
"""
Tests
"""
def test_initialization(self):
for device in ("cuda:0", "cpu"):
with self.subTest(device=device):
# create a new stage # create a new stage
omni.usd.get_context().new_stage() omni.usd.get_context().new_stage()
# create environment # create environment
env = ManagerBasedEnv(cfg=get_empty_base_env_cfg(device=device)) env = ManagerBasedEnv(cfg=get_empty_base_env_cfg(device=device))
# check size of action manager terms # check size of action manager terms
self.assertEqual(env.action_manager.total_action_dim, 0) assert env.action_manager.total_action_dim == 0
self.assertEqual(len(env.action_manager.active_terms), 0) assert len(env.action_manager.active_terms) == 0
self.assertEqual(len(env.action_manager.action_term_dim), 0) assert len(env.action_manager.action_term_dim) == 0
# check size of observation manager terms # check size of observation manager terms
self.assertEqual(len(env.observation_manager.active_terms), 0) assert len(env.observation_manager.active_terms) == 0
self.assertEqual(len(env.observation_manager.group_obs_dim), 0) assert len(env.observation_manager.group_obs_dim) == 0
self.assertEqual(len(env.observation_manager.group_obs_term_dim), 0) assert len(env.observation_manager.group_obs_term_dim) == 0
self.assertEqual(len(env.observation_manager.group_obs_concatenate), 0) assert len(env.observation_manager.group_obs_concatenate) == 0
# create actions of correct size (1,0) # create actions of correct size (1,0)
act = torch.randn_like(env.action_manager.action) act = torch.randn_like(env.action_manager.action)
# step environment to verify setup # step environment to verify setup
...@@ -101,7 +90,3 @@ class TestManagerBasedEnv(unittest.TestCase): ...@@ -101,7 +90,3 @@ class TestManagerBasedEnv(unittest.TestCase):
obs, ext = env.step(action=act) obs, ext = env.step(action=act)
# close the environment # close the environment
env.close() env.close()
if __name__ == "__main__":
run_tests()
...@@ -10,19 +10,12 @@ from __future__ import annotations ...@@ -10,19 +10,12 @@ from __future__ import annotations
"""Launch Isaac Sim Simulator first.""" """Launch Isaac Sim Simulator first."""
from isaaclab.app import AppLauncher, run_tests from isaaclab.app import AppLauncher
# Can set this to False to see the GUI for debugging simulation_app = AppLauncher(headless=True, enable_cameras=True).app
HEADLESS = True
# launch omniverse app
app_launcher = AppLauncher(headless=HEADLESS, enable_cameras=True)
simulation_app = app_launcher.app
"""Rest everything follows.""" """Rest everything follows."""
import unittest
import carb import carb
import omni.usd import omni.usd
from isaacsim.core.utils.extensions import enable_extension from isaacsim.core.utils.extensions import enable_extension
...@@ -64,7 +57,7 @@ def get_empty_base_env_cfg(device: str = "cuda:0", num_envs: int = 1, env_spacin ...@@ -64,7 +57,7 @@ def get_empty_base_env_cfg(device: str = "cuda:0", num_envs: int = 1, env_spacin
rewards: EmptyManagerCfg = EmptyManagerCfg() rewards: EmptyManagerCfg = EmptyManagerCfg()
terminations: EmptyManagerCfg = EmptyManagerCfg() terminations: EmptyManagerCfg = EmptyManagerCfg()
# Define window # Define window
ui_window_class_type: ManagerBasedRLEnvWindow = ManagerBasedRLEnvWindow ui_window_class_type: type[ManagerBasedRLEnvWindow] = ManagerBasedRLEnvWindow
def __post_init__(self): def __post_init__(self):
"""Post initialization.""" """Post initialization."""
...@@ -81,14 +74,8 @@ def get_empty_base_env_cfg(device: str = "cuda:0", num_envs: int = 1, env_spacin ...@@ -81,14 +74,8 @@ def get_empty_base_env_cfg(device: str = "cuda:0", num_envs: int = 1, env_spacin
return EmptyEnvCfg() return EmptyEnvCfg()
class TestManagerBasedRLEnvUI(unittest.TestCase): def test_ui_window():
"""Test for manager-based RL env class UI""" """Test UI window of ManagerBasedRLEnv."""
"""
Tests
"""
def test_ui_window(self):
device = "cuda:0" device = "cuda:0"
# override sim setting to enable UI # override sim setting to enable UI
carb.settings.get_settings().set_bool("/app/window/enabled", True) carb.settings.get_settings().set_bool("/app/window/enabled", True)
...@@ -98,7 +85,3 @@ class TestManagerBasedRLEnvUI(unittest.TestCase): ...@@ -98,7 +85,3 @@ class TestManagerBasedRLEnvUI(unittest.TestCase):
env = ManagerBasedRLEnv(cfg=get_empty_base_env_cfg(device=device)) env = ManagerBasedRLEnv(cfg=get_empty_base_env_cfg(device=device))
# close the environment # close the environment
env.close() env.close()
if __name__ == "__main__":
run_tests()
...@@ -5,47 +5,44 @@ ...@@ -5,47 +5,44 @@
"""Launch Isaac Sim Simulator first.""" """Launch Isaac Sim Simulator first."""
from isaaclab.app import AppLauncher, run_tests from isaaclab.app import AppLauncher
# launch omniverse app # launch omniverse app
app_launcher = AppLauncher(headless=True) simulation_app = AppLauncher(headless=True).app
simulation_app = app_launcher.app
"""Rest everything follows.""" """Rest everything follows."""
import unittest
from collections import namedtuple from collections import namedtuple
import pytest
from isaaclab.envs.mdp import NullCommandCfg from isaaclab.envs.mdp import NullCommandCfg
class TestNullCommandTerm(unittest.TestCase): @pytest.fixture
"""Test cases for null command generator.""" def env():
"""Create a dummy environment."""
return namedtuple("ManagerBasedRLEnv", ["num_envs", "dt", "device"])(20, 0.1, "cpu")
def setUp(self) -> None:
self.env = namedtuple("ManagerBasedRLEnv", ["num_envs", "dt", "device"])(20, 0.1, "cpu")
def test_str(self): def test_str(env):
"""Test the string representation of the command manager.""" """Test the string representation of the command manager."""
cfg = NullCommandCfg() cfg = NullCommandCfg()
command_term = cfg.class_type(cfg, self.env) command_term = cfg.class_type(cfg, env)
# print the expected string # print the expected string
print() print()
print(command_term) print(command_term)
def test_compute(self):
def test_compute(env):
"""Test the compute function. For null command generator, it does nothing.""" """Test the compute function. For null command generator, it does nothing."""
cfg = NullCommandCfg() cfg = NullCommandCfg()
command_term = cfg.class_type(cfg, self.env) command_term = cfg.class_type(cfg, env)
# test the reset function # test the reset function
command_term.reset() command_term.reset()
# test the compute function # test the compute function
command_term.compute(dt=self.env.dt) command_term.compute(dt=env.dt)
# expect error # expect error
with self.assertRaises(RuntimeError): with pytest.raises(RuntimeError):
command_term.command command_term.command
if __name__ == "__main__":
run_tests()
...@@ -11,7 +11,7 @@ from __future__ import annotations ...@@ -11,7 +11,7 @@ from __future__ import annotations
"""Launch Isaac Sim Simulator first.""" """Launch Isaac Sim Simulator first."""
from isaaclab.app import AppLauncher, run_tests from isaaclab.app import AppLauncher
# launch omniverse app # launch omniverse app
app_launcher = AppLauncher(headless=True, enable_cameras=True) app_launcher = AppLauncher(headless=True, enable_cameras=True)
...@@ -355,7 +355,3 @@ class TestScaleRandomization(unittest.TestCase): ...@@ -355,7 +355,3 @@ class TestScaleRandomization(unittest.TestCase):
with self.assertRaises(RuntimeError): with self.assertRaises(RuntimeError):
env = ManagerBasedEnv(cfg_failure) env = ManagerBasedEnv(cfg_failure)
env.close() env.close()
if __name__ == "__main__":
run_tests()
...@@ -9,7 +9,7 @@ This script tests the functionality of texture randomization applied to the cart ...@@ -9,7 +9,7 @@ This script tests the functionality of texture randomization applied to the cart
"""Launch Isaac Sim Simulator first.""" """Launch Isaac Sim Simulator first."""
from isaaclab.app import AppLauncher, run_tests from isaaclab.app import AppLauncher
# launch omniverse app # launch omniverse app
app_launcher = AppLauncher(headless=True, enable_cameras=True) app_launcher = AppLauncher(headless=True, enable_cameras=True)
...@@ -199,8 +199,3 @@ class TestTextureRandomization(unittest.TestCase): ...@@ -199,8 +199,3 @@ class TestTextureRandomization(unittest.TestCase):
with self.assertRaises(RuntimeError): with self.assertRaises(RuntimeError):
env = ManagerBasedEnv(cfg_failure) env = ManagerBasedEnv(cfg_failure)
env.close() env.close()
if __name__ == "__main__":
# run the main function
run_tests()
...@@ -9,24 +9,14 @@ ...@@ -9,24 +9,14 @@
from __future__ import annotations from __future__ import annotations
import time import time
import unittest
from isaaclab.app import run_tests from isaaclab.app import AppLauncher
class TestKitStartUpPerformance(unittest.TestCase): def test_kit_start_up_time():
"""Test kit startup performance."""
def test_kit_start_up_time(self):
"""Test kit start-up time.""" """Test kit start-up time."""
from isaaclab.app import AppLauncher
start_time = time.time() start_time = time.time()
self.app_launcher = AppLauncher(headless=True).app app_launcher = AppLauncher(headless=True).app # noqa: F841
end_time = time.time() end_time = time.time()
elapsed_time = end_time - start_time elapsed_time = end_time - start_time
self.assertLessEqual(elapsed_time, 10.0) assert elapsed_time <= 10.0
if __name__ == "__main__":
run_tests()
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment