Unverified Commit fecf239c authored by jtigue-bdai's avatar jtigue-bdai Committed by GitHub

Adds configurable modifiers to observation manager (#830)

# Description

The goal of this PR is to introduce expanded functionality to
observation corruption/realism. This is done by introducing the concept
of configurable modifiers. This creates an architecture that can apply a
list of custom modifiers to observation data giving flexibility to order
of operations. Existing modifiers for clip, scale, bias, and noise can
be used. This does so without changing the current functionality. This
PR also includes two new modifiers that are stateful (i.e. output is
dependent on previous state) that utilize a Callable class. They are a
`DigitalFilter` and an `Integrator`.

Summary of Additions:
- Adds `ModifierCfg` and `Modifier`
- Adds example modifiers functions for bias, scale, clip, noise
- Adds example modifiers classes: DigitalFilter, Integrator
- Adds functionality to `ObservationManager` to prepare and handle
modifiers of observation data
- Adds unit tests for modifiers in test_modifiers.py
- Adds test to test_observation_manager.py to test integration with
modifiers

## Type of change

- New feature (non-breaking change which adds functionality)
- This change requires a documentation update

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there

---------
Signed-off-by: 's avatarjtigue-bdai <166445701+jtigue-bdai@users.noreply.github.com>
Co-authored-by: 's avatarMayank Mittal <mittalma@leggedrobotics.com>
parent 36c58ea0
......@@ -14,6 +14,7 @@
dict
interpolation
math
modifiers
noise
string
timer
......@@ -87,6 +88,17 @@ Math operations
:inherited-members:
:show-inheritance:
Modifier operations
~~~~~~~~~~~~~~~~~~~
.. automodule:: omni.isaac.lab.utils.modifiers
:members:
:imported-members:
:special-members: __call__
:inherited-members:
:show-inheritance:
:exclude-members: __init__, func
Noise operations
~~~~~~~~~~~~~~~~
......
......@@ -98,6 +98,7 @@ The argument ``--cpu`` has been removed in favor of ``--device device_name``. Va
- ``cpu``: Use CPU.
- ``cuda``: Use GPU with device ID ``0``.
- ``cuda:N``: Use GPU, where N is the device ID. For example, ``cuda:0``.
The default value is ``cuda:0``.
......
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.21.2"
version = "0.22.0"
# Description
title = "Isaac Lab framework for Robot Learning"
......
Changelog
---------
0.22.0 (2024-08-14)
~~~~~~~~~~~~~~~~~~~
Added
^^^^^^^
* Added :mod:`~omni.isaac.lab.utils.modifiers` module to provide framework for configurable and custom
observation data modifiers.
* Adapted the :class:`~omni.isaac.lab.managers.ObservationManager` class to support custom modifiers.
These are applied to the observation data before applying any noise or scaling operations.
0.21.2 (2024-08-13)
~~~~~~~~~~~~~~~~~~~
......
......@@ -172,11 +172,11 @@ class DirectRLEnv(gym.Env):
# setup noise cfg for adding action and observation noise
if self.cfg.action_noise_model:
self._action_noise_model: NoiseModel = self.cfg.action_noise_model.class_type(
self.num_envs, self.cfg.action_noise_model, self.device
self.cfg.action_noise_model, num_envs=self.num_envs, device=self.device
)
if self.cfg.observation_noise_model:
self._observation_noise_model: NoiseModel = self.cfg.observation_noise_model.class_type(
self.num_envs, self.cfg.observation_noise_model, self.device
self.cfg.observation_noise_model, num_envs=self.num_envs, device=self.device
)
# perform events at the start of the simulation
......
......@@ -258,6 +258,7 @@ class ManagerBase(ABC):
# get the corresponding function or functional class
if isinstance(term_cfg.func, str):
term_cfg.func = string_to_callable(term_cfg.func)
# initialize the term if it is a class
if inspect.isclass(term_cfg.func):
if not issubclass(term_cfg.func, ManagerTermBase):
......@@ -269,6 +270,7 @@ class ManagerBase(ABC):
# check if function is callable
if not callable(term_cfg.func):
raise AttributeError(f"The term '{term_name}' is not callable. Received: {term_cfg.func}")
# check if term's arguments are matched by params
term_params = list(term_cfg.params.keys())
args = inspect.signature(term_cfg.func).parameters
......
......@@ -13,6 +13,7 @@ from dataclasses import MISSING
from typing import TYPE_CHECKING, Any
from omni.isaac.lab.utils import configclass
from omni.isaac.lab.utils.modifiers import ModifierCfg
from omni.isaac.lab.utils.noise import NoiseCfg
from .scene_entity_cfg import SceneEntityCfg
......@@ -133,6 +134,17 @@ class ObservationTermCfg(ManagerTermBaseCfg):
shape (num_envs, obs_term_dim).
"""
modifiers: list[ModifierCfg] | None = None
"""The list of data modifiers to apply to the observation in order. Defaults to None,
in which case no modifications will be applied.
Modifiers are applied in the order they are specified in the list. They can be stateless
or stateful, and can be used to apply transformations to the observation data. For example,
a modifier can be used to normalize the observation data or to apply a rolling average.
For more information on modifiers, see the :class:`~omni.isaac.lab.utils.modifiers.ModifierCfg` class.
"""
noise: NoiseCfg | None = None
"""The noise to add to the observation. Defaults to None, in which case no noise is added."""
......
......@@ -7,11 +7,14 @@
from __future__ import annotations
import inspect
import torch
from collections.abc import Sequence
from prettytable import PrettyTable
from typing import TYPE_CHECKING
from omni.isaac.lab.utils import modifiers
from .manager_base import ManagerBase, ManagerTermBase
from .manager_term_cfg import ObservationGroupCfg, ObservationTermCfg
......@@ -45,7 +48,9 @@ class ObservationManager(ManagerBase):
The observation manager can be used to compute observations for all the groups or for a specific group. The
observations are computed by calling the registered functions for each term in the group. The functions are
called in the order of the terms in the group. The functions are expected to return a tensor with shape
(num_envs, ...). If a corruption/noise model is registered for a term, the function is called to corrupt
(num_envs, ...).
If a noise model or custom modifier is registered for a term, the function is called to corrupt
the observation. The corruption function is expected to return a tensor with the same shape as the observation.
The observations are clipped and scaled as per the configuration settings.
"""
......@@ -166,6 +171,9 @@ class ObservationManager(ManagerBase):
for group_cfg in self._group_obs_class_term_cfgs.values():
for term_cfg in group_cfg:
term_cfg.func.reset(env_ids=env_ids)
# call all modifiers that are classes
for mod in self._group_obs_class_modifiers:
mod.reset(env_ids=env_ids)
# nothing to log here
return {}
......@@ -195,13 +203,18 @@ class ObservationManager(ManagerBase):
term in the group. The functions are called in the order of the terms in the group. The functions
are expected to return a tensor with shape (num_envs, ...).
If a corruption/noise model is registered for a term, the function is called to corrupt
the observation. The corruption function is expected to return a tensor with the same
shape as the observation. The observations are clipped and scaled as per the configuration
settings.
The following steps are performed for each observation term:
1. Compute observation term by calling the function
2. Apply custom modifiers in the order specified in :attr:`ObservationTermCfg.modifiers`
3. Apply corruption/noise model based on :attr:`ObservationTermCfg.noise`
4. Apply clipping based on :attr:`ObservationTermCfg.clip`
5. Apply scaling based on :attr:`ObservationTermCfg.scale`
The operations are performed in the order: compute, add corruption/noise, clip, scale.
By default, no scaling or clipping is applied.
We apply noise to the computed term first to maintain the integrity of how noise affects the data
as it truly exists in the real world. If the noise is applied after clipping or scaling, the noise
could be artificially constrained or amplified, which might misrepresent how noise naturally occurs
in the data.
Args:
group_name: The name of the group for which to compute the observations. Defaults to None,
......@@ -227,21 +240,24 @@ class ObservationManager(ManagerBase):
group_obs = dict.fromkeys(group_term_names, None)
# read attributes for each term
obs_terms = zip(group_term_names, self._group_obs_term_cfgs[group_name])
# evaluate terms: compute, add noise, clip, scale.
# evaluate terms: compute, add noise, clip, scale, custom modifiers
for name, term_cfg in obs_terms:
# compute term's value
obs: torch.Tensor = term_cfg.func(self._env, **term_cfg.params).clone()
# apply post-processing
if term_cfg.modifiers is not None:
for modifier in term_cfg.modifiers:
obs = modifier.func(obs, **modifier.params)
if term_cfg.noise:
obs = term_cfg.noise.func(obs, term_cfg.noise)
if term_cfg.clip:
obs = obs.clip_(min=term_cfg.clip[0], max=term_cfg.clip[1])
if term_cfg.scale:
obs = obs.mul_(term_cfg.scale)
# TODO: Introduce delay and filtering models.
# Ref: https://robosuite.ai/docs/modules/sensors.html#observables
# add value to list
group_obs[name] = obs
# concatenate all observations in the group together
if self._group_obs_concatenate[group_name]:
return torch.cat(list(group_obs.values()), dim=-1)
......@@ -262,6 +278,10 @@ class ObservationManager(ManagerBase):
self._group_obs_class_term_cfgs: dict[str, list[ObservationTermCfg]] = dict()
self._group_obs_concatenate: dict[str, bool] = dict()
# create a list to store modifiers that are classes
# we store it as a separate list to only call reset on them and prevent unnecessary calls
self._group_obs_class_modifiers: list[modifiers.ModifierBase] = list()
# check if config is dict already
if isinstance(self.cfg, dict):
group_cfg_items = self.cfg.items()
......@@ -285,7 +305,6 @@ class ObservationManager(ManagerBase):
self._group_obs_class_term_cfgs[group_name] = list()
# read common config for the group
self._group_obs_concatenate[group_name] = group_cfg.concatenate_terms
# check if config is dict already
if isinstance(group_cfg, dict):
group_cfg_items = group_cfg.items()
......@@ -306,15 +325,64 @@ class ObservationManager(ManagerBase):
)
# resolve common terms in the config
self._resolve_common_term_cfg(f"{group_name}/{term_name}", term_cfg, min_argc=1)
# check noise settings
if not group_cfg.enable_corruption:
term_cfg.noise = None
# add term config to list to list
self._group_obs_term_names[group_name].append(term_name)
self._group_obs_term_cfgs[group_name].append(term_cfg)
# call function the first time to fill up dimensions
obs_dims = tuple(term_cfg.func(self._env, **term_cfg.params).shape[1:])
self._group_obs_term_dim[group_name].append(obs_dims)
obs_dims = tuple(term_cfg.func(self._env, **term_cfg.params).shape)
self._group_obs_term_dim[group_name].append(obs_dims[1:])
# prepare modifiers for each observation
if term_cfg.modifiers is not None:
# initialize list of modifiers for term
for mod_cfg in term_cfg.modifiers:
# check if class modifier and initialize with observation size when adding
if isinstance(mod_cfg, modifiers.ModifierCfg):
# to list of modifiers
if inspect.isclass(mod_cfg.func):
if not issubclass(mod_cfg.func, modifiers.ModifierBase):
raise TypeError(
f"Modifier function '{mod_cfg.func}' for observation term '{term_name}'"
f" is not a subclass of 'ModifierBase'. Received: '{type(mod_cfg.func)}'."
)
mod_cfg.func = mod_cfg.func(cfg=mod_cfg, data_dim=obs_dims, device=self._env.device)
# add to list of class modifiers
self._group_obs_class_modifiers.append(mod_cfg.func)
else:
raise TypeError(
f"Modifier configuration '{mod_cfg}' of observation term '{term_name}' is not of"
f" required type ModifierCfg, Received: '{type(mod_cfg)}'"
)
# check if function is callable
if not callable(mod_cfg.func):
raise AttributeError(
f"Modifier '{mod_cfg}' of observation term '{term_name}' is not callable."
f" Received: {mod_cfg.func}"
)
# check if term's arguments are matched by params
term_params = list(mod_cfg.params.keys())
args = inspect.signature(mod_cfg.func).parameters
args_with_defaults = [arg for arg in args if args[arg].default is not inspect.Parameter.empty]
args_without_defaults = [arg for arg in args if args[arg].default is inspect.Parameter.empty]
args = args_without_defaults + args_with_defaults
# ignore first two arguments for env and env_ids
# Think: Check for cases when kwargs are set inside the function?
if len(args) > 1:
if set(args[1:]) != set(term_params + args_with_defaults):
raise ValueError(
f"Modifier '{mod_cfg}' of observation term '{term_name}' expects"
f" mandatory parameters: {args_without_defaults[1:]}"
f" and optional parameters: {args_with_defaults}, but received: {term_params}."
)
# add term in a separate list if term is a class
if isinstance(term_cfg.func, ManagerTermBase):
self._group_obs_class_term_cfgs[group_name].append(term_cfg)
......
......@@ -10,5 +10,6 @@ from .buffers import *
from .configclass import configclass
from .dict import *
from .interpolation import *
from .modifiers import *
from .string import *
from .timer import Timer
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Sub-module containing different modifiers implementations.
Modifiers are used to apply stateful or stateless modifications to tensor data. They take
in a tensor and a configuration and return a tensor with the modification applied. This way users
can define custom operations to apply to a tensor. For instance, a modifier can be used to normalize
the input data or to apply a rolling average.
They are primarily used to apply custom operations in the :class:`~omni.isaac.lab.managers.ObservationManager`
as an alternative to the built-in noise, clip and scale post-processing operations. For more details, see
the :class:`~omni.isaac.lab.managers.ObservationTermCfg` class.
Usage with a function modifier:
.. code-block:: python
import torch
from omni.isaac.lab.utils import modifiers
# create a random tensor
my_tensor = torch.rand(256, 128, device="cuda")
# create a modifier configuration
cfg = modifiers.ModifierCfg(func=modifiers.clip, params={"bounds": (0.0, torch.inf)})
# apply the modifier
my_modified_tensor = cfg.func(my_tensor, cfg)
Usage with a class modifier:
.. code-block:: python
import torch
from omni.isaac.lab.utils import modifiers
# create a random tensor
my_tensor = torch.rand(256, 128, device="cuda")
# create a modifier configuration
# a digital filter with a simple delay of 1 timestep
cfg = modifiers.DigitalFilter(A=[0.0], B=[0.0, 1.0])
# create the modifier instance
my_modifier = modifiers.DigitalFilter(cfg, my_tensor.shape, "cuda")
# apply the modifier as a callable object
my_modified_tensor = my_modifier(my_tensor)
"""
# isort: off
from .modifier_cfg import ModifierCfg
from .modifier_base import ModifierBase
from .modifier import DigitalFilter
from .modifier_cfg import DigitalFilterCfg
from .modifier import Integrator
from .modifier_cfg import IntegratorCfg
# isort: on
from .modifier import bias, clip, scale
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import torch
from collections.abc import Sequence
from typing import TYPE_CHECKING
from .modifier_base import ModifierBase
if TYPE_CHECKING:
from . import modifier_cfg
##
# Modifiers as functions
##
def scale(data: torch.Tensor, multiplier: float) -> torch.Tensor:
"""Scales input data by a multiplier.
Args:
data: The data to apply the scale to.
multiplier: Value to scale input by.
Returns:
Scaled data. Shape is the same as data.
"""
return data * multiplier
def clip(data: torch.Tensor, bounds: tuple[float | None, float | None]) -> torch.Tensor:
"""Clips the data to a minimum and maximum value.
Args:
data: The data to apply the clip to.
bounds: A tuple containing the minimum and maximum values to clip data to.
If the value is None, that bound is not applied.
Returns:
Clipped data. Shape is the same as data.
"""
return data.clip(min=bounds[0], max=bounds[1])
def bias(data: torch.Tensor, value: float) -> torch.Tensor:
"""Adds a uniform bias to the data.
Args:
data: The data to add bias to.
value: Value of bias to add to data.
Returns:
Biased data. Shape is the same as data.
"""
return data + value
##
# Sample of class based modifiers
##
class DigitalFilter(ModifierBase):
r"""Modifier used to apply digital filtering to the input data.
`Digital filters <https://en.wikipedia.org/wiki/Digital_filter>`_ are used to process discrete-time
signals to extract useful parts of the signal, such as smoothing, noise reduction, or frequency separation.
The filter can be implemented as a linear difference equation in the time domain. This equation
can be used to calculate the output at each time-step based on the current and previous inputs and outputs.
.. math::
y_{i} = X B - Y A = \sum_{j=0}^{N} b_j x_{i-j} - \sum_{j=1}^{M} a_j y_{i-j}
where :math:`y_{i}` is the current output of the filter. The array :math:`Y` contains previous
outputs from the filter :math:`\{y_{i-j}\}_{j=1}^M` for :math:`M` previous time-steps. The array
:math:`X` contains current :math:`x_{i}` and previous inputs to the filter
:math:`\{x_{i-j}\}_{j=1}^N` for :math:`N` previous time-steps respectively.
The filter coefficients :math:`A` and :math:`B` are used to design the filter. They are column vectors of
length :math:`M` and :math:`N + 1` respectively.
Different types of filters can be implemented by choosing different values for :math:`A` and :math:`B`.
We provide some examples below.
Examples
^^^^^^^^
**Unit Delay Filter**
A filter that delays the input signal by a single time-step simply outputs the previous input value.
.. math:: y_{i} = x_{i-1}
This can be implemented as a digital filter with the coefficients :math:`A = [0.0]` and :math:`B = [0.0, 1.0]`.
**Moving Average Filter**
A moving average filter is used to smooth out noise in a signal. It is similar to a low-pass filter
but has a finite impulse response (FIR) and is non-recursive.
The filter calculates the average of the input signal over a window of time-steps. The linear difference
equation for a moving average filter is:
.. math:: y_{i} = \frac{1}{N} \sum_{j=0}^{N} x_{i-j}
This can be implemented as a digital filter with the coefficients :math:`A = [0.0]` and
:math:`B = [1/N, 1/N, \cdots, 1/N]`.
**First-order recursive low-pass filter**
A recursive low-pass filter is used to smooth out high-frequency noise in a signal. It is a first-order
infinite impulse response (IIR) filter which means it has a recursive component (previous output) in the
linear difference equation.
A first-order low-pass IIR filter has the difference equation:
.. math:: y_{i} = \alpha y_{i-1} + (1-\alpha)x_{i}
where :math:`\alpha` is a smoothing parameter between 0 and 1. Typically, the value of :math:`\alpha` is
chosen based on the desired cut-off frequency of the filter.
This filter can be implemented as a digital filter with the coefficients :math:`A = [\alpha]` and
:math:`B = [1 - \alpha]`.
"""
def __init__(self, cfg: modifier_cfg.DigitalFilterCfg, data_dim: tuple[int, ...], device: str) -> None:
"""Initializes digital filter.
Args:
cfg: Configuration parameters.
data_dim: The dimensions of the data to be modified. First element is the batch size
which usually corresponds to number of environments in the simulation.
device: The device to run the modifier on.
Raises:
ValueError: If filter coefficients are None.
"""
# check that filter coefficients are not None
if cfg.A is None or cfg.B is None:
raise ValueError("Digital filter coefficients A and B must not be None. Please provide valid coefficients.")
# initialize parent class
super().__init__(cfg, data_dim, device)
# assign filter coefficients and make sure they are column vectors
self.A = torch.tensor(self._cfg.A, device=self._device).unsqueeze(1)
self.B = torch.tensor(self._cfg.B, device=self._device).unsqueeze(1)
# create buffer for input and output history
self.x_n = torch.zeros(self._data_dim + (self.B.shape[0],), device=self._device)
self.y_n = torch.zeros(self._data_dim + (self.A.shape[0],), device=self._device)
def reset(self, env_ids: Sequence[int] | None = None):
"""Resets digital filter history.
Args:
env_ids: The environment ids. Defaults to None, in which case
all environments are considered.
"""
if env_ids is None:
env_ids = slice(None)
# reset history buffers
self.x_n[env_ids] = 0.0
self.y_n[env_ids] = 0.0
def __call__(self, data: torch.Tensor) -> torch.Tensor:
"""Applies digital filter modification with a rolling history window inputs and outputs.
Args:
data: The data to apply filter to.
Returns:
Filtered data. Shape is the same as data.
"""
# move history window for input
self.x_n = torch.roll(self.x_n, shifts=1, dims=-1)
self.x_n[..., 0] = data
# calculate current filter value: y[i] = Y*A - X*B
y_i = torch.matmul(self.x_n, self.B) - torch.matmul(self.y_n, self.A)
y_i.squeeze_(-1)
# move history window for output and add current filter value to history
self.y_n = torch.roll(self.y_n, shifts=1, dims=-1)
self.y_n[..., 0] = y_i
return y_i
class Integrator(ModifierBase):
r"""Modifier that applies a numerical forward integration based on a middle Reimann sum.
An integrator is used to calculate the integral of a signal over time. The integral of a signal
is the area under the curve of the signal. The integral can be approximated using numerical methods
such as the `Riemann sum <https://en.wikipedia.org/wiki/Riemann_sum>`_.
The middle Riemann sum is a method to approximate the integral of a function by dividing the area
under the curve into rectangles. The height of each rectangle is the value of the function at the
midpoint of the interval. The area of each rectangle is the width of the interval multiplied by the
height of the rectangle.
This integral method is useful for signals that are sampled at regular intervals. The integral
can be written as:
.. math::
\int_{t_0}^{t_n} f(t) dt & \approx \int_{t_0}^{t_{n-1}} f(t) dt + \frac{f(t_{n-1}) + f(t_n)}{2} \Delta t
where :math:`f(t)` is the signal to integrate, :math:`t_i` is the time at the i-th sample, and
:math:`\Delta t` is the time step between samples.
"""
def __init__(self, cfg: modifier_cfg.IntegratorCfg, data_dim: tuple[int, ...], device: str):
"""Initializes the integrator configuration and state.
Args:
cfg: Integral parameters.
data_dim: The dimensions of the data to be modified. First element is the batch size
which usually corresponds to number of environments in the simulation.
device: The device to run the modifier on.
"""
# initialize parent class
super().__init__(cfg, data_dim, device)
# assign buffer for integral and previous value
self.integral = torch.zeros(self._data_dim, device=self._device)
self.y_prev = torch.zeros(self._data_dim, device=self._device)
def reset(self, env_ids: Sequence[int] | None = None):
"""Resets integrator state to zero.
Args:
env_ids: The environment ids. Defaults to None, in which case
all environments are considered.
"""
if env_ids is None:
env_ids = slice(None)
# reset history buffers
self.integral[env_ids] = 0.0
self.y_prev[env_ids] = 0.0
def __call__(self, data: torch.Tensor) -> torch.Tensor:
"""Applies integral modification to input data.
Args:
data: The data to integrate.
Returns:
Integral of input signal. Shape is the same as data.
"""
# integrate using middle Riemann sum
self.integral += (data + self.y_prev) / 2 * self._cfg.dt
# update previous value
self.y_prev[:] = data
return self.integral
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import torch
from abc import ABC, abstractmethod
from collections.abc import Sequence
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .modifier_cfg import ModifierCfg
class ModifierBase(ABC):
"""Base class for modifiers implemented as classes.
Modifiers implementations can be functions or classes. If a modifier is a class, it should
inherit from this class and implement the required methods.
A class implementation of a modifier can be used to store state information between calls.
This is useful for modifiers that require stateful operations, such as rolling averages
or delays or decaying filters.
Example pseudo-code to create and use the class:
.. code-block:: python
from omni.isaac.lab.utils import modifiers
# define custom keyword arguments to pass to ModifierCfg
kwarg_dict = {"arg_1" : VAL_1, "arg_2" : VAL_2}
# create modifier configuration object
# func is the class name of the modifier and params is the dictionary of arguments
modifier_config = modifiers.ModifierCfg(func=modifiers.ModifierBase, params=kwarg_dict)
# define modifier instance
my_modifier = modifiers.ModifierBase(cfg=modifier_config)
"""
def __init__(self, cfg: ModifierCfg, data_dim: tuple[int, ...], device: str) -> None:
"""Initializes the modifier class.
Args:
cfg: Configuration parameters.
data_dim: The dimensions of the data to be modified. First element is the batch size
which usually corresponds to number of environments in the simulation.
device: The device to run the modifier on.
"""
self._cfg = cfg
self._data_dim = data_dim
self._device = device
@abstractmethod
def reset(self, env_ids: Sequence[int] | None = None):
"""Resets the Modifier.
Args:
env_ids: The environment ids. Defaults to None, in which case
all environments are considered.
"""
raise NotImplementedError
@abstractmethod
def __call__(self, data: torch.Tensor) -> torch.Tensor:
"""Abstract method for defining the modification function.
Args:
data: The data to be modified. Shape should match the data_dim passed during initialization.
Returns:
Modified data. Shape is the same as the input data.
"""
raise NotImplementedError
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import torch
from collections.abc import Callable
from dataclasses import MISSING
from typing import Any
from omni.isaac.lab.utils import configclass
from . import modifier
@configclass
class ModifierCfg:
"""Configuration parameters modifiers"""
func: Callable[..., torch.Tensor] = MISSING
"""Function or callable class used by modifier.
The function must take a torch tensor as the first argument. The remaining arguments are specified
in the :attr:`params` attribute.
It also supports `callable classes <https://docs.python.org/3/reference/datamodel.html#object.__call__>`_,
i.e. classes that implement the ``__call__()`` method. In this case, the class should inherit from the
:class:`ModifierBase` class and implement the required methods.
"""
params: dict[str, Any] = dict()
"""The parameters to be passed to the function or callable class as keyword arguments. Defaults to
an empty dictionary."""
@configclass
class DigitalFilterCfg(ModifierCfg):
"""Configuration parameters for a digital filter modifier.
For more information, please check the :class:`DigitalFilter` class.
"""
func: type[modifier.DigitalFilter] = modifier.DigitalFilter
"""The digital filter function to be called for applying the filter."""
A: list[float] = MISSING
"""The coefficients corresponding the the filter's response to past outputs.
These correspond to the weights of the past outputs of the filter. The first element is the coefficient
for the output at the previous time step, the second element is the coefficient for the output at two
time steps ago, and so on.
It is the denominator coefficients of the transfer function of the filter.
"""
B: list[float] = MISSING
"""The coefficients corresponding the the filter's response to current and past inputs.
These correspond to the weights of the current and past inputs of the filter. The first element is the
coefficient for the current input, the second element is the coefficient for the input at the previous
time step, and so on.
It is the numerator coefficients of the transfer function of the filter.
"""
@configclass
class IntegratorCfg(ModifierCfg):
"""Configuration parameters for an integrator modifier.
For more information, please check the :class:`Integrator` class.
"""
func: type[modifier.Integrator] = modifier.Integrator
"""The integrator function to be called for applying the integrator."""
dt: float = MISSING
"""The time step of the integrator."""
......@@ -26,14 +26,10 @@ Usage:
"""
from .noise_cfg import NoiseCfg # noqa: F401
from .noise_cfg import (
AdditiveGaussianNoiseCfg,
AdditiveUniformNoiseCfg,
ConstantBiasNoiseCfg,
ConstantNoiseCfg,
GaussianNoiseCfg,
NoiseModelCfg,
NoiseModelWithAdditiveBiasCfg,
UniformNoiseCfg,
)
from .noise_cfg import ConstantNoiseCfg, GaussianNoiseCfg, NoiseModelCfg, NoiseModelWithAdditiveBiasCfg, UniformNoiseCfg
from .noise_model import NoiseModel, NoiseModelWithAdditiveBias, constant_noise, gaussian_noise, uniform_noise
# Backward compatibility
ConstantBiasNoiseCfg = ConstantNoiseCfg
AdditiveUniformNoiseCfg = UniformNoiseCfg
AdditiveGaussianNoiseCfg = GaussianNoiseCfg
......@@ -39,10 +39,6 @@ class ConstantNoiseCfg(NoiseCfg):
"""The bias to add. Defaults to 0.0."""
# Backward compatibility
ConstantBiasNoiseCfg = ConstantNoiseCfg
@configclass
class UniformNoiseCfg(NoiseCfg):
"""Configuration for a additive uniform noise term."""
......@@ -55,10 +51,6 @@ class UniformNoiseCfg(NoiseCfg):
"""The maximum value of the noise. Defaults to 1.0."""
# Backward compatibility
AdditiveUniformNoiseCfg = UniformNoiseCfg
@configclass
class GaussianNoiseCfg(NoiseCfg):
"""Configuration for an additive gaussian noise term."""
......@@ -71,8 +63,9 @@ class GaussianNoiseCfg(NoiseCfg):
"""The standard deviation of the noise. Defaults to 1.0."""
# Backward compatibility
AdditiveGaussianNoiseCfg = GaussianNoiseCfg
##
# Noise models
##
@configclass
......@@ -93,3 +86,7 @@ class NoiseModelWithAdditiveBiasCfg(NoiseModelCfg):
class_type: type = noise_model.NoiseModelWithAdditiveBias
bias_noise_cfg: NoiseCfg = MISSING
"""The noise configuration for the bias.
Based on this configuration, the bias is sampled at every reset of the noise model.
"""
......@@ -12,6 +12,10 @@ from typing import TYPE_CHECKING
if TYPE_CHECKING:
from . import noise_cfg
##
# Noise as functions.
##
def constant_noise(data: torch.Tensor, cfg: noise_cfg.ConstantNoiseCfg) -> torch.Tensor:
"""Constant noise."""
......@@ -49,66 +53,85 @@ def gaussian_noise(data: torch.Tensor, cfg: noise_cfg.GaussianNoiseCfg) -> torch
raise ValueError(f"Unknown operation in noise: {cfg.operation}")
##
# Noise models as classes
##
class NoiseModel:
"""Base class for noise models."""
def __init__(self, num_envs: int, noise_model_cfg: noise_cfg.NoiseModelCfg):
def __init__(self, noise_model_cfg: noise_cfg.NoiseModelCfg, num_envs: int, device: str):
"""Initialize the noise model.
Args:
num_envs: The number of environments.
noise_model_cfg: The noise configuration to use.
num_envs: The number of environments.
device: The device to use for the noise model.
"""
self._num_envs = num_envs
self._noise_model_cfg = noise_model_cfg
self._num_envs = num_envs
self._device = device
def apply(self, data: torch.Tensor) -> torch.Tensor:
r"""Apply the noise to the data.
Args:
data: The data to apply the noise to, which is a tensor of shape (num_envs, \*data_shape).
"""
return self._noise_model_cfg.noise_cfg.func(data, self._noise_model_cfg.noise_cfg)
def reset(self, env_ids: Sequence[int]):
def reset(self, env_ids: Sequence[int] | None = None):
"""Reset the noise model.
This method can be implemented by derived classes to reset the noise model.
This is useful when implementing temporal noise models such as random walk.
Args:
env_ids: The environment ids to reset the noise model for.
env_ids: The environment ids to reset the noise model for. Defaults to None,
in which case all environments are considered.
"""
pass
def apply(self, data: torch.Tensor) -> torch.Tensor:
"""Apply the noise to the data.
Args:
data: The data to apply the noise to. Shape is (num_envs, ...).
Returns:
The data with the noise applied. Shape is the same as the input data.
"""
return self._noise_model_cfg.noise_cfg.func(data, self._noise_model_cfg.noise_cfg)
class NoiseModelWithAdditiveBias(NoiseModel):
"""Noise model with an additive bias.
The bias term is sampled from a the specified distribution on reset.
"""
def __init__(self, num_envs: int, noise_model_cfg: noise_cfg.NoiseModelWithAdditiveBiasCfg, device: str):
super().__init__(num_envs, noise_model_cfg)
self._device = device
def __init__(self, noise_model_cfg: noise_cfg.NoiseModelWithAdditiveBiasCfg, num_envs: int, device: str):
# initialize parent class
super().__init__(noise_model_cfg, num_envs, device)
# store the bias noise configuration
self._bias_noise_cfg = noise_model_cfg.bias_noise_cfg
self._bias = torch.zeros((num_envs, 1), device=self._device)
def apply(self, data: torch.Tensor) -> torch.Tensor:
r"""Apply the noise + bias.
Args:
data: The data to apply the noise to, which is a tensor of shape (num_envs, \*data_shape).
"""
return super().apply(data) + self._bias
def reset(self, env_ids: Sequence[int]):
def reset(self, env_ids: Sequence[int] | None = None):
"""Reset the noise model.
This method resets the bias term for the specified environments.
Args:
env_ids: The environment ids to reset the noise model for.
env_ids: The environment ids to reset the noise model for. Defaults to None,
in which case all environments are considered.
"""
# resolve the environment ids
if env_ids is None:
env_ids = slice(None)
# reset the bias term
self._bias[env_ids] = self._bias_noise_cfg.func(self._bias[env_ids], self._bias_noise_cfg)
def apply(self, data: torch.Tensor) -> torch.Tensor:
"""Apply bias noise to the data.
Args:
data: The data to apply the noise to. Shape is (num_envs, ...).
Returns:
The data with the noise applied. Shape is the same as the input data.
"""
return super().apply(data) + self._bias
......@@ -20,7 +20,7 @@ import unittest
from collections import namedtuple
from omni.isaac.lab.managers import ManagerTermBase, ObservationGroupCfg, ObservationManager, ObservationTermCfg
from omni.isaac.lab.utils import configclass
from omni.isaac.lab.utils import configclass, modifiers
def grilled_chicken(env):
......@@ -95,11 +95,12 @@ class TestObservationManager(unittest.TestCase):
def setUp(self) -> None:
# set up the environment
self.dt = 0.01
self.num_envs = 20
self.device = "cuda:0"
# create dummy environment
self.env = namedtuple("ManagerBasedEnv", ["num_envs", "device", "data"])(
self.num_envs, self.device, MyDataClass(self.num_envs, self.device)
self.env = namedtuple("ManagerBasedEnv", ["num_envs", "device", "data", "dt"])(
self.num_envs, self.device, MyDataClass(self.num_envs, self.device), self.dt
)
def test_str(self):
......@@ -377,6 +378,81 @@ class TestObservationManager(unittest.TestCase):
with self.assertRaises(NotImplementedError):
self.obs_man = ObservationManager(cfg, self.env)
def test_modifier_compute(self):
"""Test the observation computation with modifiers."""
modifier_1 = modifiers.ModifierCfg(func=modifiers.bias, params={"value": 1.0})
modifier_2 = modifiers.ModifierCfg(func=modifiers.scale, params={"multiplier": 2.0})
modifier_3 = modifiers.ModifierCfg(func=modifiers.clip, params={"bounds": (-0.5, 0.5)})
modifier_4 = modifiers.IntegratorCfg(dt=self.env.dt)
@configclass
class MyObservationManagerCfg:
"""Test config class for observation manager."""
@configclass
class PolicyCfg(ObservationGroupCfg):
"""Test config class for policy observation group."""
concatenate_terms = False
term_1 = ObservationTermCfg(func=pos_w_data, modifiers=[])
term_2 = ObservationTermCfg(func=pos_w_data, modifiers=[modifier_1])
term_3 = ObservationTermCfg(func=pos_w_data, modifiers=[modifier_1, modifier_4])
@configclass
class CriticCfg(ObservationGroupCfg):
"""Test config class for critic observation group"""
concatenate_terms = False
term_1 = ObservationTermCfg(func=pos_w_data, modifiers=[])
term_2 = ObservationTermCfg(func=pos_w_data, modifiers=[modifier_1])
term_3 = ObservationTermCfg(func=pos_w_data, modifiers=[modifier_1, modifier_2])
term_4 = ObservationTermCfg(func=pos_w_data, modifiers=[modifier_1, modifier_2, modifier_3])
policy: ObservationGroupCfg = PolicyCfg()
critic: ObservationGroupCfg = CriticCfg()
# create observation manager
cfg = MyObservationManagerCfg()
self.obs_man = ObservationManager(cfg, self.env)
# compute observation using manager
observations = self.obs_man.compute()
# obtain the group observations
obs_policy: dict[str, torch.Tensor] = observations["policy"]
obs_critic: dict[str, torch.Tensor] = observations["critic"]
# check correct application of modifications
torch.testing.assert_close(obs_policy["term_1"] + 1.0, obs_policy["term_2"])
torch.testing.assert_close(obs_critic["term_1"] + 1.0, obs_critic["term_2"])
torch.testing.assert_close(2.0 * (obs_critic["term_1"] + 1.0), obs_critic["term_3"])
self.assertTrue(torch.min(obs_critic["term_4"]) >= -0.5)
self.assertTrue(torch.max(obs_critic["term_4"]) <= 0.5)
def test_modifier_invalid_config(self):
"""Test modifier initialization with invalid config."""
modifier = modifiers.ModifierCfg(func=modifiers.clip, params={"min": -0.5, "max": 0.5})
@configclass
class MyObservationManagerCfg:
"""Test config class for observation manager."""
@configclass
class PolicyCfg(ObservationGroupCfg):
"""Test config class for policy observation group."""
concatenate_terms = False
term_1 = ObservationTermCfg(func=pos_w_data, modifiers=[modifier])
policy: ObservationGroupCfg = PolicyCfg()
# create observation manager
cfg = MyObservationManagerCfg()
with self.assertRaises(ValueError):
self.obs_man = ObservationManager(cfg, self.env)
if __name__ == "__main__":
run_tests()
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Launch Isaac Sim Simulator first."""
from omni.isaac.lab.app import AppLauncher, run_tests
# launch omniverse app
app_launcher = AppLauncher(headless=True)
simulation_app = app_launcher.app
"""Rest everything follows."""
import torch
import unittest
from dataclasses import MISSING
import omni.isaac.lab.utils.modifiers as modifiers
from omni.isaac.lab.utils import configclass
@configclass
class ModifierTestCfg:
"""Configuration for testing modifiers."""
cfg: modifiers.ModifierCfg = MISSING
init_data: torch.Tensor = MISSING
result: torch.Tensor = MISSING
num_iter: int = 10
class TestModifiers(unittest.TestCase):
"""Test different modifiers implementations."""
def test_scale_modifier(self):
"""Test for scale modifier."""
# create a random tensor
data = torch.rand(128, 128, device="cuda")
# create a modifier configuration
cfg = modifiers.ModifierCfg(func=modifiers.scale, params={"multiplier": 2.0})
# apply the modifier
processed_data = cfg.func(data, **cfg.params)
# check if the shape of the modified data is the same as the original data
self.assertEqual(data.shape, processed_data.shape, msg="Modified data shape does not equal original")
torch.testing.assert_close(processed_data, data * cfg.params["multiplier"])
def test_bias_modifier(self):
"""Test for bias modifier."""
# create a random tensor
data = torch.rand(128, 128, device="cuda")
# create a modifier configuration
cfg = modifiers.ModifierCfg(func=modifiers.bias, params={"value": 0.5})
# apply the modifier
processed_data = cfg.func(data, **cfg.params)
# check if the shape of the modified data is the same as the original data
self.assertEqual(data.shape, processed_data.shape, msg="Modified data shape does not equal original")
torch.testing.assert_close(processed_data - data, torch.ones_like(data) * cfg.params["value"])
def test_clip_modifier(self):
"""Test for clip modifier."""
# create a random tensor
data = torch.rand(128, 128, device="cuda")
# create a modifier configuration
cfg = modifiers.ModifierCfg(func=modifiers.clip, params={"bounds": (0.5, 2.5)})
# apply the modifier
processed_data = cfg.func(data, **cfg.params)
# check if the shape of the modified data is the same as the original data
self.assertEqual(data.shape, processed_data.shape, msg="Modified data shape does not equal original")
self.assertTrue(torch.min(processed_data) >= cfg.params["bounds"][0])
self.assertTrue(torch.max(processed_data) <= cfg.params["bounds"][1])
def test_clip_no_upper_bound_modifier(self):
"""Test for clip modifier with no upper bound."""
# create a random tensor
data = torch.rand(128, 128, device="cuda")
# create a modifier configuration
cfg = modifiers.ModifierCfg(func=modifiers.clip, params={"bounds": (0.0, None)})
# apply the modifier
processed_data = cfg.func(data, **cfg.params)
# check if the shape of the modified data is the same as the original data
self.assertEqual(data.shape, processed_data.shape, msg="Modified data shape does not equal original")
self.assertTrue(torch.min(processed_data) >= cfg.params["bounds"][0])
def test_clip_no_lower_bound_modifier(self):
"""Test for clip modifier with no lower bound."""
# create a random tensor
data = torch.rand(128, 128, device="cuda")
# create a modifier configuration
cfg = modifiers.ModifierCfg(func=modifiers.clip, params={"bounds": (None, 0.0)})
# apply the modifier
processed_data = cfg.func(data, **cfg.params)
# check if the shape of the modified data is the same as the original data
self.assertEqual(data.shape, processed_data.shape, msg="Modified data shape does not equal original")
self.assertTrue(torch.min(processed_data) <= cfg.params["bounds"][1])
def test_torch_relu_modifier(self):
"""Test for torch relu modifier."""
# create a random tensor
data = torch.rand(128, 128, device="cuda")
# create a modifier configuration
cfg = modifiers.ModifierCfg(func=torch.nn.functional.relu)
# apply the modifier
processed_data = cfg.func(data)
# check if the shape of the modified data is the same as the original data
self.assertEqual(data.shape, processed_data.shape, msg="modified data shape does not equal original")
self.assertTrue(torch.all(processed_data >= 0.0))
def test_digital_filter(self):
"""Test for digital filter modifier."""
for device in ["cpu", "cuda"]:
with self.subTest(device=device):
# create a modifier configuration
modifier_cfg = modifiers.DigitalFilterCfg(A=[0.0, 0.1], B=[0.5, 0.5])
# create a test configuration
test_cfg = ModifierTestCfg(
cfg=modifier_cfg,
init_data=torch.tensor([0.0, 0.0, 0.0], device=device).unsqueeze(1),
result=torch.tensor([-0.45661893, -0.45661893, -0.45661893], device=device).unsqueeze(1),
num_iter=16,
)
# create a modifier instance
modifier_obj = modifier_cfg.func(modifier_cfg, test_cfg.init_data.shape, device=device)
# test the modifier
theta = torch.tensor([0.0], device=device)
delta = torch.pi / torch.tensor([8.0, 8.0, 8.0], device=device).unsqueeze(1)
for _ in range(5):
# reset the modifier
modifier_obj.reset()
# apply the modifier multiple times
for i in range(test_cfg.num_iter):
data = torch.sin(theta + i * delta)
processed_data = modifier_obj(data)
self.assertEqual(
data.shape, processed_data.shape, msg="Modified data shape does not equal original"
)
# check if the modified data is close to the expected result
torch.testing.assert_close(processed_data, test_cfg.result)
def test_integral(self):
"""Test for integral modifier."""
for device in ["cpu", "cuda"]:
with self.subTest(device=device):
# create a modifier configuration
modifier_cfg = modifiers.IntegratorCfg(dt=1.0)
# create a test configuration
test_cfg = ModifierTestCfg(
cfg=modifier_cfg,
init_data=torch.tensor([0.0], device=device).unsqueeze(1),
result=torch.tensor([12.5], device=device).unsqueeze(1),
num_iter=6,
)
# create a modifier instance
modifier_obj = modifier_cfg.func(modifier_cfg, test_cfg.init_data.shape, device=device)
# test the modifier
delta = torch.tensor(1.0, device=device)
for _ in range(5):
# reset the modifier
modifier_obj.reset()
# clone the data to avoid modifying the original
data = test_cfg.init_data.clone()
# apply the modifier multiple times
for _ in range(test_cfg.num_iter):
processed_data = modifier_obj(data)
data = data + delta
self.assertEqual(
data.shape, processed_data.shape, msg="Modified data shape does not equal original"
)
# check if the modified data is close to the expected result
torch.testing.assert_close(processed_data, test_cfg.result)
if __name__ == "__main__":
run_tests()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment