Unverified Commit b25d6673 authored by David Hoeller's avatar David Hoeller Committed by GitHub

Modifies `ObservationManager` and `RewardManager` to take functions in each...

Modifies `ObservationManager` and `RewardManager` to take functions in each term's configuration (#61)

# Description

This PR modifies the `ObservationManager` and `RewardManager` to change
their default behavior. Earlier they expected that users inherit from
the class and define member functions inside it to compute the terms.
The function name was resolved from the term name itself. However, this
prevented the reuse of common terms and the rewriting of a lot of code
across environments.

The changes made in this PR add another argument `func` to the term
dictionary which now expects a function directly. This way users can use
the same function multiple times for different terms. Example:

```python
def get_contact_force(env, bodies):
   ....
   
class ObservationManagerCfg:

   term_1 = {"func": get_contact_force, "bodies": "base"}
   term_2 = {"func": get_contact_force, "bodies": ".*_FOOT"}
```

The older class implementations have been moved to
`omni.isaac.orbit.compat.utils.mdp` for compatibility.

Additionally, we include `ManagerBase` and `ManagerBaseTermCfg` as a
base class to ensure a better signature on how configuration terms are
handled.

## Type of change

- Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- This change requires a documentation update

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./orbit.sh --format`
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file

---------
Co-authored-by: 's avatarMayank Mittal <mittalma@leggedrobotics.com>
parent 4e727991
...@@ -8,6 +8,7 @@ omni.isaac.orbit.compat ...@@ -8,6 +8,7 @@ omni.isaac.orbit.compat
* :mod:`omni.isaac.orbit.markers` * :mod:`omni.isaac.orbit.markers`
* :mod:`omni.isaac.orbit.sensors` * :mod:`omni.isaac.orbit.sensors`
* :mod:`omni.isaac.orbit.utils.mdp`
Markers Markers
------- -------
...@@ -93,3 +94,12 @@ Utilities ...@@ -93,3 +94,12 @@ Utilities
:members: :members:
:undoc-members: :undoc-members:
:show-inheritance: :show-inheritance:
MDP Managers
------------
.. automodule:: omni.isaac.orbit.compat.utils.mdp
:members:
:undoc-members:
:show-inheritance:
[package] [package]
# Note: Semantic Versioning is used: https://semver.org/ # Note: Semantic Versioning is used: https://semver.org/
version = "0.6.2" version = "0.7.0"
# Description # Description
title = "ORBIT framework for Robot Learning" title = "ORBIT framework for Robot Learning"
......
Changelog Changelog
--------- ---------
0.7.0 (2023-07-10)
~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Created a new :mod:`omni.isaac.orbit.managers` module for all the managers related to the environment / scene.
This includes the :class:`omni.isaac.orbit.managers.ObservationManager` and :class:`omni.isaac.orbit.managers.RewardManager`
classes that were previously in the :mod:`omni.isaac.orbit.utils.mdp` module.
* Added the :class:`omni.isaac.orbit.managers.ManagerBase` class to handle the creation of managers.
* Added configuration classes for :class:`ObservationTermCfg` and :class:`RewardTermCfg` to allow easy creation of
observation and reward terms.
Changed
^^^^^^^
* Changed the behavior of :class:`ObservationManager` and :class:`RewardManager` classes to accept the key ``func``
in each configuration term to be a callable. This removes the need to inherit from the base class
and allows more reusability of the functions across different environments.
* Moved the old managers to the :mod:`omni.isaac.orbit.compat.utils.mdp` module.
* Modified the necessary scripts to use the :mod:`omni.isaac.orbit.compat.utils.mdp` module.
0.6.2 (2023-07-21) 0.6.2 (2023-07-21)
~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~
...@@ -21,7 +43,6 @@ Fixed ...@@ -21,7 +43,6 @@ Fixed
* Fixed the :meth:`omni.isaac.orbit.utils.math.quat_apply_yaw` to compute the yaw quaternion correctly. * Fixed the :meth:`omni.isaac.orbit.utils.math.quat_apply_yaw` to compute the yaw quaternion correctly.
Added Added
^^^^^^^ ^^^^^^^
...@@ -55,6 +76,7 @@ Changed ...@@ -55,6 +76,7 @@ Changed
0.5.0 (2023-07-04) 0.5.0 (2023-07-04)
~~~~~~~~~~~~~~~~~~
Added Added
^^^^^ ^^^^^
......
...@@ -10,7 +10,7 @@ from copy import deepcopy ...@@ -10,7 +10,7 @@ from copy import deepcopy
from dataclasses import Field, dataclass, field from dataclasses import Field, dataclass, field
from typing import Any, Callable, ClassVar, Dict from typing import Any, Callable, ClassVar, Dict
from .dict import class_to_dict, update_class_from_dict from omni.isaac.orbit.utils.dict import class_to_dict, update_class_from_dict
# List of all methods provided by sub-module. # List of all methods provided by sub-module.
__all__ = ["configclass"] __all__ = ["configclass"]
......
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
This sub-module introduces the managers for handling various aspects of the environment.
The managers are used to handle various aspects of the environment such as randomization, curriculum, and
observations. Each manager implements a specific functionality for the environment. The managers are
designed to be modular and can be easily extended to support new functionality.
Each manager is implemented as a class that inherits from the :class:`ManagerBase` class. Each manager
class should also have a corresponding configuration class that defines the configuration terms for the
manager. Each term should the :class:`ManagerBaseTermCfg` class or its subclass.
Example pseudo-code for a manager:
.. code-block:: python
from omni.isaac.orbit.utils import configclass
from omni.isaac.orbit.utils.mdp import ManagerBase, ManagerBaseTermCfg
@configclass
class MyManagerCfg:
my_term_1: ManagerBaseTermCfg = ManagerBaseTermCfg(...)
my_term_2: ManagerBaseTermCfg = ManagerBaseTermCfg(...)
my_term_3: ManagerBaseTermCfg = ManagerBaseTermCfg(...)
# define manager instance
my_manager = ManagerBase(cfg=ManagerCfg(), env=env)
"""
from .manager_cfg import ObservationGroupCfg, ObservationTermCfg, RewardTermCfg
from .observation_manager import ObservationManager
from .reward_manager import RewardManager
__all__ = [
"ObservationGroupCfg",
"ObservationTermCfg",
"ObservationManager",
"RewardTermCfg",
"RewardManager",
]
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import copy
import inspect
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Sequence, Union
from omni.isaac.orbit.utils import string_to_callable
from .manager_cfg import ManagerBaseTermCfg
class ManagerBase(ABC):
"""Base class for all managers."""
def __init__(self, cfg: object, env: object):
"""Initialize the manager.
Args:
cfg (object): The configuration object.
env (object): The environment instance.
"""
# store the inputs
self.cfg = copy.deepcopy(cfg)
self._env = env
# parse config to create terms information
self._prepare_terms()
"""
Properties.
"""
@property
def num_envs(self) -> int:
"""Number of environments."""
return self._env.num_envs
@property
def device(self) -> str:
"""Device on which to perform computations."""
return self._env.device
@property
@abstractmethod
def active_terms(self) -> Union[List[str], Dict[str, List[str]]]:
"""Name of active terms."""
raise NotImplementedError
"""
Operations.
"""
def log_info(self, env_ids: Optional[Sequence[int]] = None) -> Dict[str, float]:
"""Returns logging information for the current time-step.
Args:
env_ids (Sequence[int], optional): The environment ids for which to log data. Defaults
:obj:`None`, which logs data for all environments.
Returns:
Dict[str, float]: Dictionary containing the logging information.
"""
return {}
"""
Implementation specific.
"""
@abstractmethod
def _prepare_terms(self):
"""Prepare terms information from the configuration object."""
raise NotImplementedError
"""
Helper functions.
"""
def _resolve_common_term_cfg(self, term_name: str, term_cfg: ManagerBaseTermCfg, min_argc: int = 1):
"""Resolve common term configuration.
Usually, called by the :meth:`_prepare_terms` method to resolve common term configuration.
Note:
By default, all term functions are expected to have at least one argument, which is the
environment object. Some other managers may expect functions to take more arguments, for
instance, the environment indices as the second argument. In such cases, the
``min_argc`` argument can be used to specify the minimum number of arguments
required by the term function to be called correctly by the manager.
Args:
term_name (str): The name of the term.
term_cfg (ManagerBaseTermCfg): The term configuration.
min_argc (int): The minimum number of arguments required by the term function to
be called correctly by the manager.
Raises:
TypeError: If the term configuration is not of type :class:`ManagerBaseTermCfg`.
AttributeError: If the term function is not callable.
ValueError: If the term function's arguments are not matched by the parameters.
"""
# check if the term is a valid term config
if not isinstance(term_cfg, ManagerBaseTermCfg):
raise TypeError(
f"Configuration for the term '{term_name}' is not of type ManagerBaseTermCfg. Received '{type(term_cfg)}'."
)
# check if a sensor should be enabled
if term_cfg.sensor_name is not None:
# TODO: This is a hack. We should not be enabling sensors here.
# Instead, we should be enabling sensors in the sensor manager or somewhere else.
self._env.enable_sensor(term_cfg.sensor_name)
term_cfg.params["sensor_name"] = term_cfg.sensor_name
# convert joint names to indices based on regex
# TODO: What is user wants to penalize joints on one asset and bodies on another?
if term_cfg.dof_names is not None:
# check that the asset name is provided
if term_cfg.asset_name is None:
raise ValueError(f"The term '{term_name}' requires the asset name to be provided.")
# acquire the dof indices
dof_ids, _ = getattr(self._env, term_cfg.asset_name).find_dofs(term_cfg.dof_names)
term_cfg.params["dof_ids"] = dof_ids
# convert body names to indices based on regex
if term_cfg.body_names is not None:
# check that the asset name is provided
if term_cfg.asset_name is None:
raise ValueError(f"The term '{term_name}' requires the asset name to be provided.")
# acquire the body indices
body_ids, _ = getattr(self._env, term_cfg.asset_name).find_bodies(term_cfg.body_names)
term_cfg.params["body_ids"] = body_ids
# get the corresponding function
if isinstance(term_cfg.func, str):
term_cfg.func = string_to_callable(term_cfg.func)
# check if function is callable
if not callable(term_cfg.func):
raise AttributeError(f"The term '{term_name}' is not callable. Received: {term_cfg.func}")
# check if curriculum term's arguments are matched by params
term_params = list(term_cfg.params.keys())
args = inspect.getfullargspec(term_cfg.func).args
# ignore first two arguments for env and env_ids
# Think: Check for cases when kwargs are set inside the function?
if len(args) > min_argc:
if set(args[min_argc:]) != set(term_params):
msg = f"The term '{term_name}' expects parameters: {args[min_argc:]}, but {term_params} provided."
raise ValueError(msg)
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Configuration terms for different managers."""
from __future__ import annotations
import torch
from dataclasses import MISSING
from typing import Any, Callable, Sequence
from omni.isaac.orbit.utils import configclass
from omni.isaac.orbit.utils.noise import NoiseCfg
@configclass
class ManagerBaseTermCfg:
"""Configuration for a curriculum term."""
func: Callable = MISSING
"""The function to be called for the term.
The function must take the environment object as the first argument.
"""
sensor_name: str | None = None
"""The name of the sensor required by the term. Defaults to None.
If the sensor is not already enabled, it will be enabled in the environment on initialization
of the manager and passed to the term function as a string under :attr:`sensor_name`.
"""
asset_name: str | None = None
"""The name of the asset used to resolve the joints and bodies required by the term. Defaults to None."""
dof_names: Sequence[str] | None = None
"""The names of the joints from the asset required by the term. Defaults to None.
The names can be either joint names or a regular expression matching the joint names.
These are converted to joint indices on initialization of the manager and passed to the term
function as a list of joint indices under :attr:`dof_ids`.
"""
body_names: Sequence[str] | None = None
"""The names of the bodies from the asset required by the term. Defaults to None.
The names can be either body names or a regular expression matching the body names.
These are converted to body indices on initialization of the manager and passed to the term
function as a list of body indices under :attr:`body_ids`.
"""
params: dict[str, Any] = dict()
"""The parameters to be passed to the function as keyword arguments. Defaults to an empty dict."""
"""Observation manager."""
@configclass
class ObservationTermCfg(ManagerBaseTermCfg):
"""Configuration for an observation term."""
func: Callable[..., torch.Tensor] = MISSING
"""The name of the function to be called.
This function should take the environment object and any other parameters
as input and return the observation signal as torch float tensors of
shape ``(num_envs, obs_term_dim)``.
"""
noise: NoiseCfg | None = None
"""The noise to add to the observation. Defaults to None, in which case no noise is added."""
clip: tuple[float, float] | None = None
"""The clipping range for the observation after adding noise. Defaults to None,
in which case no clipping is applied."""
scale: float | None = None
"""The scale to apply to the observation after clipping. Defaults to None,
in which case no scaling is applied (same as setting scale to :obj:`1`)."""
@configclass
class ObservationGroupCfg:
"""Configuration for an observation group."""
concatenate_terms: bool = True
"""Whether to concatenate the observation terms in the group. Defaults to True.
If true, the observation terms in the group are concatenated along the last dimension.
Otherwise, they are kept separate and returned as a dictionary.
"""
enable_corruption: bool = False
"""Whether to enable corruption for the observation group. Defaults to False.
If true, the observation terms in the group are corrupted by adding noise (if specified).
Otherwise, no corruption is applied.
"""
"""Reward manager."""
@configclass
class RewardTermCfg(ManagerBaseTermCfg):
"""Configuration for a reward term."""
func: Callable[..., torch.Tensor] = MISSING
"""The name of the function to be called.
This function should take the environment object and any other parameters
as input and return the reward signals as torch float tensors of
shape ``(num_envs,)``.
"""
weight: float = MISSING
"""The weight of the reward term.
This is multiplied with the reward term's value to compute the final
reward.
Note:
If the weight is zero, the reward term is ignored.
"""
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Observation manager for computing observation signals for a given world."""
import torch
from prettytable import PrettyTable
from typing import Dict, List, Tuple
from .manager_base import ManagerBase
from .manager_cfg import ObservationGroupCfg, ObservationTermCfg
class ObservationManager(ManagerBase):
"""Manager for computing observation signals for a given world.
Observations are organized into groups based on their intended usage. This allows having different observation
groups for different types of learning such as asymmetric actor-critic and student-teacher training. Each
group contains observation terms which contain information about the observation function to call, the noise
corruption model to use, and the sensor to retrieve data from.
Each observation group should inherit from the :class:`ObservationGroupCfg` class. Within each group, each
observation term should instantiate the :class:`ObservationTermCfg` class.
"""
def __init__(self, cfg: object, env: object):
"""Initialize observation manager.
Args:
cfg (object): The configuration object or dictionary (``dict[str, ObservationGroupCfg]``).
env (object): The environment instance.
"""
super().__init__(cfg, env)
# compute combined vector for obs group
self._group_obs_dim: Dict[str, Tuple[int, ...]] = dict()
for group_name, group_term_dims in self._group_obs_term_dim.items():
term_dims = [torch.tensor(dims, device="cpu") for dims in group_term_dims]
self._group_obs_dim[group_name] = tuple(torch.sum(torch.stack(term_dims, dim=0), dim=0).tolist())
def __str__(self) -> str:
"""Returns: A string representation for the observation manager."""
msg = f"<ObservationManager> contains {len(self._group_obs_term_names)} groups.\n"
# add info for each group
for group_name, group_dim in self._group_obs_dim.items():
# create table for term information
table = PrettyTable()
table.title = f"Active Observation Terms in Group: '{group_name}' (shape: {group_dim})"
table.field_names = ["Index", "Name", "Shape"]
# set alignment of table columns
table.align["Name"] = "l"
# add info for each term
obs_terms = zip(
self._group_obs_term_names[group_name],
self._group_obs_term_dim[group_name],
)
for index, (name, dims) in enumerate(obs_terms):
# resolve inputs to simplify prints
tab_dims = tuple(dims)
# add row
table.add_row([index, name, tab_dims])
# convert table to string
msg += table.get_string()
msg += "\n"
return msg
"""
Properties.
"""
@property
def active_terms(self) -> Dict[str, List[str]]:
"""Name of active observation terms in each group."""
return self._group_obs_term_names
@property
def group_obs_dim(self) -> Dict[str, Tuple[int, ...]]:
"""Shape of observation tensor in each group."""
return self._group_obs_dim
@property
def group_obs_term_dim(self) -> Dict[str, List[Tuple[int, ...]]]:
"""Shape of observation tensor for each term in each group."""
return self._group_obs_term_dim
"""
Operations.
"""
def compute(self) -> Dict[str, torch.Tensor]:
"""Compute the observations per group.
The method computes the observations for each group and returns a dictionary with keys as
the group names and values as the computed observations. The observations are computed
by calling the registered functions for each term in the group. The functions are called
in the order of the terms in the group. The functions are expected to return a tensor
with shape ``(num_envs, ...)``. The tensors are then concatenated along the last dimension to
form the observations for the group.
If a corruption/noise model is registered for a term, the function is called to corrupt
the observation. The corruption function is expected to return a tensor with the same
shape as the observation. The observations are clipped and scaled as per the configuration
settings. By default, no scaling or clipping is applied.
Returns:
Dict[str, torch.Tensor]: A dictionary with keys as the group names and values as the
computed observations.
"""
self._obs_buffer = dict()
# iterate over all the terms in each group
for group_name, group_term_names in self._group_obs_term_names.items():
# buffer to store obs per group
group_obs = dict.fromkeys(group_term_names, None)
# read attributes for each term
obs_terms = zip(group_term_names, self._group_obs_term_cfgs[group_name])
# evaluate terms: compute, add noise, clip, scale.
for name, term_cfg in obs_terms:
# compute term's value
obs: torch.Tensor = term_cfg.func(self._env, **term_cfg.params)
# apply post-processing
if term_cfg.noise:
obs = term_cfg.noise.func(obs, term_cfg.noise)
if term_cfg.clip:
obs = obs.clip_(min=term_cfg.clip[0], max=term_cfg.clip[1])
if term_cfg.scale:
obs = obs.mul_(term_cfg.scale)
# TODO: Introduce delay and filtering models.
# Ref: https://robosuite.ai/docs/modules/sensors.html#observables
# add value to list
group_obs[name] = obs
# concatenate all observations in the group together
if self._group_obs_concatenate[group_name]:
self._obs_buffer[group_name] = torch.cat(list(group_obs.values()), dim=-1)
else:
self._obs_buffer[group_name] = group_obs
# return all group observations
return self._obs_buffer
"""
Helper functions.
"""
def _prepare_terms(self):
"""Prepares a list of observation terms functions."""
# create buffers to store information for each observation group
# TODO: Make this more convenient by using data structures.
self._group_obs_term_names: Dict[str, List[str]] = dict()
self._group_obs_term_dim: Dict[str, List[int]] = dict()
self._group_obs_term_cfgs: Dict[str, List[ObservationTermCfg]] = dict()
self._group_obs_concatenate: Dict[str, bool] = dict()
# check if config is dict already
if isinstance(self.cfg, dict):
group_cfg_items = self.cfg.items()
else:
group_cfg_items = self.cfg.__dict__.items()
# iterate over all the groups
for group_name, group_cfg in group_cfg_items:
# check for non config
if group_cfg is None:
continue
# check if the term is a curriculum term
if not isinstance(group_cfg, ObservationGroupCfg):
raise TypeError(
f"Observation group '{group_name}' is not of type 'ObservationGroupCfg'. Received '{type(group_cfg)}'."
)
# initialize list for the group settings
self._group_obs_term_names[group_name] = list()
self._group_obs_term_dim[group_name] = list()
self._group_obs_term_cfgs[group_name] = list()
# read common config for the group
self._group_obs_concatenate[group_name] = group_cfg.concatenate_terms
# check if config is dict already
if isinstance(group_cfg, dict):
group_cfg_items = group_cfg.items()
else:
group_cfg_items = group_cfg.__dict__.items()
# iterate over all the terms in each group
for term_name, term_cfg in group_cfg.__dict__.items():
# skip non-obs settings
if term_name in ["enable_corruption", "concatenate_terms"]:
continue
# check for non config
if term_cfg is None:
continue
# resolve common terms in the config
self._resolve_common_term_cfg(f"{group_name}/{term_name}", term_cfg, min_argc=1)
# check noise settings
if not group_cfg.enable_corruption:
term_cfg.noise = None
# add term config to list to list
self._group_obs_term_names[group_name].append(term_name)
self._group_obs_term_cfgs[group_name].append(term_cfg)
# call function the first time to fill up dimensions
obs_dims = tuple(term_cfg.func(self._env, **term_cfg.params).shape[1:])
self._group_obs_term_dim[group_name].append(obs_dims)
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Reward manager for computing reward signals for a given world."""
import torch
from prettytable import PrettyTable
from typing import Dict, List, Optional, Sequence
from .manager_base import ManagerBase
from .manager_cfg import RewardTermCfg
class RewardManager(ManagerBase):
"""Manager for computing reward signals for a given world.
The reward manager computes the total reward as a sum of the weighted reward terms. The reward
terms are parsed from a nested config class containing the reward manger's settings and reward
terms configuration.
The reward terms are parsed from a config class containing the manager's settings and each term's
parameters. Each reward term should instantiate the :class:`RewardTermCfg` class.
.. note::
The reward manager multiplies the reward term's ``weight`` with the time-step interval ``dt``
of the environment. This is done to ensure that the computed reward terms are balanced with
respect to the chosen time-step interval in the environment.
"""
def __init__(self, cfg: object, env: object):
"""Initialize the reward manager.
Args:
cfg (object): The configuration object or dictionary (``dict[str, RewardTermCfg]``).
env (object): The environment instance.
"""
super().__init__(cfg, env)
# prepare extra info to store individual reward term information
self._episode_sums = dict()
for term_name in self._term_names:
self._episode_sums[term_name] = torch.zeros(self.num_envs, dtype=torch.float, device=self.device)
# create buffer for managing reward per environment
self._reward_buf = torch.zeros(self.num_envs, dtype=torch.float, device=self.device)
def __str__(self) -> str:
"""Returns: A string representation for reward manager."""
msg = f"<RewardManager> contains {len(self._term_names)} active terms.\n"
# create table for term information
table = PrettyTable()
table.title = "Active Reward Terms"
table.field_names = ["Index", "Name", "Weight"]
# set alignment of table columns
table.align["Name"] = "l"
table.align["Weight"] = "r"
# add info on each term
for index, (name, term_cfg) in enumerate(zip(self._term_names, self._term_cfgs)):
table.add_row([index, name, term_cfg.weight])
# convert table to string
msg += table.get_string()
return msg
"""
Properties.
"""
@property
def dt(self) -> float:
"""The environment time-step (in seconds)."""
return self._env.dt
@property
def active_terms(self) -> List[str]:
"""Name of active reward terms."""
return self._term_names
"""
Operations.
"""
def log_info(self, env_ids: Optional[Sequence[int]] = None) -> Dict[str, torch.Tensor]:
"""Returns the episodic sum of individual reward terms.
Args:
env_ids (Sequence[int], optional): The environment ids for which the episodic sum of
individual reward terms is to be returned. Defaults to all the environment ids.
Returns:
Dict[str, torch.Tensor]: Dictionary of episodic sum of individual reward terms.
"""
# resolve environment ids
if env_ids is None:
env_ids = ...
# store information
extras = {}
for key in self._episode_sums.keys():
extras["Episode Reward/" + key] = torch.mean(self._episode_sums[key][env_ids]) / (
self._env.max_episode_length * self.dt
) # FIXME
self._episode_sums[key][env_ids] = 0.0
return extras
def compute(self) -> torch.Tensor:
"""Computes the reward signal as a weighted sum of individual terms.
This function calls each reward term managed by the class and adds them to compute the net
reward signal. It also updates the episodic sums corresponding to individual reward terms.
Returns:
torch.Tensor: The net reward signal of shape (num_envs,).
"""
# reset computation
self._reward_buf[:] = 0.0
# iterate over all the reward terms
for name, term_cfg in zip(self._term_names, self._term_cfgs):
# compute term's value
value = term_cfg.func(self._env, **term_cfg.params) * term_cfg.weight
# update total reward
self._reward_buf += value
# update episodic sum
self._episode_sums[name] += value
return self._reward_buf
"""
Helper functions.
"""
def _prepare_terms(self):
"""Prepares a list of reward functions."""
# parse remaining reward terms and decimate their information
self._term_names: List[str] = list()
self._term_cfgs: List[RewardTermCfg] = list()
# check if config is dict already
if isinstance(self.cfg, dict):
cfg_items = self.cfg.items()
else:
cfg_items = self.cfg.__dict__.items()
# iterate over all the terms
for term_name, term_cfg in cfg_items:
# check for non config
if term_cfg is None:
continue
# resolve common parameters
self._resolve_common_term_cfg(term_name, term_cfg, min_argc=1)
# remove zero scales and multiply non-zero ones by dt
# note: we multiply weights by dt to make them agnostic to control decimation
if term_cfg.weight == 0:
continue
term_cfg.weight *= self.dt
# add function to list
self._term_names.append(term_name)
self._term_cfgs.append(term_cfg)
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from .noise_cfg import AdditiveGaussianNoiseCfg, AdditiveUniformNoiseCfg, ConstantBiasNoiseCfg, NoiseCfg
__all__ = ["NoiseCfg", "AdditiveGaussianNoiseCfg", "AdditiveUniformNoiseCfg", "ConstantBiasNoiseCfg"]
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import torch
from dataclasses import MISSING
from typing import Callable
from omni.isaac.orbit.utils import configclass
from . import noise_model
@configclass
class NoiseCfg:
"""Configuration for a noise term."""
func: Callable[[torch.Tensor, NoiseCfg], torch.Tensor] = MISSING
"""The function to be called for applying the noise.
Note:
The shape of the input and output tensors must be the same.
"""
@configclass
class AdditiveUniformNoiseCfg(NoiseCfg):
"""Configuration for a additive uniform noise term."""
func = noise_model.additive_uniform_noise
n_min: float = -1.0
"""The minimum value of the noise. Defaults to -1.0."""
n_max: float = 1.0
"""The maximum value of the noise. Defaults to 1.0."""
@configclass
class AdditiveGaussianNoiseCfg(NoiseCfg):
"""Configuration for a additive gaussian noise term."""
func = noise_model.additive_gaussian_noise
mean: float = 0.0
"""The mean of the noise. Defaults to 0.0."""
std: float = 1.0
"""The standard deviation of the noise. Defaults to 1.0."""
@configclass
class ConstantBiasNoiseCfg(NoiseCfg):
"""Configuration for a constant bias noise term."""
func = noise_model.constant_bias_noise
bias: float = 0.0
"""The bias to add. Defaults to 0.0."""
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import torch
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from . import noise_cfg
def constant_bias_noise(data: torch.Tensor, cfg: noise_cfg.ConstantBiasNoiseCfg) -> torch.Tensor:
"""Add a constant noise."""
return data + cfg.bias
def additive_uniform_noise(data: torch.Tensor, cfg: noise_cfg.UniformNoiseCfg) -> torch.Tensor:
"""Adds a noise sampled from a uniform distribution."""
return data + torch.rand_like(data) * (cfg.n_max - cfg.n_min) + cfg.n_min
def additive_gaussian_noise(data: torch.Tensor, cfg: noise_cfg.GaussianNoiseCfg) -> torch.Tensor:
"""Adds a noise sampled from a gaussian distribution."""
return data + cfg.mean + cfg.std * torch.randn_like(data)
...@@ -7,7 +7,7 @@ import torch ...@@ -7,7 +7,7 @@ import torch
import unittest import unittest
from collections import namedtuple from collections import namedtuple
from omni.isaac.orbit.utils.mdp.observation_manager import ObservationManager from omni.isaac.orbit.compat.utils.mdp.observation_manager import ObservationManager
class DefaultObservationManager(ObservationManager): class DefaultObservationManager(ObservationManager):
......
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import unittest
from collections import namedtuple
from omni.isaac.orbit.compat.utils.mdp.reward_manager import RewardManager
class DefaultRewardManager(RewardManager):
def grilled_chicken(self, env):
return 1
def grilled_chicken_with_bbq(self, env, bbq: bool):
return 0
def grilled_chicken_with_curry(self, env, hot: bool):
return 0
def grilled_chicken_with_yoghurt(self, env, hot: bool, bland: float):
return 0
class TestRewardManager(unittest.TestCase):
"""Test cases for various situations with reward manager."""
def setUp(self) -> None:
self.env = namedtuple("IsaacEnv", [])()
self.device = "cpu"
self.num_envs = 20
self.dt = 0.1
def test_str(self):
cfg = {
"grilled_chicken": {"weight": 10},
"grilled_chicken_with_bbq": {"weight": 5, "bbq": True},
"grilled_chicken_with_yoghurt": {"weight": 1.0, "hot": False, "bland": 2.0},
}
self.rew_man = DefaultRewardManager(cfg, self.env, self.num_envs, self.dt, self.device)
self.assertEqual(len(self.rew_man.active_terms), 3)
# print the expected string
print()
print(self.rew_man)
def test_config_terms(self):
cfg = {"grilled_chicken": {"weight": 10}, "grilled_chicken_with_curry": {"weight": 0.0, "hot": False}}
self.rew_man = DefaultRewardManager(cfg, self.env, self.num_envs, self.dt, self.device)
self.assertEqual(len(self.rew_man.active_terms), 1)
def test_compute(self):
cfg = {"grilled_chicken": {"weight": 10}, "grilled_chicken_with_curry": {"weight": 0.0, "hot": False}}
self.rew_man = DefaultRewardManager(cfg, self.env, self.num_envs, self.dt, self.device)
# compute expected reward
expected_reward = cfg["grilled_chicken"]["weight"] * self.dt
# compute reward using manager
rewards = self.rew_man.compute()
# check the reward for environment index 0
self.assertEqual(float(rewards[0]), expected_reward)
def test_active_terms(self):
cfg = {
"grilled_chicken": {"weight": 10},
"grilled_chicken_with_bbq": {"weight": 5, "bbq": True},
"grilled_chicken_with_curry": {"weight": 0.0, "hot": False},
}
self.rew_man = DefaultRewardManager(cfg, self.env, self.num_envs, self.dt, self.device)
self.assertEqual(len(self.rew_man.active_terms), 2)
def test_invalid_reward_name(self):
cfg = {
"grilled_chicken": {"weight": 10},
"grilled_chicken_with_bbq": {"weight": 5, "bbq": True},
"grilled_chicken_with_no_bbq": {"weight": 0.1, "hot": False},
}
with self.assertRaises(AttributeError):
self.rew_man = DefaultRewardManager(cfg, self.env, self.num_envs, self.dt, self.device)
def test_invalid_reward_weight_config(self):
cfg = {"grilled_chicken": {}}
with self.assertRaises(KeyError):
self.rew_man = DefaultRewardManager(cfg, self.env, self.num_envs, self.dt, self.device)
def test_invalid_reward_config(self):
cfg = {
"grilled_chicken_with_bbq": {"weight": 0.1, "hot": False},
"grilled_chicken_with_yoghurt": {"weight": 2.0, "hot": False},
}
with self.assertRaises(ValueError):
self.rew_man = DefaultRewardManager(cfg, self.env, self.num_envs, self.dt, self.device)
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import torch
import unittest
from collections import namedtuple
from omni.isaac.orbit.managers import ObservationGroupCfg, ObservationManager, ObservationTermCfg
from omni.isaac.orbit.utils import configclass
def grilled_chicken(env):
return torch.ones(env.num_envs, 4, device=env.device)
def grilled_chicken_with_bbq(env, bbq: bool):
return bbq * torch.ones(env.num_envs, 1, device=env.device)
def grilled_chicken_with_curry(env, hot: bool):
return hot * 2 * torch.ones(env.num_envs, 1, device=env.device)
def grilled_chicken_with_yoghurt(env, hot: bool, bland: float):
return hot * bland * torch.ones(env.num_envs, 5, device=env.device)
class TestObservationManager(unittest.TestCase):
"""Test cases for various situations with observation manager."""
def setUp(self) -> None:
self.env = namedtuple("IsaacEnv", ["num_envs", "device"])(20, "cpu")
def test_str(self):
"""Test the string representation of the observation manager."""
@configclass
class MyObservationManagerCfg:
"""Test config class for observation manager."""
@configclass
class SampleGroupCfg(ObservationGroupCfg):
"""Test config class for policy observation group."""
term_1 = ObservationTermCfg(func="__main__:grilled_chicken", scale=10)
term_2 = ObservationTermCfg(func=grilled_chicken, scale=2)
term_3 = ObservationTermCfg(func=grilled_chicken_with_bbq, scale=5, params={"bbq": True})
term_4 = ObservationTermCfg(
func=grilled_chicken_with_yoghurt, scale=1.0, params={"hot": False, "bland": 2.0}
)
policy: ObservationGroupCfg = SampleGroupCfg()
# create observation manager
cfg = MyObservationManagerCfg()
self.obs_man = ObservationManager(cfg, self.env)
self.assertEqual(len(self.obs_man.active_terms["policy"]), 4)
# print the expected string
print()
print(self.obs_man)
def test_config_equivalence(self):
"""Test the equivalence of observation manager created from different config types."""
# create from config class
@configclass
class MyObservationManagerCfg:
"""Test config class for observation manager."""
@configclass
class SampleGroupCfg(ObservationGroupCfg):
"""Test config class for policy observation group."""
your_term = ObservationTermCfg(func="__main__:grilled_chicken", scale=10)
his_term = ObservationTermCfg(func=grilled_chicken, scale=2)
my_term = ObservationTermCfg(func=grilled_chicken_with_bbq, scale=5, params={"bbq": True})
her_term = ObservationTermCfg(
func=grilled_chicken_with_yoghurt, scale=1.0, params={"hot": False, "bland": 2.0}
)
policy = SampleGroupCfg()
critic = SampleGroupCfg(concatenate_terms=False, her_term=None)
cfg = MyObservationManagerCfg()
obs_man_from_cfg = ObservationManager(cfg, self.env)
# create from config class
@configclass
class MyObservationManagerAnnotatedCfg:
"""Test config class for observation manager with annotations on terms."""
@configclass
class SampleGroupCfg(ObservationGroupCfg):
"""Test config class for policy observation group."""
your_term: ObservationTermCfg = ObservationTermCfg(func="__main__:grilled_chicken", scale=10)
his_term: ObservationTermCfg = ObservationTermCfg(func=grilled_chicken, scale=2)
my_term: ObservationTermCfg = ObservationTermCfg(
func=grilled_chicken_with_bbq, scale=5, params={"bbq": True}
)
her_term: ObservationTermCfg = ObservationTermCfg(
func=grilled_chicken_with_yoghurt, scale=1.0, params={"hot": False, "bland": 2.0}
)
policy: ObservationGroupCfg = SampleGroupCfg()
critic: ObservationGroupCfg = SampleGroupCfg(concatenate_terms=False, her_term=None)
cfg = MyObservationManagerAnnotatedCfg()
obs_man_from_annotated_cfg = ObservationManager(cfg, self.env)
# check equivalence
# parsed terms
self.assertEqual(obs_man_from_cfg.active_terms, obs_man_from_annotated_cfg.active_terms)
self.assertEqual(obs_man_from_cfg.group_obs_term_dim, obs_man_from_annotated_cfg.group_obs_term_dim)
self.assertEqual(obs_man_from_cfg.group_obs_dim, obs_man_from_annotated_cfg.group_obs_dim)
# parsed term configs
self.assertEqual(obs_man_from_cfg._group_obs_term_cfgs, obs_man_from_annotated_cfg._group_obs_term_cfgs)
self.assertEqual(obs_man_from_cfg._group_obs_concatenate, obs_man_from_annotated_cfg._group_obs_concatenate)
def test_config_terms(self):
"""Test the number of terms in the observation manager."""
@configclass
class MyObservationManagerCfg:
"""Test config class for observation manager."""
@configclass
class SampleGroupCfg(ObservationGroupCfg):
"""Test config class for policy observation group."""
term_1 = ObservationTermCfg(func=grilled_chicken, scale=10)
term_2 = ObservationTermCfg(func=grilled_chicken_with_curry, scale=0.0, params={"hot": False})
policy: ObservationGroupCfg = SampleGroupCfg()
critic: ObservationGroupCfg = SampleGroupCfg(term_2=None)
# create observation manager
cfg = MyObservationManagerCfg()
self.obs_man = ObservationManager(cfg, self.env)
self.assertEqual(len(self.obs_man.active_terms["policy"]), 2)
self.assertEqual(len(self.obs_man.active_terms["critic"]), 1)
def test_compute(self):
"""Test the observation computation."""
@configclass
class MyObservationManagerCfg:
"""Test config class for observation manager."""
@configclass
class PolicyCfg(ObservationGroupCfg):
"""Test config class for policy observation group."""
term_1 = ObservationTermCfg(func=grilled_chicken, scale=10)
term_2 = ObservationTermCfg(func=grilled_chicken_with_curry, scale=0.0, params={"hot": False})
policy: ObservationGroupCfg = PolicyCfg()
# create observation manager
cfg = MyObservationManagerCfg()
self.obs_man = ObservationManager(cfg, self.env)
# compute observation using manager
observations = self.obs_man.compute()
# check the observation shape
self.assertEqual((self.env.num_envs, 5), observations["policy"].shape)
def test_invalid_observation_config(self):
"""Test the invalid observation config."""
@configclass
class MyObservationManagerCfg:
"""Test config class for observation manager."""
@configclass
class PolicyCfg(ObservationGroupCfg):
"""Test config class for policy observation group."""
term_1 = ObservationTermCfg(func=grilled_chicken_with_bbq, scale=0.1, params={"hot": False})
term_2 = ObservationTermCfg(func=grilled_chicken_with_yoghurt, scale=2.0, params={"hot": False})
policy: ObservationGroupCfg = PolicyCfg()
# create observation manager
cfg = MyObservationManagerCfg()
# check the invalid config
with self.assertRaises(ValueError):
self.obs_man = ObservationManager(cfg, self.env)
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import unittest
from collections import namedtuple
from omni.isaac.orbit.managers import RewardManager, RewardTermCfg
from omni.isaac.orbit.utils import configclass
def grilled_chicken(env):
return 1
def grilled_chicken_with_bbq(env, bbq: bool):
return 0
def grilled_chicken_with_curry(env, hot: bool):
return 0
def grilled_chicken_with_yoghurt(env, hot: bool, bland: float):
return 0
class TestRewardManager(unittest.TestCase):
"""Test cases for various situations with reward manager."""
def setUp(self) -> None:
self.env = namedtuple("IsaacEnv", ["num_envs", "dt", "device"])(20, 0.1, "cpu")
def test_str(self):
"""Test the string representation of the reward manager."""
cfg = {
"term_1": RewardTermCfg(func=grilled_chicken, weight=10),
"term_2": RewardTermCfg(func=grilled_chicken_with_bbq, weight=5, params={"bbq": True}),
"term_3": RewardTermCfg(
func=grilled_chicken_with_yoghurt,
weight=1.0,
params={"hot": False, "bland": 2.0},
),
}
self.rew_man = RewardManager(cfg, self.env)
self.assertEqual(len(self.rew_man.active_terms), 3)
# print the expected string
print()
print(self.rew_man)
def test_config_equivalence(self):
"""Test the equivalence of reward manager created from different config types."""
# create from dictionary
cfg = {
"my_term": RewardTermCfg(func=grilled_chicken, weight=10),
"your_term": RewardTermCfg(func=grilled_chicken_with_bbq, weight=2.0, params={"bbq": True}),
"his_term": RewardTermCfg(
func=grilled_chicken_with_yoghurt,
weight=1.0,
params={"hot": False, "bland": 2.0},
),
}
rew_man_from_dict = RewardManager(cfg, self.env)
# create from config class
@configclass
class MyRewardManagerCfg:
"""Reward manager config with no type annotations."""
my_term = RewardTermCfg(func=grilled_chicken, weight=10.0)
your_term = RewardTermCfg(func=grilled_chicken_with_bbq, weight=2.0, params={"bbq": True})
his_term = RewardTermCfg(func=grilled_chicken_with_yoghurt, weight=1.0, params={"hot": False, "bland": 2.0})
cfg = MyRewardManagerCfg()
rew_man_from_cfg = RewardManager(cfg, self.env)
# create from config class
@configclass
class MyRewardManagerAnnotatedCfg:
"""Reward manager config with type annotations."""
my_term: RewardTermCfg = RewardTermCfg(func=grilled_chicken, weight=10.0)
your_term: RewardTermCfg = RewardTermCfg(func=grilled_chicken_with_bbq, weight=2.0, params={"bbq": True})
his_term: RewardTermCfg = RewardTermCfg(
func=grilled_chicken_with_yoghurt, weight=1.0, params={"hot": False, "bland": 2.0}
)
cfg = MyRewardManagerAnnotatedCfg()
rew_man_from_annotated_cfg = RewardManager(cfg, self.env)
# check equivalence
# parsed terms
self.assertEqual(rew_man_from_dict.active_terms, rew_man_from_annotated_cfg.active_terms)
self.assertEqual(rew_man_from_cfg.active_terms, rew_man_from_annotated_cfg.active_terms)
self.assertEqual(rew_man_from_dict.active_terms, rew_man_from_cfg.active_terms)
# parsed term configs
self.assertEqual(rew_man_from_dict._term_cfgs, rew_man_from_annotated_cfg._term_cfgs)
self.assertEqual(rew_man_from_cfg._term_cfgs, rew_man_from_annotated_cfg._term_cfgs)
self.assertEqual(rew_man_from_dict._term_cfgs, rew_man_from_cfg._term_cfgs)
def test_config_terms(self):
"""Test the ignoring of terms with zero weight."""
cfg = {
"term_1": RewardTermCfg(func=grilled_chicken, weight=10),
"term_2": RewardTermCfg(func=grilled_chicken_with_curry, weight=0.0, params={"hot": False}),
}
self.rew_man = RewardManager(cfg, self.env)
self.assertEqual(self.rew_man.active_terms, ["term_1"])
def test_compute(self):
"""Test the computation of reward."""
cfg = {
"term_1": RewardTermCfg(func=grilled_chicken, weight=10),
"term_2": RewardTermCfg(func=grilled_chicken_with_curry, weight=0.0, params={"hot": False}),
}
self.rew_man = RewardManager(cfg, self.env)
# compute expected reward
expected_reward = cfg["term_1"].weight * self.env.dt
# compute reward using manager
rewards = self.rew_man.compute()
# check the reward for environment index 0
self.assertEqual(float(rewards[0]), expected_reward)
self.assertEqual(tuple(rewards.shape), (self.env.num_envs,))
def test_active_terms(self):
"""Test the ignoring of terms with zero weight."""
cfg = {
"term_1": RewardTermCfg(func=grilled_chicken, weight=10),
"term_2": RewardTermCfg(func=grilled_chicken_with_bbq, weight=5, params={"bbq": True}),
"term_3": RewardTermCfg(func=grilled_chicken_with_curry, weight=0.0, params={"hot": False}),
}
self.rew_man = RewardManager(cfg, self.env)
self.assertEqual(len(self.rew_man.active_terms), 2)
def test_missing_weight(self):
"""Test the missing of weight in the config."""
# TODO: The error should be raised during the config parsing, not during the reward manager creation.
cfg = {
"term_1": RewardTermCfg(func=grilled_chicken, weight=10),
"term_2": RewardTermCfg(func=grilled_chicken_with_bbq, params={"bbq": True}),
}
with self.assertRaises(TypeError):
self.rew_man = RewardManager(cfg, self.env)
def test_invalid_reward_func_module(self):
"""Test the handling of invalid reward function's module in string representation."""
cfg = {
"term_1": RewardTermCfg(func=grilled_chicken, weight=10),
"term_2": RewardTermCfg(func=grilled_chicken_with_bbq, weight=5, params={"bbq": True}),
"term_3": RewardTermCfg(func="a:grilled_chicken_with_no_bbq", weight=0.1, params={"hot": False}),
}
with self.assertRaises(ValueError):
self.rew_man = RewardManager(cfg, self.env)
def test_invalid_reward_config(self):
"""Test the handling of invalid reward function's config parameters."""
cfg = {
"term_1": RewardTermCfg(func=grilled_chicken_with_bbq, weight=0.1, params={"hot": False}),
"term_2": RewardTermCfg(func=grilled_chicken_with_yoghurt, weight=2.0, params={"hot": False}),
}
with self.assertRaises(ValueError):
self.rew_man = RewardManager(cfg, self.env)
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import torch
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .velocity.locomotion_env import LocomotionEnv
def base_lin_vel(env: "LocomotionEnv"):
"""Base linear velocity in base frame."""
return env.robot.data.root_lin_vel_b
def base_ang_vel(env: "LocomotionEnv"):
"""Base angular velocity in base frame."""
return env.robot.data.root_ang_vel_b
def projected_gravity(env: "LocomotionEnv"):
"""Gravity projection on base frame."""
return env.robot.data.projected_gravity_b
def velocity_commands(env: "LocomotionEnv"):
"""Desired base velocity for the robot."""
return env._command_manager.command
def dof_pos(env: "LocomotionEnv"):
"""DOF positions for legs offset by the drive default positions."""
return env.robot.data.dof_pos - env.robot.data.actuator_pos_offset
def dof_vel(env: "LocomotionEnv"):
"""DOF velocity of the legs."""
return env.robot.data.dof_vel - env.robot.data.actuator_vel_offset
def actions(env: "LocomotionEnv"):
"""Last actions provided to env."""
return env.actions
def height_scan(env: "LocomotionEnv", sensor_name: str):
"""Height scan around the robot."""
sensor = env.sensors[sensor_name]
# TODO Remove hardcoded nan to num value
hit_height = torch.nan_to_num(sensor.data.ray_hits_w[..., 2], posinf=-1.0)
heights = env.robot.data.root_state_w[:, 2].unsqueeze(1) - 0.5 - hit_height
return heights
...@@ -16,10 +16,10 @@ from omni.isaac.core.utils.types import DynamicsViewState ...@@ -16,10 +16,10 @@ from omni.isaac.core.utils.types import DynamicsViewState
import omni.isaac.orbit.utils.kit as kit_utils import omni.isaac.orbit.utils.kit as kit_utils
from omni.isaac.orbit.compat.markers import PointMarker, StaticMarker from omni.isaac.orbit.compat.markers import PointMarker, StaticMarker
from omni.isaac.orbit.compat.utils.mdp import ObservationManager, RewardManager
from omni.isaac.orbit.robots.legged_robot import LeggedRobot from omni.isaac.orbit.robots.legged_robot import LeggedRobot
from omni.isaac.orbit.utils.dict import class_to_dict from omni.isaac.orbit.utils.dict import class_to_dict
from omni.isaac.orbit.utils.math import quat_apply, quat_from_euler_xyz, sample_uniform, wrap_to_pi from omni.isaac.orbit.utils.math import quat_apply, quat_from_euler_xyz, sample_uniform, wrap_to_pi
from omni.isaac.orbit.utils.mdp import ObservationManager, RewardManager
from omni.isaac.orbit_envs.isaac_env import IsaacEnv, VecEnvIndices, VecEnvObs from omni.isaac.orbit_envs.isaac_env import IsaacEnv, VecEnvIndices, VecEnvObs
......
...@@ -12,12 +12,12 @@ import omni.isaac.core.utils.prims as prim_utils ...@@ -12,12 +12,12 @@ import omni.isaac.core.utils.prims as prim_utils
import omni.isaac.orbit.utils.kit as kit_utils import omni.isaac.orbit.utils.kit as kit_utils
from omni.isaac.orbit.compat.markers import StaticMarker from omni.isaac.orbit.compat.markers import StaticMarker
from omni.isaac.orbit.compat.utils.mdp import ObservationManager, RewardManager
from omni.isaac.orbit.controllers.differential_inverse_kinematics import DifferentialInverseKinematics from omni.isaac.orbit.controllers.differential_inverse_kinematics import DifferentialInverseKinematics
from omni.isaac.orbit.objects import RigidObject from omni.isaac.orbit.objects import RigidObject
from omni.isaac.orbit.robots.single_arm import SingleArmManipulator from omni.isaac.orbit.robots.single_arm import SingleArmManipulator
from omni.isaac.orbit.utils.dict import class_to_dict from omni.isaac.orbit.utils.dict import class_to_dict
from omni.isaac.orbit.utils.math import quat_inv, quat_mul, random_orientation, sample_uniform, scale_transform from omni.isaac.orbit.utils.math import quat_inv, quat_mul, random_orientation, sample_uniform, scale_transform
from omni.isaac.orbit.utils.mdp import ObservationManager, RewardManager
from omni.isaac.orbit_envs.isaac_env import IsaacEnv, VecEnvIndices, VecEnvObs from omni.isaac.orbit_envs.isaac_env import IsaacEnv, VecEnvIndices, VecEnvObs
......
...@@ -11,11 +11,11 @@ import omni.isaac.core.utils.prims as prim_utils ...@@ -11,11 +11,11 @@ import omni.isaac.core.utils.prims as prim_utils
import omni.isaac.orbit.utils.kit as kit_utils import omni.isaac.orbit.utils.kit as kit_utils
from omni.isaac.orbit.compat.markers import PointMarker, StaticMarker from omni.isaac.orbit.compat.markers import PointMarker, StaticMarker
from omni.isaac.orbit.compat.utils.mdp import ObservationManager, RewardManager
from omni.isaac.orbit.controllers.differential_inverse_kinematics import DifferentialInverseKinematics from omni.isaac.orbit.controllers.differential_inverse_kinematics import DifferentialInverseKinematics
from omni.isaac.orbit.robots.single_arm import SingleArmManipulator from omni.isaac.orbit.robots.single_arm import SingleArmManipulator
from omni.isaac.orbit.utils.dict import class_to_dict from omni.isaac.orbit.utils.dict import class_to_dict
from omni.isaac.orbit.utils.math import random_orientation, sample_uniform, scale_transform from omni.isaac.orbit.utils.math import random_orientation, sample_uniform, scale_transform
from omni.isaac.orbit.utils.mdp import ObservationManager, RewardManager
from omni.isaac.orbit_envs.isaac_env import IsaacEnv, VecEnvIndices, VecEnvObs from omni.isaac.orbit_envs.isaac_env import IsaacEnv, VecEnvIndices, VecEnvObs
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment