Unverified Commit f20d74c5 authored by Louis LE LAY's avatar Louis LE LAY Committed by GitHub

Adds basic validation tests for scale-based randomization ranges (#3058)

# Description

While working on a task I made a tiny typo in the scale randomization
range for the stiffness-gain parameter. The robot still spawned, but its
behaviour was utterly bizarre. It only took a minute to spot the
mistake, yet it made me realize we have no guard-rails for this sort of
edge case.

This PR introduces a lightweight check that verifies, when
operation=="scale", the lower bound is non-negative and the upper bound
is not smaller than the lower one. Right now I cover the most common
parameters (stiffness, damping, mass, tendon gains, etc.), basically
anything that must stay positive to make physical sense.

If you’d like the same safeguard applied to other parameter types just
let me know and I’ll happily extend the patch.

## Type of change

- Bug fix (non-breaking change which fixes an issue)
- New feature (non-breaking change which adds functionality)

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there

---------
Signed-off-by: 's avatarLouis LE LAY <le.lay.louis@gmail.com>
Signed-off-by: 's avatarooctipus <zhengyuz@nvidia.com>
Co-authored-by: 's avatarlouislelay <louislelay@pal-robotics.com>
Co-authored-by: 's avatarooctipus <zhengyuz@nvidia.com>
parent c445da82
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.45.0"
version = "0.45.1"
# Description
......
Changelog
---------
0.45.1 (2025-08-16)
~~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added validations for scale-based randomization ranges across mass, actuator, joint, and tendon parameters.
Changed
^^^^^^^
* Refactored randomization functions into classes with initialization-time checks to avoid runtime overhead.
0.45.0 (2025-08-07)
~~~~~~~~~~~~~~~~~~~
......
......@@ -278,15 +278,7 @@ class randomize_rigid_body_material(ManagerTermBase):
self.asset.root_physx_view.set_material_properties(materials, env_ids)
def randomize_rigid_body_mass(
env: ManagerBasedEnv,
env_ids: torch.Tensor | None,
asset_cfg: SceneEntityCfg,
mass_distribution_params: tuple[float, float],
operation: Literal["add", "scale", "abs"],
distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform",
recompute_inertia: bool = True,
):
class randomize_rigid_body_mass(ManagerTermBase):
"""Randomize the mass of the bodies by adding, scaling, or setting random values.
This function allows randomizing the mass of the bodies of the asset. The function samples random values from the
......@@ -301,56 +293,94 @@ def randomize_rigid_body_mass(
This function uses CPU tensors to assign the body masses. It is recommended to use this function
only during the initialization of the environment.
"""
# extract the used quantities (to enable type-hinting)
asset: RigidObject | Articulation = env.scene[asset_cfg.name]
# resolve environment ids
if env_ids is None:
env_ids = torch.arange(env.scene.num_envs, device="cpu")
else:
env_ids = env_ids.cpu()
def __init__(self, cfg: EventTermCfg, env: ManagerBasedEnv):
"""Initialize the term.
# resolve body indices
if asset_cfg.body_ids == slice(None):
body_ids = torch.arange(asset.num_bodies, dtype=torch.int, device="cpu")
else:
body_ids = torch.tensor(asset_cfg.body_ids, dtype=torch.int, device="cpu")
Args:
cfg: The configuration of the event term.
env: The environment instance.
# get the current masses of the bodies (num_assets, num_bodies)
masses = asset.root_physx_view.get_masses()
Raises:
TypeError: If `params` is not a tuple of two numbers.
ValueError: If the operation is not supported.
ValueError: If the lower bound is negative or zero when not allowed.
ValueError: If the upper bound is less than the lower bound.
"""
super().__init__(cfg, env)
# apply randomization on default values
# this is to make sure when calling the function multiple times, the randomization is applied on the
# default values and not the previously randomized values
masses[env_ids[:, None], body_ids] = asset.data.default_mass[env_ids[:, None], body_ids].clone()
# extract the used quantities (to enable type-hinting)
self.asset_cfg: SceneEntityCfg = cfg.params["asset_cfg"]
self.asset: RigidObject | Articulation = env.scene[self.asset_cfg.name]
# check for valid operation
if cfg.params["operation"] == "scale":
if "mass_distribution_params" in cfg.params:
_validate_scale_range(
cfg.params["mass_distribution_params"], "mass_distribution_params", allow_zero=False
)
elif cfg.params["operation"] not in ("abs", "add"):
raise ValueError(
"Randomization term 'randomize_rigid_body_mass' does not support operation:"
f" '{cfg.params['operation']}'."
)
# sample from the given range
# note: we modify the masses in-place for all environments
# however, the setter takes care that only the masses of the specified environments are modified
masses = _randomize_prop_by_op(
masses, mass_distribution_params, env_ids, body_ids, operation=operation, distribution=distribution
)
def __call__(
self,
env: ManagerBasedEnv,
env_ids: torch.Tensor | None,
asset_cfg: SceneEntityCfg,
mass_distribution_params: tuple[float, float],
operation: Literal["add", "scale", "abs"],
distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform",
recompute_inertia: bool = True,
):
# resolve environment ids
if env_ids is None:
env_ids = torch.arange(env.scene.num_envs, device="cpu")
else:
env_ids = env_ids.cpu()
# set the mass into the physics simulation
asset.root_physx_view.set_masses(masses, env_ids)
# recompute inertia tensors if needed
if recompute_inertia:
# compute the ratios of the new masses to the initial masses
ratios = masses[env_ids[:, None], body_ids] / asset.data.default_mass[env_ids[:, None], body_ids]
# scale the inertia tensors by the the ratios
# since mass randomization is done on default values, we can use the default inertia tensors
inertias = asset.root_physx_view.get_inertias()
if isinstance(asset, Articulation):
# inertia has shape: (num_envs, num_bodies, 9) for articulation
inertias[env_ids[:, None], body_ids] = (
asset.data.default_inertia[env_ids[:, None], body_ids] * ratios[..., None]
)
# resolve body indices
if self.asset_cfg.body_ids == slice(None):
body_ids = torch.arange(self.asset.num_bodies, dtype=torch.int, device="cpu")
else:
# inertia has shape: (num_envs, 9) for rigid object
inertias[env_ids] = asset.data.default_inertia[env_ids] * ratios
# set the inertia tensors into the physics simulation
asset.root_physx_view.set_inertias(inertias, env_ids)
body_ids = torch.tensor(self.asset_cfg.body_ids, dtype=torch.int, device="cpu")
# get the current masses of the bodies (num_assets, num_bodies)
masses = self.asset.root_physx_view.get_masses()
# apply randomization on default values
# this is to make sure when calling the function multiple times, the randomization is applied on the
# default values and not the previously randomized values
masses[env_ids[:, None], body_ids] = self.asset.data.default_mass[env_ids[:, None], body_ids].clone()
# sample from the given range
# note: we modify the masses in-place for all environments
# however, the setter takes care that only the masses of the specified environments are modified
masses = _randomize_prop_by_op(
masses, mass_distribution_params, env_ids, body_ids, operation=operation, distribution=distribution
)
# set the mass into the physics simulation
self.asset.root_physx_view.set_masses(masses, env_ids)
# recompute inertia tensors if needed
if recompute_inertia:
# compute the ratios of the new masses to the initial masses
ratios = masses[env_ids[:, None], body_ids] / self.asset.data.default_mass[env_ids[:, None], body_ids]
# scale the inertia tensors by the the ratios
# since mass randomization is done on default values, we can use the default inertia tensors
inertias = self.asset.root_physx_view.get_inertias()
if isinstance(self.asset, Articulation):
# inertia has shape: (num_envs, num_bodies, 9) for articulation
inertias[env_ids[:, None], body_ids] = (
self.asset.data.default_inertia[env_ids[:, None], body_ids] * ratios[..., None]
)
else:
# inertia has shape: (num_envs, 9) for rigid object
inertias[env_ids] = self.asset.data.default_inertia[env_ids] * ratios
# set the inertia tensors into the physics simulation
self.asset.root_physx_view.set_inertias(inertias, env_ids)
def randomize_rigid_body_com(
......@@ -494,15 +524,7 @@ def randomize_physics_scene_gravity(
physics_sim_view.set_gravity(carb.Float3(*gravity))
def randomize_actuator_gains(
env: ManagerBasedEnv,
env_ids: torch.Tensor | None,
asset_cfg: SceneEntityCfg,
stiffness_distribution_params: tuple[float, float] | None = None,
damping_distribution_params: tuple[float, float] | None = None,
operation: Literal["add", "scale", "abs"] = "abs",
distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform",
):
class randomize_actuator_gains(ManagerTermBase):
"""Randomize the actuator gains in an articulation by adding, scaling, or setting random values.
This function allows randomizing the actuator stiffness and damping gains.
......@@ -515,69 +537,103 @@ def randomize_actuator_gains(
For implicit actuators, this function uses CPU tensors to assign the actuator gains into the simulation.
In such cases, it is recommended to use this function only during the initialization of the environment.
"""
# Extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# Resolve environment ids
if env_ids is None:
env_ids = torch.arange(env.scene.num_envs, device=asset.device)
def __init__(self, cfg: EventTermCfg, env: ManagerBasedEnv):
"""Initialize the term.
def randomize(data: torch.Tensor, params: tuple[float, float]) -> torch.Tensor:
return _randomize_prop_by_op(
data, params, dim_0_ids=None, dim_1_ids=actuator_indices, operation=operation, distribution=distribution
)
Args:
cfg: The configuration of the event term.
env: The environment instance.
Raises:
TypeError: If `params` is not a tuple of two numbers.
ValueError: If the operation is not supported.
ValueError: If the lower bound is negative or zero when not allowed.
ValueError: If the upper bound is less than the lower bound.
"""
super().__init__(cfg, env)
# extract the used quantities (to enable type-hinting)
self.asset_cfg: SceneEntityCfg = cfg.params["asset_cfg"]
self.asset: RigidObject | Articulation = env.scene[self.asset_cfg.name]
# check for valid operation
if cfg.params["operation"] == "scale":
if "stiffness_distribution_params" in cfg.params:
_validate_scale_range(
cfg.params["stiffness_distribution_params"], "stiffness_distribution_params", allow_zero=False
)
if "damping_distribution_params" in cfg.params:
_validate_scale_range(cfg.params["damping_distribution_params"], "damping_distribution_params")
elif cfg.params["operation"] not in ("abs", "add"):
raise ValueError(
"Randomization term 'randomize_actuator_gains' does not support operation:"
f" '{cfg.params['operation']}'."
)
def __call__(
self,
env: ManagerBasedEnv,
env_ids: torch.Tensor | None,
asset_cfg: SceneEntityCfg,
stiffness_distribution_params: tuple[float, float] | None = None,
damping_distribution_params: tuple[float, float] | None = None,
operation: Literal["add", "scale", "abs"] = "abs",
distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform",
):
# Resolve environment ids
if env_ids is None:
env_ids = torch.arange(env.scene.num_envs, device=self.asset.device)
def randomize(data: torch.Tensor, params: tuple[float, float]) -> torch.Tensor:
return _randomize_prop_by_op(
data, params, dim_0_ids=None, dim_1_ids=actuator_indices, operation=operation, distribution=distribution
)
# Loop through actuators and randomize gains
for actuator in asset.actuators.values():
if isinstance(asset_cfg.joint_ids, slice):
# we take all the joints of the actuator
actuator_indices = slice(None)
if isinstance(actuator.joint_indices, slice):
global_indices = slice(None)
# Loop through actuators and randomize gains
for actuator in self.asset.actuators.values():
if isinstance(self.asset_cfg.joint_ids, slice):
# we take all the joints of the actuator
actuator_indices = slice(None)
if isinstance(actuator.joint_indices, slice):
global_indices = slice(None)
else:
global_indices = torch.tensor(actuator.joint_indices, device=self.asset.device)
elif isinstance(actuator.joint_indices, slice):
# we take the joints defined in the asset config
global_indices = actuator_indices = torch.tensor(self.asset_cfg.joint_ids, device=self.asset.device)
else:
global_indices = torch.tensor(actuator.joint_indices, device=asset.device)
elif isinstance(actuator.joint_indices, slice):
# we take the joints defined in the asset config
global_indices = actuator_indices = torch.tensor(asset_cfg.joint_ids, device=asset.device)
else:
# we take the intersection of the actuator joints and the asset config joints
actuator_joint_indices = torch.tensor(actuator.joint_indices, device=asset.device)
asset_joint_ids = torch.tensor(asset_cfg.joint_ids, device=asset.device)
# the indices of the joints in the actuator that have to be randomized
actuator_indices = torch.nonzero(torch.isin(actuator_joint_indices, asset_joint_ids)).view(-1)
if len(actuator_indices) == 0:
continue
# maps actuator indices that have to be randomized to global joint indices
global_indices = actuator_joint_indices[actuator_indices]
# Randomize stiffness
if stiffness_distribution_params is not None:
stiffness = actuator.stiffness[env_ids].clone()
stiffness[:, actuator_indices] = asset.data.default_joint_stiffness[env_ids][:, global_indices].clone()
randomize(stiffness, stiffness_distribution_params)
actuator.stiffness[env_ids] = stiffness
if isinstance(actuator, ImplicitActuator):
asset.write_joint_stiffness_to_sim(stiffness, joint_ids=actuator.joint_indices, env_ids=env_ids)
# Randomize damping
if damping_distribution_params is not None:
damping = actuator.damping[env_ids].clone()
damping[:, actuator_indices] = asset.data.default_joint_damping[env_ids][:, global_indices].clone()
randomize(damping, damping_distribution_params)
actuator.damping[env_ids] = damping
if isinstance(actuator, ImplicitActuator):
asset.write_joint_damping_to_sim(damping, joint_ids=actuator.joint_indices, env_ids=env_ids)
# we take the intersection of the actuator joints and the asset config joints
actuator_joint_indices = torch.tensor(actuator.joint_indices, device=self.asset.device)
asset_joint_ids = torch.tensor(self.asset_cfg.joint_ids, device=self.asset.device)
# the indices of the joints in the actuator that have to be randomized
actuator_indices = torch.nonzero(torch.isin(actuator_joint_indices, asset_joint_ids)).view(-1)
if len(actuator_indices) == 0:
continue
# maps actuator indices that have to be randomized to global joint indices
global_indices = actuator_joint_indices[actuator_indices]
# Randomize stiffness
if stiffness_distribution_params is not None:
stiffness = actuator.stiffness[env_ids].clone()
stiffness[:, actuator_indices] = self.asset.data.default_joint_stiffness[env_ids][
:, global_indices
].clone()
randomize(stiffness, stiffness_distribution_params)
actuator.stiffness[env_ids] = stiffness
if isinstance(actuator, ImplicitActuator):
self.asset.write_joint_stiffness_to_sim(
stiffness, joint_ids=actuator.joint_indices, env_ids=env_ids
)
# Randomize damping
if damping_distribution_params is not None:
damping = actuator.damping[env_ids].clone()
damping[:, actuator_indices] = self.asset.data.default_joint_damping[env_ids][:, global_indices].clone()
randomize(damping, damping_distribution_params)
actuator.damping[env_ids] = damping
if isinstance(actuator, ImplicitActuator):
self.asset.write_joint_damping_to_sim(damping, joint_ids=actuator.joint_indices, env_ids=env_ids)
def randomize_joint_parameters(
env: ManagerBasedEnv,
env_ids: torch.Tensor | None,
asset_cfg: SceneEntityCfg,
friction_distribution_params: tuple[float, float] | None = None,
armature_distribution_params: tuple[float, float] | None = None,
lower_limit_distribution_params: tuple[float, float] | None = None,
upper_limit_distribution_params: tuple[float, float] | None = None,
operation: Literal["add", "scale", "abs"] = "abs",
distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform",
):
class randomize_joint_parameters(ManagerTermBase):
"""Randomize the simulated joint parameters of an articulation by adding, scaling, or setting random values.
This function allows randomizing the joint parameters of the asset. These correspond to the physics engine
......@@ -592,97 +648,126 @@ def randomize_joint_parameters(
This function uses CPU tensors to assign the joint properties. It is recommended to use this function
only during the initialization of the environment.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# resolve environment ids
if env_ids is None:
env_ids = torch.arange(env.scene.num_envs, device=asset.device)
def __init__(self, cfg: EventTermCfg, env: ManagerBasedEnv):
"""Initialize the term.
# resolve joint indices
if asset_cfg.joint_ids == slice(None):
joint_ids = slice(None) # for optimization purposes
else:
joint_ids = torch.tensor(asset_cfg.joint_ids, dtype=torch.int, device=asset.device)
# sample joint properties from the given ranges and set into the physics simulation
# joint friction coefficient
if friction_distribution_params is not None:
friction_coeff = _randomize_prop_by_op(
asset.data.default_joint_friction_coeff.clone(),
friction_distribution_params,
env_ids,
joint_ids,
operation=operation,
distribution=distribution,
)
asset.write_joint_friction_coefficient_to_sim(
friction_coeff[env_ids[:, None], joint_ids], joint_ids=joint_ids, env_ids=env_ids
)
Args:
cfg: The configuration of the event term.
env: The environment instance.
# joint armature
if armature_distribution_params is not None:
armature = _randomize_prop_by_op(
asset.data.default_joint_armature.clone(),
armature_distribution_params,
env_ids,
joint_ids,
operation=operation,
distribution=distribution,
)
asset.write_joint_armature_to_sim(armature[env_ids[:, None], joint_ids], joint_ids=joint_ids, env_ids=env_ids)
# joint position limits
if lower_limit_distribution_params is not None or upper_limit_distribution_params is not None:
joint_pos_limits = asset.data.default_joint_pos_limits.clone()
# -- randomize the lower limits
if lower_limit_distribution_params is not None:
joint_pos_limits[..., 0] = _randomize_prop_by_op(
joint_pos_limits[..., 0],
lower_limit_distribution_params,
Raises:
TypeError: If `params` is not a tuple of two numbers.
ValueError: If the operation is not supported.
ValueError: If the lower bound is negative or zero when not allowed.
ValueError: If the upper bound is less than the lower bound.
"""
super().__init__(cfg, env)
# extract the used quantities (to enable type-hinting)
self.asset_cfg: SceneEntityCfg = cfg.params["asset_cfg"]
self.asset: RigidObject | Articulation = env.scene[self.asset_cfg.name]
# check for valid operation
if cfg.params["operation"] == "scale":
if "friction_distribution_params" in cfg.params:
_validate_scale_range(cfg.params["friction_distribution_params"], "friction_distribution_params")
if "armature_distribution_params" in cfg.params:
_validate_scale_range(cfg.params["armature_distribution_params"], "armature_distribution_params")
elif cfg.params["operation"] not in ("abs", "add"):
raise ValueError(
"Randomization term 'randomize_fixed_tendon_parameters' does not support operation:"
f" '{cfg.params['operation']}'."
)
def __call__(
self,
env: ManagerBasedEnv,
env_ids: torch.Tensor | None,
asset_cfg: SceneEntityCfg,
friction_distribution_params: tuple[float, float] | None = None,
armature_distribution_params: tuple[float, float] | None = None,
lower_limit_distribution_params: tuple[float, float] | None = None,
upper_limit_distribution_params: tuple[float, float] | None = None,
operation: Literal["add", "scale", "abs"] = "abs",
distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform",
):
# resolve environment ids
if env_ids is None:
env_ids = torch.arange(env.scene.num_envs, device=self.asset.device)
# resolve joint indices
if self.asset_cfg.joint_ids == slice(None):
joint_ids = slice(None) # for optimization purposes
else:
joint_ids = torch.tensor(self.asset_cfg.joint_ids, dtype=torch.int, device=self.asset.device)
# sample joint properties from the given ranges and set into the physics simulation
# joint friction coefficient
if friction_distribution_params is not None:
friction_coeff = _randomize_prop_by_op(
self.asset.data.default_joint_friction_coeff.clone(),
friction_distribution_params,
env_ids,
joint_ids,
operation=operation,
distribution=distribution,
)
# -- randomize the upper limits
if upper_limit_distribution_params is not None:
joint_pos_limits[..., 1] = _randomize_prop_by_op(
joint_pos_limits[..., 1],
upper_limit_distribution_params,
self.asset.write_joint_friction_coefficient_to_sim(
friction_coeff[env_ids[:, None], joint_ids], joint_ids=joint_ids, env_ids=env_ids
)
# joint armature
if armature_distribution_params is not None:
armature = _randomize_prop_by_op(
self.asset.data.default_joint_armature.clone(),
armature_distribution_params,
env_ids,
joint_ids,
operation=operation,
distribution=distribution,
)
self.asset.write_joint_armature_to_sim(
armature[env_ids[:, None], joint_ids], joint_ids=joint_ids, env_ids=env_ids
)
# extract the position limits for the concerned joints
joint_pos_limits = joint_pos_limits[env_ids[:, None], joint_ids]
if (joint_pos_limits[..., 0] > joint_pos_limits[..., 1]).any():
raise ValueError(
"Randomization term 'randomize_joint_parameters' is setting lower joint limits that are greater than"
" upper joint limits. Please check the distribution parameters for the joint position limits."
# joint position limits
if lower_limit_distribution_params is not None or upper_limit_distribution_params is not None:
joint_pos_limits = self.asset.data.default_joint_pos_limits.clone()
# -- randomize the lower limits
if lower_limit_distribution_params is not None:
joint_pos_limits[..., 0] = _randomize_prop_by_op(
joint_pos_limits[..., 0],
lower_limit_distribution_params,
env_ids,
joint_ids,
operation=operation,
distribution=distribution,
)
# -- randomize the upper limits
if upper_limit_distribution_params is not None:
joint_pos_limits[..., 1] = _randomize_prop_by_op(
joint_pos_limits[..., 1],
upper_limit_distribution_params,
env_ids,
joint_ids,
operation=operation,
distribution=distribution,
)
# extract the position limits for the concerned joints
joint_pos_limits = joint_pos_limits[env_ids[:, None], joint_ids]
if (joint_pos_limits[..., 0] > joint_pos_limits[..., 1]).any():
raise ValueError(
"Randomization term 'randomize_joint_parameters' is setting lower joint limits that are greater"
" than upper joint limits. Please check the distribution parameters for the joint position limits."
)
# set the position limits into the physics simulation
self.asset.write_joint_position_limit_to_sim(
joint_pos_limits, joint_ids=joint_ids, env_ids=env_ids, warn_limit_violation=False
)
# set the position limits into the physics simulation
asset.write_joint_position_limit_to_sim(
joint_pos_limits, joint_ids=joint_ids, env_ids=env_ids, warn_limit_violation=False
)
def randomize_fixed_tendon_parameters(
env: ManagerBasedEnv,
env_ids: torch.Tensor | None,
asset_cfg: SceneEntityCfg,
stiffness_distribution_params: tuple[float, float] | None = None,
damping_distribution_params: tuple[float, float] | None = None,
limit_stiffness_distribution_params: tuple[float, float] | None = None,
lower_limit_distribution_params: tuple[float, float] | None = None,
upper_limit_distribution_params: tuple[float, float] | None = None,
rest_length_distribution_params: tuple[float, float] | None = None,
offset_distribution_params: tuple[float, float] | None = None,
operation: Literal["add", "scale", "abs"] = "abs",
distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform",
):
class randomize_fixed_tendon_parameters(ManagerTermBase):
"""Randomize the simulated fixed tendon parameters of an articulation by adding, scaling, or setting random values.
This function allows randomizing the fixed tendon parameters of the asset.
......@@ -693,115 +778,166 @@ def randomize_fixed_tendon_parameters(
particular property, the function does not modify the property.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# resolve environment ids
if env_ids is None:
env_ids = torch.arange(env.scene.num_envs, device=asset.device)
def __init__(self, cfg: EventTermCfg, env: ManagerBasedEnv):
"""Initialize the term.
# resolve joint indices
if asset_cfg.fixed_tendon_ids == slice(None):
tendon_ids = slice(None) # for optimization purposes
else:
tendon_ids = torch.tensor(asset_cfg.fixed_tendon_ids, dtype=torch.int, device=asset.device)
# sample tendon properties from the given ranges and set into the physics simulation
# stiffness
if stiffness_distribution_params is not None:
stiffness = _randomize_prop_by_op(
asset.data.default_fixed_tendon_stiffness.clone(),
stiffness_distribution_params,
env_ids,
tendon_ids,
operation=operation,
distribution=distribution,
)
asset.set_fixed_tendon_stiffness(stiffness[env_ids[:, None], tendon_ids], tendon_ids, env_ids)
# damping
if damping_distribution_params is not None:
damping = _randomize_prop_by_op(
asset.data.default_fixed_tendon_damping.clone(),
damping_distribution_params,
env_ids,
tendon_ids,
operation=operation,
distribution=distribution,
)
asset.set_fixed_tendon_damping(damping[env_ids[:, None], tendon_ids], tendon_ids, env_ids)
# limit stiffness
if limit_stiffness_distribution_params is not None:
limit_stiffness = _randomize_prop_by_op(
asset.data.default_fixed_tendon_limit_stiffness.clone(),
limit_stiffness_distribution_params,
env_ids,
tendon_ids,
operation=operation,
distribution=distribution,
)
asset.set_fixed_tendon_limit_stiffness(limit_stiffness[env_ids[:, None], tendon_ids], tendon_ids, env_ids)
# position limits
if lower_limit_distribution_params is not None or upper_limit_distribution_params is not None:
limit = asset.data.default_fixed_tendon_pos_limits.clone()
# -- lower limit
if lower_limit_distribution_params is not None:
limit[..., 0] = _randomize_prop_by_op(
limit[..., 0],
lower_limit_distribution_params,
Args:
cfg: The configuration of the event term.
env: The environment instance.
Raises:
TypeError: If `params` is not a tuple of two numbers.
ValueError: If the operation is not supported.
ValueError: If the lower bound is negative or zero when not allowed.
ValueError: If the upper bound is less than the lower bound.
"""
super().__init__(cfg, env)
# extract the used quantities (to enable type-hinting)
self.asset_cfg: SceneEntityCfg = cfg.params["asset_cfg"]
self.asset: RigidObject | Articulation = env.scene[self.asset_cfg.name]
# check for valid operation
if cfg.params["operation"] == "scale":
if "stiffness_distribution_params" in cfg.params:
_validate_scale_range(
cfg.params["stiffness_distribution_params"], "stiffness_distribution_params", allow_zero=False
)
if "damping_distribution_params" in cfg.params:
_validate_scale_range(cfg.params["damping_distribution_params"], "damping_distribution_params")
if "limit_stiffness_distribution_params" in cfg.params:
_validate_scale_range(
cfg.params["limit_stiffness_distribution_params"], "limit_stiffness_distribution_params"
)
elif cfg.params["operation"] not in ("abs", "add"):
raise ValueError(
"Randomization term 'randomize_fixed_tendon_parameters' does not support operation:"
f" '{cfg.params['operation']}'."
)
def __call__(
self,
env: ManagerBasedEnv,
env_ids: torch.Tensor | None,
asset_cfg: SceneEntityCfg,
stiffness_distribution_params: tuple[float, float] | None = None,
damping_distribution_params: tuple[float, float] | None = None,
limit_stiffness_distribution_params: tuple[float, float] | None = None,
lower_limit_distribution_params: tuple[float, float] | None = None,
upper_limit_distribution_params: tuple[float, float] | None = None,
rest_length_distribution_params: tuple[float, float] | None = None,
offset_distribution_params: tuple[float, float] | None = None,
operation: Literal["add", "scale", "abs"] = "abs",
distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform",
):
# resolve environment ids
if env_ids is None:
env_ids = torch.arange(env.scene.num_envs, device=self.asset.device)
# resolve joint indices
if self.asset_cfg.fixed_tendon_ids == slice(None):
tendon_ids = slice(None) # for optimization purposes
else:
tendon_ids = torch.tensor(self.asset_cfg.fixed_tendon_ids, dtype=torch.int, device=self.asset.device)
# sample tendon properties from the given ranges and set into the physics simulation
# stiffness
if stiffness_distribution_params is not None:
stiffness = _randomize_prop_by_op(
self.asset.data.default_fixed_tendon_stiffness.clone(),
stiffness_distribution_params,
env_ids,
tendon_ids,
operation=operation,
distribution=distribution,
)
# -- upper limit
if upper_limit_distribution_params is not None:
limit[..., 1] = _randomize_prop_by_op(
limit[..., 1],
upper_limit_distribution_params,
self.asset.set_fixed_tendon_stiffness(stiffness[env_ids[:, None], tendon_ids], tendon_ids, env_ids)
# damping
if damping_distribution_params is not None:
damping = _randomize_prop_by_op(
self.asset.data.default_fixed_tendon_damping.clone(),
damping_distribution_params,
env_ids,
tendon_ids,
operation=operation,
distribution=distribution,
)
self.asset.set_fixed_tendon_damping(damping[env_ids[:, None], tendon_ids], tendon_ids, env_ids)
# check if the limits are valid
tendon_limits = limit[env_ids[:, None], tendon_ids]
if (tendon_limits[..., 0] > tendon_limits[..., 1]).any():
raise ValueError(
"Randomization term 'randomize_fixed_tendon_parameters' is setting lower tendon limits that are greater"
" than upper tendon limits."
# limit stiffness
if limit_stiffness_distribution_params is not None:
limit_stiffness = _randomize_prop_by_op(
self.asset.data.default_fixed_tendon_limit_stiffness.clone(),
limit_stiffness_distribution_params,
env_ids,
tendon_ids,
operation=operation,
distribution=distribution,
)
self.asset.set_fixed_tendon_limit_stiffness(
limit_stiffness[env_ids[:, None], tendon_ids], tendon_ids, env_ids
)
asset.set_fixed_tendon_position_limit(tendon_limits, tendon_ids, env_ids)
# rest length
if rest_length_distribution_params is not None:
rest_length = _randomize_prop_by_op(
asset.data.default_fixed_tendon_rest_length.clone(),
rest_length_distribution_params,
env_ids,
tendon_ids,
operation=operation,
distribution=distribution,
)
asset.set_fixed_tendon_rest_length(rest_length[env_ids[:, None], tendon_ids], tendon_ids, env_ids)
# offset
if offset_distribution_params is not None:
offset = _randomize_prop_by_op(
asset.data.default_fixed_tendon_offset.clone(),
offset_distribution_params,
env_ids,
tendon_ids,
operation=operation,
distribution=distribution,
)
asset.set_fixed_tendon_offset(offset[env_ids[:, None], tendon_ids], tendon_ids, env_ids)
# write the fixed tendon properties into the simulation
asset.write_fixed_tendon_properties_to_sim(tendon_ids, env_ids)
# position limits
if lower_limit_distribution_params is not None or upper_limit_distribution_params is not None:
limit = self.asset.data.default_fixed_tendon_pos_limits.clone()
# -- lower limit
if lower_limit_distribution_params is not None:
limit[..., 0] = _randomize_prop_by_op(
limit[..., 0],
lower_limit_distribution_params,
env_ids,
tendon_ids,
operation=operation,
distribution=distribution,
)
# -- upper limit
if upper_limit_distribution_params is not None:
limit[..., 1] = _randomize_prop_by_op(
limit[..., 1],
upper_limit_distribution_params,
env_ids,
tendon_ids,
operation=operation,
distribution=distribution,
)
# check if the limits are valid
tendon_limits = limit[env_ids[:, None], tendon_ids]
if (tendon_limits[..., 0] > tendon_limits[..., 1]).any():
raise ValueError(
"Randomization term 'randomize_fixed_tendon_parameters' is setting lower tendon limits that are"
" greater than upper tendon limits."
)
self.asset.set_fixed_tendon_position_limit(tendon_limits, tendon_ids, env_ids)
# rest length
if rest_length_distribution_params is not None:
rest_length = _randomize_prop_by_op(
self.asset.data.default_fixed_tendon_rest_length.clone(),
rest_length_distribution_params,
env_ids,
tendon_ids,
operation=operation,
distribution=distribution,
)
self.asset.set_fixed_tendon_rest_length(rest_length[env_ids[:, None], tendon_ids], tendon_ids, env_ids)
# offset
if offset_distribution_params is not None:
offset = _randomize_prop_by_op(
self.asset.data.default_fixed_tendon_offset.clone(),
offset_distribution_params,
env_ids,
tendon_ids,
operation=operation,
distribution=distribution,
)
self.asset.set_fixed_tendon_offset(offset[env_ids[:, None], tendon_ids], tendon_ids, env_ids)
# write the fixed tendon properties into the simulation
self.asset.write_fixed_tendon_properties_to_sim(tendon_ids, env_ids)
def apply_external_force_torque(
......@@ -1568,3 +1704,47 @@ def _randomize_prop_by_op(
f"Unknown operation: '{operation}' for property randomization. Please use 'add', 'scale', or 'abs'."
)
return data
def _validate_scale_range(
params: tuple[float, float] | None,
name: str,
*,
allow_negative: bool = False,
allow_zero: bool = True,
) -> None:
"""
Validates a (low, high) tuple used in scale-based randomization.
This function ensures the tuple follows expected rules when applying a 'scale'
operation. It performs type and value checks, optionally allowing negative or
zero lower bounds.
Args:
params (tuple[float, float] | None): The (low, high) range to validate. If None,
validation is skipped.
name (str): The name of the parameter being validated, used for error messages.
allow_negative (bool, optional): If True, allows the lower bound to be negative.
Defaults to False.
allow_zero (bool, optional): If True, allows the lower bound to be zero.
Defaults to True.
Raises:
TypeError: If `params` is not a tuple of two numbers.
ValueError: If the lower bound is negative or zero when not allowed.
ValueError: If the upper bound is less than the lower bound.
Example:
_validate_scale_range((0.5, 1.5), "mass_scale")
"""
if params is None: # caller didn’t request randomisation for this field
return
low, high = params
if not isinstance(low, (int, float)) or not isinstance(high, (int, float)):
raise TypeError(f"{name}: expected (low, high) to be a tuple of numbers, got {params}.")
if not allow_negative and not allow_zero and low <= 0:
raise ValueError(f"{name}: lower bound must be > 0 when using the 'scale' operation (got {low}).")
if not allow_negative and allow_zero and low < 0:
raise ValueError(f"{name}: lower bound must be ≥ 0 when using the 'scale' operation (got {low}).")
if high < low:
raise ValueError(f"{name}: upper bound ({high}) must be ≥ lower bound ({low}).")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment