Commit cee5027b authored by ooctipus, committed by GitHub

Adds new curriculum mdp that allows modification on any environment parameters (#2777)

# Description

This PR adds two curriculum mdp terms that can change any parameter in the env
instance: `modify_term_cfg` and `modify_env_param`.

`modify_env_param` is the more general version: it can override any value
that belongs to the env, but requires the user to know the full path to the value.
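
To illustrate what a "full path" means here, the following is a minimal, torch-free sketch of resolving a dotted path against nested objects and dicts. The `resolve`, `read`, and `write` helpers and the stand-in `env` object are hypothetical and are not part of Isaac Lab; the term added in this PR additionally handles `name[idx]` indexing.

```python
from types import SimpleNamespace


def resolve(root, path: str):
    """Return (container, last_key) for a dotted path, walking attributes and dict keys."""
    *parents, last = path.split(".")
    cur = root
    for name in parents:
        # dicts are indexed by key, everything else by attribute
        cur = cur[name] if isinstance(cur, dict) else getattr(cur, name)
    return cur, last


def read(root, path: str):
    """Read the value stored at the dotted path."""
    container, last = resolve(root, path)
    return container[last] if isinstance(container, dict) else getattr(container, last)


def write(root, path: str, value):
    """Write a value at the dotted path, in place."""
    container, last = resolve(root, path)
    if isinstance(container, dict):
        container[last] = value
    else:
        setattr(container, last, value)


# Hypothetical stand-in for an environment (assumption, for illustration only).
env = SimpleNamespace(
    observation_manager=SimpleNamespace(
        cfg=SimpleNamespace(policy={"joint_pos": SimpleNamespace(noise=0.0)})
    )
)

write(env, "observation_manager.cfg.policy.joint_pos.noise", 0.05)
updated_noise = read(env, "observation_manager.cfg.policy.joint_pos.noise")
```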

`modify_term_cfg` only works with manager terms, but is the more user
friendly version because it simplifies the path specification. For example,
instead of writing "observation_manager.cfg.policy.joint_pos.noise", you
write "observations.policy.joint_pos.noise", consistent with the Hydra
overriding style.
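
As a small sketch of that shorthand: the `modify_term_cfg.__init__` in this commit expands the address by replacing only the first occurrence of "s." with "_manager.cfg.". The function name `to_full_address` below is hypothetical and used only for illustration.

```python
def to_full_address(short_address: str) -> str:
    """Expand a term-style address by replacing the first "s." with "_manager.cfg."."""
    return short_address.replace("s.", "_manager.cfg.", 1)


full = to_full_address("observations.policy.joint_pos.noise")
command_full = to_full_address("commands.object_pose.ranges.pos_x")
```

Because only the first "s." is replaced, the shorthand assumes the address starts with the plural manager name (`observations`, `commands`, `events`, and so on).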

Besides the path to the value, `modify_fn` and `modify_params` are also needed
to tell the term how to modify it.
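
The calling convention can be sketched without Isaac Lab as follows. This is a simplified stand-in, not the term's implementation: `NO_CHANGE` here is a local sentinel mirroring `mdp.modify_env_param.NO_CHANGE`, and `apply_once`, `step_override`, and the `env` object are hypothetical names.

```python
from types import SimpleNamespace

NO_CHANGE = object()  # local stand-in for mdp.modify_env_param.NO_CHANGE


def step_override(env, env_ids, data, new_val, num_steps):
    """Return new_val once the step counter passes num_steps, else signal no change."""
    if env.common_step_counter > num_steps:
        return new_val
    return NO_CHANGE


def apply_once(env, env_ids, current, modify_fn, modify_params):
    """Mimic one curriculum call: compute a candidate and keep `current` on NO_CHANGE."""
    candidate = modify_fn(env, env_ids, current, **modify_params)
    return current if candidate is NO_CHANGE else candidate


# Hypothetical env exposing only the step counter used by the modify_fn.
env = SimpleNamespace(common_step_counter=10)
params = {"new_val": 0.5, "num_steps": 100}
before = apply_once(env, [0], 0.0, step_override, params)

env.common_step_counter = 101
after = apply_once(env, [0], 0.0, step_override, params)
```

A dedicated sentinel is used instead of `None` so that `None` itself remains a value that can be written.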



Demo 1: difficulty-adaptive modification for all Python native data types
```
# iv -> initial value, fv -> final value
def initial_final_interpolate_fn(env: ManagerBasedRLEnv, env_id, data, iv, fv, get_fraction):
    iv_, fv_ = torch.tensor(iv, device=env.device), torch.tensor(fv, device=env.device)
    fraction = eval(get_fraction)
    new_val = fraction * (fv_ - iv_) + iv_
    if isinstance(data, float):
        return new_val.item()
    elif isinstance(data, int):
        return int(new_val.item())
    elif isinstance(data, (tuple, list)):
        raw = new_val.tolist()
        # assume iv is sequence of all ints or all floats:
        is_int = isinstance(iv[0], int)
        casted = [int(x) if is_int else float(x) for x in raw]
        return tuple(casted) if isinstance(data, tuple) else casted
    else:
        raise TypeError(f"Does not support the type {type(data)}")
```
(float)
```
joint_pos_unoise_min_adr = CurrTerm(
    func=mdp.modify_term_cfg,
    params={
        "address": "observations.policy.joint_pos.noise.n_min",
        "modify_fn": initial_final_interpolate_fn,
        "modify_params": {"iv": 0., "fv": -.1, "get_fraction": "env.command_manager.get_command('difficulty')"}
    }
)
```

(tuple or list)
```
command_object_pose_xrange_adr = CurrTerm(
    func=mdp.modify_term_cfg,
    params={
        "address": "commands.object_pose.ranges.pos_x",
        "modify_fn": initial_final_interpolate_fn,
        "modify_params": {"iv": (-.5, -.5), "fv": (-.75, -.25), "get_fraction": "env.command_manager.get_command('difficulty')"}
    }
)
```
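
The interpolation rule used in Demo 1, `new_val = fraction * (fv - iv) + iv`, can be checked in isolation. The sketch below is a torch-free approximation written for illustration; `interpolate` is a hypothetical helper, and the fraction is passed in directly rather than evaluated from a string.

```python
def interpolate(iv, fv, fraction):
    """Linear interpolation iv + fraction * (fv - iv) for scalars or equal-length sequences."""
    if isinstance(iv, (tuple, list)):
        values = [a + fraction * (b - a) for a, b in zip(iv, fv)]
        # preserve the container type of the initial value
        return tuple(values) if isinstance(iv, tuple) else values
    return iv + fraction * (fv - iv)


# fraction 0 keeps the initial range, fraction 1 reaches the final range
start = interpolate((-0.5, -0.5), (-0.75, -0.25), 0.0)
end = interpolate((-0.5, -0.5), (-0.75, -0.25), 1.0)
half = interpolate(0.0, -0.1, 0.5)
```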

Demo 3: overriding entire term on env_step counter rather than adaptive
```
def value_override(env: ManagerBasedRLEnv, env_id, data, new_val, num_steps):
    if env.common_step_counter > num_steps:
        return new_val
    return mdp.modify_term_cfg.NO_CHANGE

object_pos_curriculum = CurrTerm(
    func=mdp.modify_term_cfg,
    params={
        "address": "commands.object_pose",
        "modify_fn": value_override,
        "modify_params": {"new_val": <new_observation_term>, "num_steps": 120000}
    }
)
```

Demo 4: overriding a Tensor field within an arbitrary class that is not visible
from term_cfg
(note that 'address' is not as concise as with mdp.modify_term_cfg)
```
def resample_bucket_range(env: ManagerBasedRLEnv, env_id, data, static_friction_range, dynamic_friction_range, restitution_range, num_steps):
    if env.common_step_counter > num_steps:
        range_list = [static_friction_range, dynamic_friction_range, restitution_range]
        ranges = torch.tensor(range_list, device="cpu")
        new_buckets = math_utils.sample_uniform(ranges[:, 0], ranges[:, 1], (len(data), 3), device="cpu")
        return new_buckets
    return mdp.modify_env_param.NO_CHANGE

object_physics_material_curriculum = CurrTerm(
    func=mdp.modify_env_param,
    params={
        "address": "event_manager.cfg.object_physics_material.func.material_buckets",
        "modify_fn": resample_bucket_range,
        "modify_params": {"static_friction_range": [.5, 1.], "dynamic_friction_range": [.3, 1.], "restitution_range": [0.0, 0.5], "num_steps": 120000}
    }
)
```


## Type of change

- New feature (non-breaking change which adds functionality)


## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there

---------
Signed-off-by: ooctipus <zhengyuz@nvidia.com>
Signed-off-by: Kelly Guo <kellyg@nvidia.com>
Co-authored-by: Kelly Guo <kellyg@nvidia.com>
parent 9df117c9
Curriculum Utilities
====================

.. currentmodule:: isaaclab.managers

This guide walks through the common curriculum helper functions and terms that can be used to create flexible curricula
for RL environments in Isaac Lab. These utilities can be passed to a :class:`~isaaclab.managers.CurriculumTermCfg`
object to enable dynamic modification of reward weights and environment parameters during training.

.. note::

   We cover three utilities in this guide:

   - The function that modifies reward weights: :func:`modify_reward_weight`
   - The term that modifies any environment parameter: :class:`modify_env_param`
   - The term that modifies a term configuration: :class:`modify_term_cfg`

.. dropdown:: Full source for curriculum utilities
   :icon: code

   .. literalinclude:: ../../../source/isaaclab/isaaclab/envs/mdp/curriculums.py
      :language: python

Modifying Reward Weights
------------------------

The function :func:`modify_reward_weight` updates the weight of a reward term after a specified number of simulation
steps. This can be passed directly as the ``func`` in a ``CurriculumTermCfg``.

.. literalinclude:: ../../../source/isaaclab/isaaclab/envs/mdp/curriculums.py
   :language: python
   :pyobject: modify_reward_weight

**Usage example**:

.. code-block:: python

   from isaaclab.managers import CurriculumTermCfg
   import isaaclab.envs.mdp as mdp

   # After 100k steps, set the "sparse_reward" term weight to 0.5
   sparse_reward_schedule = CurriculumTermCfg(
       func=mdp.modify_reward_weight,
       params={
           "term_name": "sparse_reward",
           "weight": 0.5,
           "num_steps": 100_000,
       }
   )

Dynamically Modifying Environment Parameters
--------------------------------------------

The class :class:`modify_env_param` is a :class:`~isaaclab.managers.ManagerTermBase` subclass that lets you target any
dotted attribute path in the environment and apply a user-supplied function to compute a new value at runtime. It
handles nested attributes, dictionary keys, list or tuple indexing, and respects a ``NO_CHANGE`` sentinel if no update
is desired.

.. literalinclude:: ../../../source/isaaclab/isaaclab/envs/mdp/curriculums.py
   :language: python
   :pyobject: modify_env_param

**Usage example**:

.. code-block:: python

   import torch
   from isaaclab.managers import CurriculumTermCfg
   import isaaclab.envs.mdp as mdp

   def resample_friction(env, env_ids, old_value, low, high, num_steps):
       # After num_steps, sample a new friction coefficient uniformly
       if env.common_step_counter > num_steps:
           return torch.empty((len(env_ids),), device="cpu").uniform_(low, high)
       return mdp.modify_env_param.NO_CHANGE

   friction_curriculum = CurriculumTermCfg(
       func=mdp.modify_env_param,
       params={
           "address": "event_manager.cfg.object_physics_material.func.material_buckets",
           "modify_fn": resample_friction,
           "modify_params": {
               "low": 0.3,
               "high": 1.0,
               "num_steps": 120_000,
           }
       }
   )

Modify Term Configuration
-------------------------

The subclass :class:`modify_term_cfg` provides a more concise address syntax that is consistent with the Hydra config
override style. It otherwise behaves identically to :class:`modify_env_param`.

.. literalinclude:: ../../../source/isaaclab/isaaclab/envs/mdp/curriculums.py
   :language: python
   :pyobject: modify_term_cfg

**Usage example**:

.. code-block:: python

   from isaaclab.managers import CurriculumTermCfg
   import isaaclab.envs.mdp as mdp

   def override_command_range(env, env_ids, old_value, value, num_steps):
       # Override after num_steps
       if env.common_step_counter > num_steps:
           return value
       return mdp.modify_term_cfg.NO_CHANGE

   range_override = CurriculumTermCfg(
       func=mdp.modify_term_cfg,
       params={
           "address": "commands.object_pose.ranges.pos_x",
           "modify_fn": override_command_range,
           "modify_params": {
               "value": (-0.75, -0.25),
               "num_steps": 12_000,
           }
       }
   )
......@@ -112,6 +112,19 @@ This guide explains how to record an animation and video in Isaac Lab.
   record_animation
   record_video

Dynamically Modifying Environment Parameters With CurriculumTerm
----------------------------------------------------------------

This guide explains how to dynamically modify environment parameters during training in Isaac Lab.
It covers the use of curriculum utilities to change environment parameters at runtime.

.. toctree::
   :maxdepth: 1

   curriculums

Mastering Omniverse
-------------------
......
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.40.20"
version = "0.40.21"
# Description
title = "Isaac Lab framework for Robot Learning"
......
Changelog
---------
0.40.21 (2025-06-25)
~~~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added new curriculum mdp :func:`~isaaclab.envs.mdp.curriculums.modify_env_param` and
  :func:`~isaaclab.envs.mdp.curriculums.modify_term_cfg` that enable flexible changes to any configuration in the
  env instance.
0.40.20 (2025-07-11)
~~~~~~~~~~~~~~~~~~~~
......@@ -178,6 +189,7 @@ Changed
* Renamed :func:`~isaaclab.utils.noise.NoiseModel.apply` method to :func:`~isaaclab.utils.noise.NoiseModel.__call__`.
0.40.6 (2025-06-12)
~~~~~~~~~~~~~~~~~~~
......
......@@ -11,9 +11,12 @@ the curriculum introduced by the function.
from __future__ import annotations
import re
from collections.abc import Sequence
from typing import TYPE_CHECKING
from isaaclab.managers import ManagerTermBase
if TYPE_CHECKING:
from isaaclab.envs import ManagerBasedRLEnv
......@@ -34,3 +37,189 @@ def modify_reward_weight(env: ManagerBasedRLEnv, env_ids: Sequence[int], term_na
# update term settings
term_cfg.weight = weight
env.reward_manager.set_term_cfg(term_name, term_cfg)
class modify_env_param(ManagerTermBase):
    """Curriculum term for dynamically modifying a single environment parameter at runtime.

    This term compiles getter/setter accessors for a target attribute (specified by
    `cfg.params["address"]`) the first time it is called, then on each invocation
    reads the current value, applies a user-provided `modify_fn`, and writes back
    the result. Since None can sometimes be a desirable value to write, we use the
    token NO_CHANGE as the non-modification signal to this class; see usage below.

    Usage:
        .. code-block:: python

            def resample_bucket_range(
                env, env_id, data, static_friction_range, dynamic_friction_range, restitution_range, num_steps
            ):
                if env.common_step_counter > num_steps:
                    range_list = [static_friction_range, dynamic_friction_range, restitution_range]
                    ranges = torch.tensor(range_list, device="cpu")
                    new_buckets = math_utils.sample_uniform(ranges[:, 0], ranges[:, 1], (len(data), 3), device="cpu")
                    return new_buckets
                return mdp.modify_env_param.NO_CHANGE

            object_physics_material_curriculum = CurrTerm(
                func=mdp.modify_env_param,
                params={
                    "address": "event_manager.cfg.object_physics_material.func.material_buckets",
                    "modify_fn": resample_bucket_range,
                    "modify_params": {
                        "static_friction_range": [.5, 1.],
                        "dynamic_friction_range": [.3, 1.],
                        "restitution_range": [0.0, 0.5],
                        "num_steps": 120000
                    }
                }
            )
    """

    NO_CHANGE = object()

    def __init__(self, cfg, env):
        """
        Initialize the ModifyEnvParam term.

        Args:
            cfg: A CurriculumTermCfg whose `params` dict must contain:
                - "address" (str): dotted path into the env where the parameter lives.
            env: The ManagerBasedRLEnv instance this term will act upon.
        """
        super().__init__(cfg, env)
        self._INDEX_RE = re.compile(r"^(\w+)\[(\d+)\]$")
        self.get_fn: callable = None
        self.set_fn: callable = None
        self.address: str = self.cfg.params.get("address")

    def __call__(
        self,
        env: ManagerBasedRLEnv,
        env_ids: Sequence[int],
        address: str,
        modify_fn: callable,
        modify_params: dict = {},
    ):
        """
        Apply one curriculum step to the target parameter.

        On the first call, compiles and caches the getter and setter accessors.
        Then, retrieves the current value, passes it through `modify_fn`, and
        writes back the new value.

        Args:
            env: The learning environment.
            env_ids: Sub-environment indices (unused by default).
            address: dotted path of the value retrieved from env.
            modify_fn: Function signature `fn(env, env_ids, old_value, **modify_params) -> new_value`.
            modify_params: Extra keyword arguments for `modify_fn`.
        """
        if not self.get_fn:
            self.get_fn, self.set_fn = self._compile_accessors(self._env, self.address)

        data = self.get_fn()
        new_val = modify_fn(self._env, env_ids, data, **modify_params)
        if new_val is not self.NO_CHANGE:  # if the modify_fn returns the NO_CHANGE signal, do not invoke self.set_fn
            self.set_fn(new_val)

    def _compile_accessors(self, root, path: str):
        """
        Build and return (getter, setter) functions for a dotted attribute path.

        Supports nested attributes, dict keys, and sequence indexing via "name[idx]".

        Args:
            root: Base object (usually `self._env`) from which to resolve `path`.
            path: Dotted path string, e.g. "foo.bar[2].baz".

        Returns:
            tuple:
                - getter: () -> current value
                - setter: (new_value) -> None (writes new_value back into the object)
        """
        # Turn "a.b[2].c" into ["a", ("b",2), "c"] and store in parts
        parts = []
        for part in path.split("."):
            m = self._INDEX_RE.match(part)
            if m:
                parts.append((m.group(1), int(m.group(2))))
            else:
                parts.append(part)

        cur = root
        for p in parts[:-1]:
            if isinstance(p, tuple):
                name, idx = p
                container = cur[name] if isinstance(cur, dict) else getattr(cur, name)
                cur = container[idx]
            else:
                cur = cur[p] if isinstance(cur, dict) else getattr(cur, p)

        self.container = cur
        self.last = parts[-1]
        # build the getter and setter
        if isinstance(self.container, tuple):
            getter = lambda: self.container[self.last]  # noqa: E731

            def setter(val):
                tuple_list = list(self.container)
                tuple_list[self.last] = val
                self.container = tuple(tuple_list)

        elif isinstance(self.container, (list, dict)):
            getter = lambda: self.container[self.last]  # noqa: E731

            def setter(val):
                self.container[self.last] = val

        elif isinstance(self.container, object):
            getter = lambda: getattr(self.container, self.last)  # noqa: E731

            def setter(val):
                setattr(self.container, self.last, val)

        else:
            raise TypeError(f"getter does not recognize the type {type(self.container)}")

        return getter, setter
class modify_term_cfg(modify_env_param):
    """Subclass of ModifyEnvParam that maps a simplified 's.'-style address
    to the full manager path. This is a more natural style for writing configurations.

    Reads `cfg.params["address"]`, replaces only the first occurrence of "s."
    with "_manager.cfg.", and then behaves identically to ModifyEnvParam.
    For example: command_manager.cfg.object_pose.ranges.xpos -> commands.object_pose.ranges.xpos

    Usage:
        .. code-block:: python

            def override_value(env, env_ids, data, value, num_steps):
                if env.common_step_counter > num_steps:
                    return value
                return mdp.modify_term_cfg.NO_CHANGE

            command_object_pose_xrange_adr = CurrTerm(
                func=mdp.modify_term_cfg,
                params={
                    "address": "commands.object_pose.ranges.pos_x",  # note that `_manager.cfg` is omitted
                    "modify_fn": override_value,
                    "modify_params": {"value": (-.75, -.25), "num_steps": 12000}
                }
            )
    """

    def __init__(self, cfg, env):
        """
        Initialize the ModifyTermCfg term.

        Args:
            cfg: A CurriculumTermCfg whose `params["address"]` is a simplified
                path using "s." as separator, e.g. instead of writing "observation_manager.cfg", write "observations".
            env: The ManagerBasedRLEnv instance this term will act upon.
        """
        super().__init__(cfg, env)
        input_address: str = self.cfg.params.get("address")
        self.address = input_address.replace("s.", "_manager.cfg.", 1)
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Test curriculum terms that modify environment parameters in the cartpole scene using pytest."""
from isaaclab.app import AppLauncher
# launch omniverse app
simulation_app = AppLauncher(headless=True).app
import torch
import omni.usd
import pytest
import isaaclab.envs.mdp as mdp
from isaaclab.assets import Articulation
from isaaclab.envs import ManagerBasedRLEnv
from isaaclab.managers import CurriculumTermCfg as CurrTerm
from isaaclab.utils import configclass
from isaaclab_tasks.manager_based.classic.cartpole.cartpole_env_cfg import CartpoleEnvCfg
def replace_value(env, env_id, data, value, num_steps):
    if env.common_step_counter > num_steps and data != value:
        return value
    # use the sentinel to indicate “no change”
    return mdp.modify_env_param.NO_CHANGE


@configclass
class CurriculumsCfg:
    modify_observation_joint_pos = CurrTerm(
        # test writing a term's func.
        func=mdp.modify_term_cfg,
        params={
            "address": "observations.policy.joint_pos_rel.func",
            "modify_fn": replace_value,
            "modify_params": {"value": mdp.joint_pos, "num_steps": 1},
        },
    )

    # test writing a term's param that involves dictionary.
    modify_reset_joint_pos = CurrTerm(
        func=mdp.modify_term_cfg,
        params={
            "address": "events.reset_pole_position.params.position_range",
            "modify_fn": replace_value,
            "modify_params": {"value": (-0.0, 0.0), "num_steps": 1},
        },
    )

    # test writing a non_term env parameter using modify_env_param.
    modify_episode_max_length = CurrTerm(
        func=mdp.modify_env_param,
        params={
            "address": "cfg.episode_length_s",
            "modify_fn": replace_value,
            "modify_params": {"value": 20, "num_steps": 1},
        },
    )
@pytest.mark.parametrize("device", ["cpu", "cuda"])
def test_curriculum_modify_env_param(device):
    """Ensure curriculum terms apply correctly after the fallback and replacement."""
    # new USD stage
    omni.usd.get_context().new_stage()

    # configure the cartpole env
    env_cfg = CartpoleEnvCfg()
    env_cfg.scene.num_envs = 16
    env_cfg.curriculum = CurriculumsCfg()
    env_cfg.sim.device = device

    env = ManagerBasedRLEnv(cfg=env_cfg)
    robot: Articulation = env.scene["robot"]

    # run a few steps under inference mode
    with torch.inference_mode():
        for count in range(3):
            env.reset()
            actions = torch.randn_like(env.action_manager.action)

            if count == 0:
                # test before curriculum kicks in, value agrees with default configuration
                joint_ids = env.event_manager.cfg.reset_cart_position.params["asset_cfg"].joint_ids
                assert env.observation_manager.cfg.policy.joint_pos_rel.func == mdp.joint_pos_rel
                assert torch.any(robot.data.joint_pos[:, joint_ids] != 0.0)
                assert env.max_episode_length_s == env_cfg.episode_length_s

            if count == 2:
                # test after curriculum takes effect, value agrees with new values
                assert env.observation_manager.cfg.policy.joint_pos_rel.func == mdp.joint_pos
                joint_ids = env.event_manager.cfg.reset_cart_position.params["asset_cfg"].joint_ids
                assert torch.all(robot.data.joint_pos[:, joint_ids] == 0.0)
                assert env.max_episode_length_s == 20

            env.step(actions)

    env.close()