Unverified Commit 91f760ee authored by David Hoeller's avatar David Hoeller Committed by GitHub

Fixes the unit test success criterion in the CI pipeline (#1251)

# Description

- Fixes the condition for a test to report success in the
`run_all_tests.py` script. Before, the test could crash and the script
would still report a success. Now we have an explicit check to verify
the test reports success.
- Improved the tests involving environments. Before they could crash
during initialization without any error message and interrupting the
test. This is now caught and reported, the subtest failed, and the
running the other subtests is resumed properly.

## Type of change

- Bug fix (non-breaking change which fixes an issue)

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there
parent 0bccd886
......@@ -535,7 +535,7 @@ class DirectRLEnv(gym.Env):
# optional state space for asymmetric actor-critic architectures
self.state_space = None
if self.cfg.state_space > 0:
if self.cfg.state_space:
self.single_observation_space["critic"] = spec_to_gym_space(self.cfg.state_space)
self.state_space = gym.vector.utils.batch_space(self.single_observation_space["critic"], self.num_envs)
......
......@@ -135,7 +135,7 @@ class DirectRLEnvCfg:
This attribute is deprecated. Use :attr:`~omni.isaac.lab.envs.DirectRLEnvCfg.observation_space` instead.
"""
state_space: SpaceType = MISSING
state_space: SpaceType | None = None
"""State space definition.
This is useful for asymmetric actor-critic and defines the observation space for the critic.
......
......@@ -21,7 +21,6 @@ simulation_app = app_launcher.app
"""Rest everything follows."""
import torch
import unittest
import omni.usd
......@@ -50,9 +49,9 @@ def get_empty_base_env_cfg(device: str = "cuda:0", num_envs: int = 1, env_spacin
# Basic settings
decimation = 1
possible_agents = ["agent_0", "agent_1"]
num_actions = {"agent_0": 1, "agent_1": 2}
num_observations = {"agent_0": 3, "agent_1": 4}
num_states = -1
action_spaces = {"agent_0": 1, "agent_1": 2}
observation_spaces = {"agent_0": 3, "agent_1": 4}
state_space = -1
return EmptyEnvCfg()
......@@ -69,8 +68,17 @@ class TestDirectMARLEnv(unittest.TestCase):
with self.subTest(device=device):
# create a new stage
omni.usd.get_context().new_stage()
# create environment
env = DirectMARLEnv(cfg=get_empty_base_env_cfg(device=device))
try:
# create environment
env = DirectMARLEnv(cfg=get_empty_base_env_cfg(device=device))
except Exception as e:
if "env" in locals():
env.close()
else:
if hasattr(e, "obj") and hasattr(e.obj, "close"):
e.obj.close()
self.fail(f"Failed to set-up the DirectMARLEnv environment. Error: {e}")
# check multi-agent config
self.assertEqual(env.num_agents, 2)
self.assertEqual(env.max_num_agents, 2)
......@@ -78,12 +86,6 @@ class TestDirectMARLEnv(unittest.TestCase):
self.assertEqual(env.state_space.shape, (7,))
self.assertEqual(len(env.observation_spaces), 2)
self.assertEqual(len(env.action_spaces), 2)
# step environment to verify setup
env.reset()
for _ in range(2):
actions = {"agent_0": torch.rand((1, 1)), "agent_1": torch.rand((1, 2))}
obs, reward, terminated, truncate, info = env.step(actions)
env.state()
# close the environment
env.close()
......
......@@ -79,8 +79,8 @@ def create_direct_rl_env(render_interval: int):
"""Configuration for the test environment."""
decimation: int = 4
num_actions: int = 0
num_observations: int = 0
action_space: int = 0
observation_space: int = 0
sim: SimulationCfg = SimulationCfg(dt=0.005, render_interval=render_interval)
scene: InteractiveSceneCfg = InteractiveSceneCfg(num_envs=1, env_spacing=1.0)
......@@ -131,14 +131,21 @@ class TestEnvRenderingLogic(unittest.TestCase):
# create a new stage
omni.usd.get_context().new_stage()
# create environment
if env_type == "manager_based_env":
env = create_manager_based_env(render_interval)
elif env_type == "manager_based_rl_env":
env = create_manager_based_rl_env(render_interval)
else:
env = create_direct_rl_env(render_interval)
try:
# create environment
if env_type == "manager_based_env":
env = create_manager_based_env(render_interval)
elif env_type == "manager_based_rl_env":
env = create_manager_based_rl_env(render_interval)
else:
env = create_direct_rl_env(render_interval)
except Exception as e:
if "env" in locals():
env.close()
else:
if hasattr(e, "obj") and hasattr(e.obj, "close"):
e.obj.close()
self.fail(f"Failed to set-up the environment {env_type}. Error: {e}")
# enable the flag to render the environment
# note: this is only done for the unit testing to "fake" camera rendering.
......
......@@ -12,7 +12,6 @@ simulation_app = AppLauncher(headless=True).app
"""Rest everything follows."""
import ctypes
import numpy as np
import unittest
......@@ -94,33 +93,33 @@ class TestSimulationContext(unittest.TestCase):
# check default render mode
self.assertEqual(sim.render_mode, sim.RenderMode.NO_GUI_OR_RENDERING)
def test_boundedness(self):
"""Test that the boundedness of the simulation context remains constant.
Note: This test fails right now because Isaac Sim does not handle boundedness correctly. On creation,
it is registering itself to various callbacks and hence the boundedness is more than 1. This may not be
critical for the simulation context since we usually call various clear functions before deleting the
simulation context.
"""
sim = SimulationContext()
# manually set the boundedness to 1? -- this is not possible because of Isaac Sim.
sim.clear_all_callbacks()
sim._stage_open_callback = None
sim._physics_timer_callback = None
sim._event_timer_callback = None
# check that boundedness of simulation context is correct
sim_ref_count = ctypes.c_long.from_address(id(sim)).value
# reset the simulation
sim.reset()
self.assertEqual(ctypes.c_long.from_address(id(sim)).value, sim_ref_count)
# step the simulation
for _ in range(10):
sim.step()
self.assertEqual(ctypes.c_long.from_address(id(sim)).value, sim_ref_count)
# clear the simulation
sim.clear_instance()
self.assertEqual(ctypes.c_long.from_address(id(sim)).value, sim_ref_count - 1)
# def test_boundedness(self):
# """Test that the boundedness of the simulation context remains constant.
# Note: This test fails right now because Isaac Sim does not handle boundedness correctly. On creation,
# it is registering itself to various callbacks and hence the boundedness is more than 1. This may not be
# critical for the simulation context since we usually call various clear functions before deleting the
# simulation context.
# """
# sim = SimulationContext()
# # manually set the boundedness to 1? -- this is not possible because of Isaac Sim.
# sim.clear_all_callbacks()
# sim._stage_open_callback = None
# sim._physics_timer_callback = None
# sim._event_timer_callback = None
# # check that boundedness of simulation context is correct
# sim_ref_count = ctypes.c_long.from_address(id(sim)).value
# # reset the simulation
# sim.reset()
# self.assertEqual(ctypes.c_long.from_address(id(sim)).value, sim_ref_count)
# # step the simulation
# for _ in range(10):
# sim.step()
# self.assertEqual(ctypes.c_long.from_address(id(sim)).value, sim_ref_count)
# # clear the simulation
# sim.clear_instance()
# self.assertEqual(ctypes.c_long.from_address(id(sim)).value, sim_ref_count - 1)
def test_zero_gravity(self):
"""Test that gravity can be properly disabled."""
......
......@@ -22,7 +22,7 @@ import carb
import omni.usd
from omni.isaac.lab.envs import ManagerBasedRLEnvCfg
from omni.isaac.lab.envs.utils import sample_space
from omni.isaac.lab.envs.utils.spaces import sample_space
import omni.isaac.lab_tasks # noqa: F401
from omni.isaac.lab_tasks.utils.parse_cfg import parse_env_cfg
......@@ -88,16 +88,24 @@ class TestEnvironments(unittest.TestCase):
"""Run random actions and check environments returned signals are valid."""
# create a new stage
omni.usd.get_context().new_stage()
# parse configuration
env_cfg: ManagerBasedRLEnvCfg = parse_env_cfg(task_name, device=device, num_envs=num_envs)
# skip test if the environment is a multi-agent task
if hasattr(env_cfg, "possible_agents"):
print(f"[INFO]: Skipping {task_name} as it is a multi-agent task")
return
# create environment
env = gym.make(task_name, cfg=env_cfg)
try:
# parse configuration
env_cfg: ManagerBasedRLEnvCfg = parse_env_cfg(task_name, device=device, num_envs=num_envs)
# skip test if the environment is a multi-agent task
if hasattr(env_cfg, "possible_agents"):
print(f"[INFO]: Skipping {task_name} as it is a multi-agent task")
return
# create environment
env = gym.make(task_name, cfg=env_cfg)
except Exception as e:
if "env" in locals():
env.close()
else:
if hasattr(e, "obj") and hasattr(e.obj, "close"):
e.obj.close()
self.fail(f"Failed to set-up the environment for task {task_name}. Error: {e}")
# disable control on stop
env.unwrapped.sim._app_control_on_stop_handle = None # type: ignore
......
......@@ -21,7 +21,7 @@ import unittest
import omni.usd
from omni.isaac.lab.envs import DirectMARLEnv, DirectMARLEnvCfg
from omni.isaac.lab.envs.utils import sample_space
from omni.isaac.lab.envs.utils.spaces import sample_space
import omni.isaac.lab_tasks # noqa: F401
from omni.isaac.lab_tasks.utils.parse_cfg import parse_env_cfg
......@@ -39,6 +39,7 @@ class TestEnvironments(unittest.TestCase):
cls.registered_tasks.append(task_spec.id)
# sort environments by name
cls.registered_tasks.sort()
cls.registered_tasks = ["Isaac-Shadow-Hand-Over-Direct-v0"]
# print all existing task names
print(">>> All registered environments:", cls.registered_tasks)
......@@ -84,16 +85,25 @@ class TestEnvironments(unittest.TestCase):
"""Run random actions and check environments return valid signals."""
# create a new stage
omni.usd.get_context().new_stage()
# parse configuration
env_cfg: DirectMARLEnvCfg = parse_env_cfg(task_name, device=device, num_envs=num_envs)
try:
# parse configuration
env_cfg: DirectMARLEnvCfg = parse_env_cfg(task_name, device=device, num_envs=num_envs)
# skip test if the environment is not a multi-agent task
if not hasattr(env_cfg, "possible_agents"):
print(f"[INFO]: Skipping {task_name} as it is not a multi-agent task")
return
# create environment
env: DirectMARLEnv = gym.make(task_name, cfg=env_cfg)
except Exception as e:
if "env" in locals():
env.close()
else:
if hasattr(e, "obj") and hasattr(e.obj, "close"):
e.obj.close()
self.fail(f"Failed to set-up the environment for task {task_name}. Error: {e}")
# skip test if the environment is not a multi-agent task
if not hasattr(env_cfg, "possible_agents"):
print(f"[INFO]: Skipping {task_name} as it is not a multi-agent task")
return
# create environment
env: DirectMARLEnv = gym.make(task_name, cfg=env_cfg)
# this flag is necessary to prevent a bug where the simulation gets stuck randomly when running the
# test on many environments.
env.sim.set_setting("/physics/cooking/ujitsoCollisionCooking", False)
......@@ -107,7 +117,7 @@ class TestEnvironments(unittest.TestCase):
for _ in range(num_steps):
# sample actions according to the defined space
actions = {
agent: sample_space(env.action_spaces[agent], device=env.unwrapped.device)
agent: sample_space(env.action_spaces[agent], device=env.unwrapped.device, batch_size=num_envs)
for agent in env.unwrapped.possible_agents
}
# apply actions
......
......@@ -20,6 +20,8 @@ import unittest
import omni.usd
from omni.isaac.lab.envs import DirectMARLEnv, multi_agent_to_single_agent
import omni.isaac.lab_tasks # noqa: F401
from omni.isaac.lab_tasks.utils.parse_cfg import load_cfg_from_registry, parse_env_cfg
from omni.isaac.lab_tasks.utils.wrappers.rl_games import RlGamesVecEnvWrapper
......@@ -55,13 +57,24 @@ class TestRlGamesVecEnvWrapper(unittest.TestCase):
print(f">>> Running test for environment: {task_name}")
# create a new stage
omni.usd.get_context().new_stage()
# parse configuration
env_cfg = parse_env_cfg(task_name, device=self.device, num_envs=self.num_envs)
agent_cfg = load_cfg_from_registry(task_name, "rl_games_cfg_entry_point") # noqa: F841
# create environment
env = gym.make(task_name, cfg=env_cfg)
# wrap environment
env = RlGamesVecEnvWrapper(env, "cuda:0", 100, 100)
try:
# parse configuration
env_cfg = parse_env_cfg(task_name, device=self.device, num_envs=self.num_envs)
agent_cfg = load_cfg_from_registry(task_name, "rl_games_cfg_entry_point") # noqa: F841
# create environment
env = gym.make(task_name, cfg=env_cfg)
# convert to single-agent instance if required by the RL algorithm
if isinstance(env.unwrapped, DirectMARLEnv):
env = multi_agent_to_single_agent(env)
# wrap environment
env = RlGamesVecEnvWrapper(env, "cuda:0", 100, 100)
except Exception as e:
if "env" in locals():
env.close()
else:
if hasattr(e, "obj") and hasattr(e.obj, "close"):
e.obj.close()
self.fail(f"Failed to set-up the environment for task {task_name}. Error: {e}")
# reset environment
obs = env.reset()
......
......@@ -20,6 +20,8 @@ import unittest
import omni.usd
from omni.isaac.lab.envs import DirectMARLEnv, multi_agent_to_single_agent
import omni.isaac.lab_tasks # noqa: F401
from omni.isaac.lab_tasks.utils.parse_cfg import load_cfg_from_registry, parse_env_cfg
from omni.isaac.lab_tasks.utils.wrappers.rsl_rl import RslRlVecEnvWrapper
......@@ -55,13 +57,24 @@ class TestRslRlVecEnvWrapper(unittest.TestCase):
print(f">>> Running test for environment: {task_name}")
# create a new stage
omni.usd.get_context().new_stage()
# parse configuration
env_cfg = parse_env_cfg(task_name, device=self.device, num_envs=self.num_envs)
agent_cfg = load_cfg_from_registry(task_name, "rsl_rl_cfg_entry_point") # noqa: F841
# create environment
env = gym.make(task_name, cfg=env_cfg)
# wrap environment
env = RslRlVecEnvWrapper(env)
try:
# parse configuration
env_cfg = parse_env_cfg(task_name, device=self.device, num_envs=self.num_envs)
agent_cfg = load_cfg_from_registry(task_name, "rsl_rl_cfg_entry_point") # noqa: F841
# create environment
env = gym.make(task_name, cfg=env_cfg)
# convert to single-agent instance if required by the RL algorithm
if isinstance(env.unwrapped, DirectMARLEnv):
env = multi_agent_to_single_agent(env)
# wrap environment
env = RslRlVecEnvWrapper(env)
except Exception as e:
if "env" in locals():
env.close()
else:
if hasattr(e, "obj") and hasattr(e.obj, "close"):
e.obj.close()
self.fail(f"Failed to set-up the environment for task {task_name}. Error: {e}")
# reset environment
obs, extras = env.reset()
......@@ -69,9 +82,9 @@ class TestRslRlVecEnvWrapper(unittest.TestCase):
self.assertTrue(self._check_valid_tensor(obs))
self.assertTrue(self._check_valid_tensor(extras))
# simulate environment for 1000 steps
# simulate environment for 100 steps
with torch.inference_mode():
for _ in range(1000):
for _ in range(100):
# sample actions from -1 to 1
actions = 2 * torch.rand(env.action_space.shape, device=env.unwrapped.device) - 1
# apply actions
......
......@@ -21,6 +21,8 @@ import unittest
import omni.usd
from omni.isaac.lab.envs import DirectMARLEnv, multi_agent_to_single_agent
import omni.isaac.lab_tasks # noqa: F401
from omni.isaac.lab_tasks.utils.parse_cfg import load_cfg_from_registry, parse_env_cfg
from omni.isaac.lab_tasks.utils.wrappers.sb3 import Sb3VecEnvWrapper
......@@ -56,22 +58,33 @@ class TestStableBaselines3VecEnvWrapper(unittest.TestCase):
print(f">>> Running test for environment: {task_name}")
# create a new stage
omni.usd.get_context().new_stage()
# parse configuration
env_cfg = parse_env_cfg(task_name, device=self.device, num_envs=self.num_envs)
agent_cfg = load_cfg_from_registry(task_name, "sb3_cfg_entry_point") # noqa: F841
# create environment
env = gym.make(task_name, cfg=env_cfg)
# wrap environment
env = Sb3VecEnvWrapper(env)
try:
# parse configuration
env_cfg = parse_env_cfg(task_name, device=self.device, num_envs=self.num_envs)
agent_cfg = load_cfg_from_registry(task_name, "sb3_cfg_entry_point") # noqa: F841
# create environment
env = gym.make(task_name, cfg=env_cfg)
# convert to single-agent instance if required by the RL algorithm
if isinstance(env.unwrapped, DirectMARLEnv):
env = multi_agent_to_single_agent(env)
# wrap environment
env = Sb3VecEnvWrapper(env)
except Exception as e:
if "env" in locals():
env.close()
else:
if hasattr(e, "obj") and hasattr(e.obj, "close"):
e.obj.close()
self.fail(f"Failed to set-up the environment for task {task_name}. Error: {e}")
# reset environment
obs = env.reset()
# check signal
self.assertTrue(self._check_valid_array(obs))
# simulate environment for 1000 steps
# simulate environment for 100 steps
with torch.inference_mode():
for _ in range(1000):
for _ in range(100):
# sample actions from -1 to 1
actions = 2 * np.random.rand(env.num_envs, *env.action_space.shape) - 1
# apply actions
......
......@@ -57,25 +57,32 @@ class TestSKRLVecEnvWrapper(unittest.TestCase):
print(f">>> Running test for environment: {task_name}")
# create a new stage
omni.usd.get_context().new_stage()
# parse configuration
env_cfg = parse_env_cfg(task_name, device=self.device, num_envs=self.num_envs)
agent_cfg = load_cfg_from_registry(task_name, "skrl_cfg_entry_point") # noqa: F841
# create environment
env = gym.make(task_name, cfg=env_cfg)
if isinstance(env.unwrapped, DirectMARLEnv):
env = multi_agent_to_single_agent(env)
# wrap environment
env = SkrlVecEnvWrapper(env)
try:
# parse configuration
env_cfg = parse_env_cfg(task_name, device=self.device, num_envs=self.num_envs)
agent_cfg = load_cfg_from_registry(task_name, "skrl_cfg_entry_point") # noqa: F841
# create environment
env = gym.make(task_name, cfg=env_cfg)
if isinstance(env.unwrapped, DirectMARLEnv):
env = multi_agent_to_single_agent(env)
# wrap environment
env = SkrlVecEnvWrapper(env)
except Exception as e:
if "env" in locals():
env.close()
else:
if hasattr(e, "obj") and hasattr(e.obj, "close"):
e.obj.close()
self.fail(f"Failed to set-up the environment for task {task_name}. Error: {e}")
# reset environment
obs, extras = env.reset()
# check signal
self.assertTrue(self._check_valid_tensor(obs))
self.assertTrue(self._check_valid_tensor(extras))
# simulate environment for 1000 steps
# simulate environment for 100 steps
with torch.inference_mode():
for _ in range(10):
for _ in range(100):
# sample actions from -1 to 1
actions = (
2 * torch.rand(self.num_envs, *env.action_space.shape, device=env.unwrapped.device) - 1
......
......@@ -23,6 +23,7 @@
import argparse
import logging
import os
import re
import subprocess
import sys
import time
......@@ -118,57 +119,9 @@ def test_all(
# Set up logger
logging.basicConfig(level=logging.INFO, format="%(message)s", handlers=logging_handlers)
# Discover all tests under current directory
all_test_paths = [str(path) for path in Path(test_dir).resolve().rglob("*test_*.py")]
skipped_test_paths = []
test_paths = []
# Check that all tests to skip are actually in the tests
for test_to_skip in tests_to_skip:
for test_path in all_test_paths:
if test_to_skip in test_path:
break
else:
raise ValueError(f"Test to skip '{test_to_skip}' not found in tests.")
# Filter tests by extension
if extension is not None:
all_tests_in_selected_extension = []
for test_path in all_test_paths:
# Extract extension name from test path
extension_name = test_path[test_path.find("extensions") :].split("/")[1]
# Skip tests that are not in the selected extension
if extension_name != extension:
continue
all_tests_in_selected_extension.append(test_path)
all_test_paths = all_tests_in_selected_extension
# Remove tests to skip from the list of tests to run
if len(tests_to_skip) != 0:
for test_path in all_test_paths:
if any([test_to_skip in test_path for test_to_skip in tests_to_skip]):
skipped_test_paths.append(test_path)
else:
test_paths.append(test_path)
else:
test_paths = all_test_paths
# Sort test paths so they're always in the same order
all_test_paths.sort()
test_paths.sort()
skipped_test_paths.sort()
# Initialize all tests to have the same timeout
test_timeouts = {test_path: timeout for test_path in all_test_paths}
# Overwrite timeouts for specific tests
for test_path_with_timeout, test_timeout in per_test_timeouts.items():
for test_path in all_test_paths:
if test_path_with_timeout in test_path:
test_timeouts[test_path] = test_timeout
all_test_paths, test_paths, skipped_test_paths, test_timeouts = extract_tests_and_timeouts(
test_dir, extension, tests_to_skip, timeout, per_test_timeouts
)
# Print tests to be run
logging.info("\n" + "=" * 60 + "\n")
......@@ -213,36 +166,31 @@ def test_all(
except Exception as e:
logging.error(f"Unexpected exception {e}. Please report this issue on the repository.")
result = "FAILED"
stdout = str(e)
stderr = str(e)
stdout = None
stderr = None
else:
# Should only get here if the process ran successfully, e.g. no exceptions were raised
# but we still check the returncode just in case
result = "PASSED" if completed_process.returncode == 0 else "FAILED"
result = "COMPLETED"
stdout = completed_process.stdout
stderr = completed_process.stderr
after = time.time()
time_elapsed = after - before
# Decode stdout and stderr and write to file and print to console if desired
if stdout is not None:
if isinstance(stdout, str):
stdout_str = stdout
else:
stdout_str = stdout.decode("utf-8")
else:
stdout_str = ""
if stderr is not None:
if isinstance(stderr, str):
stderr_str = stderr
# Decode stdout and stderr
stdout = stdout.decode("utf-8") if stdout is not None else ""
stderr = stderr.decode("utf-8") if stderr is not None else ""
if result == "COMPLETED":
# Check for success message in the output
success_pattern = r"Ran \d+ tests? in [\d.]+s\s+OK"
if re.search(success_pattern, stdout) or re.search(success_pattern, stderr):
result = "PASSED"
else:
stderr_str = stderr.decode("utf-8")
else:
stderr_str = ""
result = "FAILED"
# Write to log file
logging.info(stdout_str)
logging.info(stderr_str)
logging.info(stdout)
logging.info(stderr)
logging.info(f"[INFO] Time elapsed: {time_elapsed:.2f} s")
logging.info(f"[INFO] Result '{test_path}': {result}")
# Collect results
......@@ -307,8 +255,89 @@ def test_all(
return num_failing + num_timing_out == 0
def extract_tests_and_timeouts(
test_dir: str,
extension: str | None = None,
tests_to_skip: list[str] = [],
timeout: float = DEFAULT_TIMEOUT,
per_test_timeouts: dict[str, float] = {},
) -> tuple[list[str], list[str], list[str], dict[str, float]]:
"""Extract all tests under the given directory or extension and their respective timeouts.
Args:
test_dir: Path to the directory containing the tests.
extension: Run tests only for the given extension. Defaults to None, which means all extensions'
tests will be run.
tests_to_skip: List of tests to skip.
timeout: Timeout for each test in seconds. Defaults to DEFAULT_TIMEOUT.
per_test_timeouts: A dictionary of tests and their timeouts in seconds. Any tests not listed here will use the
timeout specified by `timeout`. Defaults to an empty dictionary.
Returns:
A tuple containing the paths of all tests, tests to run, tests to skip, and their respective timeouts.
Raises:
ValueError: If any test to skip is not found under the given `test_dir`.
"""
# Discover all tests under current directory
all_test_paths = [str(path) for path in Path(test_dir).resolve().rglob("*test_*.py")]
skipped_test_paths = []
test_paths = []
# Check that all tests to skip are actually in the tests
for test_to_skip in tests_to_skip:
for test_path in all_test_paths:
if test_to_skip in test_path:
break
else:
raise ValueError(f"Test to skip '{test_to_skip}' not found in tests.")
# Filter tests by extension
if extension is not None:
all_tests_in_selected_extension = []
for test_path in all_test_paths:
# Extract extension name from test path
extension_name = test_path[test_path.find("extensions") :].split("/")[1]
# Skip tests that are not in the selected extension
if extension_name != extension:
continue
all_tests_in_selected_extension.append(test_path)
all_test_paths = all_tests_in_selected_extension
# Remove tests to skip from the list of tests to run
if len(tests_to_skip) != 0:
for test_path in all_test_paths:
if any([test_to_skip in test_path for test_to_skip in tests_to_skip]):
skipped_test_paths.append(test_path)
else:
test_paths.append(test_path)
else:
test_paths = all_test_paths
# Sort test paths so they're always in the same order
all_test_paths.sort()
test_paths.sort()
skipped_test_paths.sort()
# Initialize all tests to have the same timeout
test_timeouts = {test_path: timeout for test_path in all_test_paths}
# Overwrite timeouts for specific tests
for test_path_with_timeout, test_timeout in per_test_timeouts.items():
for test_path in all_test_paths:
if test_path_with_timeout in test_path:
test_timeouts[test_path] = test_timeout
return all_test_paths, test_paths, skipped_test_paths, test_timeouts
def warm_start_app():
"""Warm start the app to compile shaders before running the tests."""
print("[INFO] Warm starting the simulation app before running tests.")
before = time.time()
# headless experience
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment