Unverified Commit e28eeb74 authored by Mayank Mittal's avatar Mayank Mittal Committed by GitHub

Cleans up the buffer-related implementations (#562)

* Adds a lot of missing docstrings related to buffers and new actuator
models
* Nitpicks clean ups to make it simpler to read
* Adds unit test for the circular buffer class

## Type of change

- Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- This change requires a documentation update

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have run all the tests with `./isaaclab.sh --test` and they pass
- [x] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there
parent 0154535e
......@@ -15,6 +15,10 @@
IdealPDActuatorCfg
DCMotor
DCMotorCfg
DelayedPDActuator
DelayedPDActuatorCfg
RemotizedPDActuator
RemotizedPDActuatorCfg
ActuatorNetMLP
ActuatorNetMLPCfg
ActuatorNetLSTM
......@@ -74,6 +78,34 @@ DC Motor Actuator
:show-inheritance:
:exclude-members: __init__, class_type
Delayed PD Actuator
-------------------
.. autoclass:: DelayedPDActuator
:members:
:inherited-members:
:show-inheritance:
.. autoclass:: DelayedPDActuatorCfg
:members:
:inherited-members:
:show-inheritance:
:exclude-members: __init__, class_type
Remotized PD Actuator
---------------------
.. autoclass:: RemotizedPDActuator
:members:
:inherited-members:
:show-inheritance:
.. autoclass:: RemotizedPDActuatorCfg
:members:
:inherited-members:
:show-inheritance:
:exclude-members: __init__, class_type
MLP Network Actuator
---------------------
......@@ -88,7 +120,6 @@ MLP Network Actuator
:show-inheritance:
:exclude-members: __init__, class_type
LSTM Network Actuator
---------------------
......
......@@ -10,7 +10,9 @@
io
array
assets
buffers
dict
interpolation
math
noise
string
......@@ -52,6 +54,15 @@ Asset operations
:members:
:show-inheritance:
Buffer operations
~~~~~~~~~~~~~~~~~
.. automodule:: omni.isaac.lab.utils.buffers
:members:
:imported-members:
:inherited-members:
:show-inheritance:
Dictionary operations
~~~~~~~~~~~~~~~~~~~~~
......@@ -59,6 +70,15 @@ Dictionary operations
:members:
:show-inheritance:
Interpolation operations
~~~~~~~~~~~~~~~~~~~~~~~~
.. automodule:: omni.isaac.lab.utils.interpolation
:members:
:imported-members:
:inherited-members:
:show-inheritance:
Math operations
~~~~~~~~~~~~~~~
......
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.18.2"
version = "0.18.3"
# Description
title = "Isaac Lab framework for Robot Learning"
......
Changelog
---------
0.18.3 (2024-06-25)
~~~~~~~~~~~~~~~~~~~
Fixed
^^^^^
* Fixed the docstrings at multiple places related to the different buffer implementations inside the
:mod:`omni.isaac.lab.utils.buffers` module. The docstrings were not clear and did not provide enough
information about the classes and their methods.
Added
^^^^^
* Added the field for fixed tendom names in the :class:`omni.isaac.lab.assets.ArticulationData` class.
Earlier, this information was not exposed which was inconsistent with other name related information
such as joint or body names.
Changed
^^^^^^^
* Renamed the fields ``min_num_time_lags`` and ``max_num_time_lags`` to ``min_delay`` and
``max_delay`` in the :class:`omni.isaac.lab.actuators.DelayedPDActuatorCfg` class. This is to make
the naming simpler to understand.
0.18.2 (2024-06-25)
~~~~~~~~~~~~~~~~~~~
......
......@@ -162,18 +162,11 @@ class DelayedPDActuatorCfg(IdealPDActuatorCfg):
class_type: type = actuator_pd.DelayedPDActuator
min_num_time_lags: int = 0
"""Minimum number of physics time-steps that the actuator command may be delayed."""
min_delay: int = 0
"""Minimum number of physics time-steps with which the actuator command may be delayed. Defaults to 0."""
max_num_time_lags: int = 0
"""Maximum number of physics time-steps that the actuator command may be delayed."""
num_time_lags: int = 0
"""The number of physics time-steps that the actuator command will be delayed.
Note:
This values cannot be greater than `max_num_time_lags`.
"""
max_delay: int = 0
"""Maximum number of physics time-steps with which the actuator command may be delayed. Defaults to 0."""
@configclass
......@@ -182,7 +175,8 @@ class RemotizedPDActuatorCfg(DelayedPDActuatorCfg):
Note:
The torque output limits for this actuator is derived from a linear interpolation of a lookup table
in :attr:`joint_parameter_lookup` describing the relationship between joint angles and the output torques.
in :attr:`joint_parameter_lookup`. This table describes the relationship between joint angles and
the output torques.
"""
class_type: type = actuator_pd.RemotizedPDActuator
......@@ -190,6 +184,6 @@ class RemotizedPDActuatorCfg(DelayedPDActuatorCfg):
joint_parameter_lookup: torch.Tensor = MISSING
"""Joint parameter lookup table. Shape is (num_lookup_points, 3).
The tensor describes relationship between the joint angle (rad), the transmission ratio (in/out),
and the output torque (N*m).
This tensor describes the relationship between the joint angle (rad), the transmission ratio (in/out),
and the output torque (N*m). The table is used to interpolate the output torque based on the joint angle.
"""
......@@ -132,8 +132,7 @@ class IdealPDActuator(ActuatorBase):
class DCMotor(IdealPDActuator):
r"""
Direct control (DC) motor actuator model with velocity-based saturation model.
r"""Direct control (DC) motor actuator model with velocity-based saturation model.
It uses the same model as the :class:`IdealActuator` for computing the torques from input commands.
However, it implements a saturation model defined by DC motor characteristics.
......@@ -221,65 +220,49 @@ class DCMotor(IdealPDActuator):
class DelayedPDActuator(IdealPDActuator):
"""Ideal PD actuator with delayed data.
"""Ideal PD actuator with delayed command application.
The DelayedPDActuator has configurable minimum and maximum time lag values, which are used to initialize a
DelayBuffer to hold a queue of pending actuator commands. On reset, a value time_lags will be randomly sampled
from the min and max time lag bounds. At every physics step, the most recent actuation value is pushed to the
DelayBuffer, but the final actuation value applied to simulation will be `time_lags` physics steps in the past.
This class extends the :class:`IdealPDActuator` class by adding a delay to the actuator commands. The delay
is implemented using a circular buffer that stores the actuator commands for a certain number of physics steps.
The most recent actuation value is pushed to the buffer at every physics step, but the final actuation value
applied to the simulation is lagged by a certain number of physics steps.
The amount of time lag is configurable and can be set to a random value between the minimum and maximum time
lag bounds at every reset. The minimum and maximum time lag values are set in the configuration instance passed
to the class.
"""
def __init__(
self,
cfg: DelayedPDActuatorCfg,
joint_names: list[str],
joint_ids: Sequence[int],
num_envs: int,
device: str,
stiffness: torch.Tensor | float = 0.0,
damping: torch.Tensor | float = 0.0,
armature: torch.Tensor | float = 0.0,
friction: torch.Tensor | float = 0.0,
effort_limit: torch.Tensor | float = torch.inf,
velocity_limit: torch.Tensor | float = torch.inf,
):
super().__init__(
cfg,
joint_names,
joint_ids,
num_envs,
device,
stiffness,
damping,
armature,
friction,
effort_limit,
velocity_limit,
)
cfg: DelayedPDActuatorCfg
"""The configuration for the actuator model."""
def __init__(self, cfg: DelayedPDActuatorCfg, *args, **kwargs):
super().__init__(cfg, *args, **kwargs)
# instantiate the delay buffers
self.positions_delay_buffer = DelayBuffer(cfg.max_num_time_lags, num_envs=num_envs, device=device)
self.velocities_delay_buffer = DelayBuffer(cfg.max_num_time_lags, num_envs=num_envs, device=device)
self.efforts_delay_buffer = DelayBuffer(cfg.max_num_time_lags, num_envs=num_envs, device=device)
self.positions_delay_buffer = DelayBuffer(cfg.max_delay, self._num_envs, device=self._device)
self.velocities_delay_buffer = DelayBuffer(cfg.max_delay, self._num_envs, device=self._device)
self.efforts_delay_buffer = DelayBuffer(cfg.max_delay, self._num_envs, device=self._device)
# all of the envs
self._ALL_INDICES = torch.arange(num_envs, dtype=torch.long, device=device)
self._ALL_INDICES = torch.arange(self._num_envs, dtype=torch.long, device=self._device)
def reset(self, env_ids: Sequence[int]):
super().reset(env_ids)
# number of environments (since env_ids can be a slice)
env_size = self._ALL_INDICES[env_ids].size()
if env_ids is None or env_ids == slice(None):
num_envs = self._num_envs
else:
num_envs = len(env_ids)
# set a new random delay for environments in env_ids
time_lags = self.positions_delay_buffer.time_lags
time_lags[env_ids] = torch.randint(
low=self.cfg.min_num_time_lags,
high=self.cfg.max_num_time_lags + 1,
size=env_size,
device=self._device,
time_lags = torch.randint(
low=self.cfg.min_delay,
high=self.cfg.max_delay + 1,
size=(num_envs,),
dtype=torch.int,
device=self._device,
)
# set delays
self.positions_delay_buffer.set_time_lag(time_lags)
self.velocities_delay_buffer.set_time_lag(time_lags)
self.efforts_delay_buffer.set_time_lag(time_lags)
self.positions_delay_buffer.set_time_lag(time_lags, env_ids)
self.velocities_delay_buffer.set_time_lag(time_lags, env_ids)
self.efforts_delay_buffer.set_time_lag(time_lags, env_ids)
# reset buffers
self.positions_delay_buffer.reset(env_ids)
self.velocities_delay_buffer.reset(env_ids)
......@@ -297,10 +280,13 @@ class DelayedPDActuator(IdealPDActuator):
class RemotizedPDActuator(DelayedPDActuator):
"""Ideal PD actuator with angle dependent torque limits.
"""Ideal PD actuator with angle-dependent torque limits.
This class extends the :class:`DelayedPDActuator` class by adding angle-dependent torque limits to the actuator.
The torque limits are applied by querying a lookup table describing the relationship between the joint angle
and the maximum output torque. The lookup table is provided in the configuration instance passed to the class.
The torque limits for this actuator are applied by querying a lookup table describing the relationship between
the joint angle and the maximum output torque.
The torque limits are interpolated based on the current joint positions and applied to the actuator commands.
"""
def __init__(
......@@ -328,6 +314,10 @@ class RemotizedPDActuator(DelayedPDActuator):
# define remotized joint torque limit
self._torque_limit = LinearInterpolation(self.angle_samples, self.max_torque_samples, device=device)
"""
Properties.
"""
@property
def angle_samples(self) -> torch.Tensor:
return self._joint_parameter_lookup[:, 0]
......@@ -340,6 +330,10 @@ class RemotizedPDActuator(DelayedPDActuator):
def max_torque_samples(self) -> torch.Tensor:
return self._joint_parameter_lookup[:, 2]
"""
Operations.
"""
def compute(
self, control_action: ArticulationActions, joint_pos: torch.Tensor, joint_vel: torch.Tensor
) -> ArticulationActions:
......
......@@ -131,6 +131,11 @@ class Articulation(RigidObject):
"""Ordered names of joints in articulation."""
return self.root_physx_view.shared_metatype.dof_names
@property
def fixed_tendon_names(self) -> list[str]:
"""Ordered names of fixed tendons in articulation."""
return self._fixed_tendon_names
@property
def body_names(self) -> list[str]:
"""Ordered names of bodies in articulation."""
......@@ -234,9 +239,10 @@ class Articulation(RigidObject):
on the name matching.
Args:
name_keys: A regular expression or a list of regular expressions to match the joint names with fixed tendons.
tendon_subsets: A subset of joints with fixed tendons to search for. Defaults to None, which means all joints
in the articulation are searched.
name_keys: A regular expression or a list of regular expressions to match the joint
names with fixed tendons.
tendon_subsets: A subset of joints with fixed tendons to search for. Defaults to None, which means
all joints in the articulation are searched.
preserve_order: Whether to preserve the order of the name keys in the output. Defaults to False.
Returns:
......@@ -579,6 +585,10 @@ class Articulation(RigidObject):
# set targets
self._data.joint_effort_target[env_ids, joint_ids] = target
"""
Operations - Tendons.
"""
def set_fixed_tendon_stiffness(
self,
stiffness: torch.Tensor,
......@@ -1042,17 +1052,23 @@ class Articulation(RigidObject):
def _process_fixed_tendons(self):
"""Process fixed tendons."""
self.fixed_tendon_names = list()
# create a list to store the fixed tendon names
self._fixed_tendon_names = list()
# parse fixed tendons properties if they exist
if self.num_fixed_tendons > 0:
stage = stage_utils.get_current_stage()
# iterate over all joints to find tendons attached to them
for j in range(self.num_joints):
usd_joint_path = self.root_physx_view.dof_paths[0][j]
# check whether joint has tendons - tendon name follows the joint name it is attached to
joint = UsdPhysics.Joint.Get(stage, usd_joint_path)
if joint.GetPrim().HasAPI(PhysxSchema.PhysxTendonAxisRootAPI):
joint_name = usd_joint_path.split("/")[-1]
self.fixed_tendon_names.append(joint_name)
self._fixed_tendon_names.append(joint_name)
self._data.fixed_tendon_names = self._fixed_tendon_names
self._data.default_fixed_tendon_stiffness = self.root_physx_view.get_fixed_tendon_stiffnesses().clone()
self._data.default_fixed_tendon_damping = self.root_physx_view.get_fixed_tendon_dampings().clone()
self._data.default_fixed_tendon_limit_stiffness = (
......
......@@ -14,28 +14,44 @@ from ..rigid_object import RigidObjectData
class ArticulationData(RigidObjectData):
"""Data container for an articulation."""
"""Data container for an articulation.
def __init__(self, root_physx_view: physx.ArticulationView, device):
super().__init__(root_physx_view, device)
self._root_physx_view: physx.ArticulationView = root_physx_view
This class extends the :class:`RigidObjectData` class to provide additional data for
an articulation mainly related to the joints and tendons.
"""
_root_physx_view: physx.ArticulationView
"""The root articulation view of the object."""
def __init__(self, root_physx_view: physx.ArticulationView, device: str):
# Initialize the parent class
super().__init__(root_physx_view, device) # type: ignore
# Initialize history for finite differencing
self._previous_joint_vel = self._root_physx_view.get_dof_velocities().clone()
# Initialize the lazy buffers.
self._body_state_w: TimestampedBuffer = TimestampedBuffer()
self._joint_pos: TimestampedBuffer = TimestampedBuffer()
self._joint_acc: TimestampedBuffer = TimestampedBuffer()
self._joint_vel: TimestampedBuffer = TimestampedBuffer()
self._body_state_w = TimestampedBuffer()
self._joint_pos = TimestampedBuffer()
self._joint_acc = TimestampedBuffer()
self._joint_vel = TimestampedBuffer()
def update(self, dt: float):
self._time_stamp += dt
# Trigger an update of the joint acceleration buffer at a higher frequency since we do finite differencing.
self._sim_timestamp += dt
# Trigger an update of the joint acceleration buffer at a higher frequency
# since we do finite differencing.
self.joint_acc
##
# Names.
##
joint_names: list[str] = None
"""Joint names in the order parsed by the simulation view."""
fixed_tendon_names: list[str] = None
"""Fixed tendon names in the order parsed by the simulation view."""
##
# Defaults.
##
......@@ -191,32 +207,37 @@ class ArticulationData(RigidObjectData):
@property
def root_state_w(self):
"""Root state ``[pos, quat, lin_vel, ang_vel]`` in simulation world frame. Shape is (num_instances, 13)."""
if self._root_state_w.update_timestamp < self._time_stamp:
if self._root_state_w.timestamp < self._sim_timestamp:
# read data from simulation
pose = self._root_physx_view.get_root_transforms().clone()
pose[:, 3:7] = math_utils.convert_quat(pose[:, 3:7], to="wxyz")
velocity = self._root_physx_view.get_root_velocities()
# set the buffer data and timestamp
self._root_state_w.data = torch.cat((pose, velocity), dim=-1)
self._root_state_w.update_timestamp = self._time_stamp
self._root_state_w.timestamp = self._sim_timestamp
return self._root_state_w.data
@property
def body_state_w(self):
"""State of all bodies `[pos, quat, lin_vel, ang_vel]` in simulation world frame.
Shape is (num_instances, num_bodies, 13)."""
if self._body_state_w.update_timestamp < self._time_stamp:
if self._body_state_w.timestamp < self._sim_timestamp:
# read data from simulation
poses = self._root_physx_view.get_link_transforms().clone()
poses[..., 3:7] = math_utils.convert_quat(poses[..., 3:7], to="wxyz")
velocities = self._root_physx_view.get_link_velocities()
# set the buffer data and timestamp
self._body_state_w.data = torch.cat((poses, velocities), dim=-1)
self._body_state_w.update_timestamp = self._time_stamp
self._body_state_w.timestamp = self._sim_timestamp
return self._body_state_w.data
@property
def body_acc_w(self):
"""Acceleration of all bodies. Shape is (num_instances, num_bodies, 6)."""
if self._body_acc_w.update_timestamp < self._time_stamp:
if self._body_acc_w.timestamp < self._sim_timestamp:
# read data from simulation and set the buffer data and timestamp
self._body_acc_w.data = self._root_physx_view.get_link_accelerations()
self._body_acc_w.update_timestamp = self._time_stamp
self._body_acc_w.timestamp = self._sim_timestamp
return self._body_acc_w.data
@property
......@@ -232,26 +253,30 @@ class ArticulationData(RigidObjectData):
@property
def joint_pos(self):
"""Joint positions of all joints. Shape is (num_instances, num_joints)."""
if self._joint_pos.update_timestamp < self._time_stamp:
if self._joint_pos.timestamp < self._sim_timestamp:
# read data from simulation and set the buffer data and timestamp
self._joint_pos.data = self._root_physx_view.get_dof_positions()
self._joint_pos.update_timestamp = self._time_stamp
self._joint_pos.timestamp = self._sim_timestamp
return self._joint_pos.data
@property
def joint_vel(self):
"""Joint velocities of all joints. Shape is (num_instances, num_joints)."""
if self._joint_vel.update_timestamp < self._time_stamp:
if self._joint_vel.timestamp < self._sim_timestamp:
# read data from simulation and set the buffer data and timestamp
self._joint_vel.data = self._root_physx_view.get_dof_velocities()
self._joint_vel.update_timestamp = self._time_stamp
self._joint_vel.timestamp = self._sim_timestamp
return self._joint_vel.data
@property
def joint_acc(self):
"""Joint acceleration of all joints. Shape is (num_instances, num_joints)."""
if self._joint_acc.update_timestamp < self._time_stamp:
if self._joint_acc.timestamp < self._sim_timestamp:
# note: we use finite differencing to compute acceleration
self._joint_acc.data = (self.joint_vel - self._previous_joint_vel) / (
self._time_stamp - self._joint_acc.update_timestamp
self._sim_timestamp - self._joint_acc.timestamp
)
self._joint_acc.timestamp = self._sim_timestamp
# update the previous joint velocity
self._previous_joint_vel[:] = self.joint_vel
self._joint_acc.update_timestamp = self._time_stamp
return self._joint_acc.data
......@@ -12,26 +12,59 @@ from omni.isaac.lab.utils.buffers import TimestampedBuffer
class RigidObjectData:
"""Data container for a rigid object."""
"""Data container for a rigid object.
def __init__(self, root_physx_view: physx.RigidBodyView, device):
This class contains the data for a rigid object in the simulation. The data includes the state of
the root rigid body and the state of all the bodies in the object. The data is stored in the simulation
world frame unless otherwise specified.
The data is lazily updated, meaning that the data is only updated when it is accessed. This is useful
when the data is expensive to compute or retrieve. The data is updated when the timestamp of the buffer
is older than the current simulation timestamp. The timestamp is updated whenever the data is updated.
"""
_root_physx_view: physx.RigidBodyView
"""The root rigid body view of the object."""
def __init__(self, root_physx_view: physx.RigidBodyView, device: str):
"""Initializes the rigid object data.
Args:
root_physx_view: The root rigid body view of the object.
device: The device used for processing.
"""
# Set the parameters
self.device = device
self._time_stamp = 0.0
self._root_physx_view: physx.RigidBodyView = root_physx_view
self._root_physx_view = root_physx_view
# Set initial time stamp
self._sim_timestamp = 0.0
self.gravity_vec_w = torch.tensor((0.0, 0.0, -1.0), device=self.device).repeat(self._root_physx_view.count, 1)
self.forward_vec_b = torch.tensor((1.0, 0.0, 0.0), device=self.device).repeat(self._root_physx_view.count, 1)
# Initialize constants
self.GRAVITY_VEC_W = torch.tensor((0.0, 0.0, -1.0), device=self.device).repeat(self._root_physx_view.count, 1)
self.FORWARD_VEC_B = torch.tensor((1.0, 0.0, 0.0), device=self.device).repeat(self._root_physx_view.count, 1)
# Initialize buffers for finite differencing
self._previous_body_vel_w = torch.zeros((self._root_physx_view.count, 1, 6), device=self.device)
# Initialize the lazy buffers.
self._root_state_w: TimestampedBuffer = TimestampedBuffer()
self._body_acc_w: TimestampedBuffer = TimestampedBuffer()
self._root_state_w = TimestampedBuffer()
self._body_acc_w = TimestampedBuffer()
def update(self, dt: float):
self._time_stamp += dt
# Trigger an update of the body acceleration buffer at a higher frequency since we do finite differencing.
"""Updates the data for the rigid object.
Args:
dt: The time step for the update. This must be a positive value.
"""
self._sim_timestamp += dt
# Trigger an update of the body acceleration buffer at a higher frequency
# since we do finite differencing.
self.body_acc_w
##
# Names.
##
body_names: list[str] = None
"""Body names in the order parsed by the simulation view."""
......@@ -52,12 +85,14 @@ class RigidObjectData:
@property
def root_state_w(self):
"""Root state ``[pos, quat, lin_vel, ang_vel]`` in simulation world frame. Shape is (num_instances, 13)."""
if self._root_state_w.update_timestamp < self._time_stamp:
if self._root_state_w.timestamp < self._sim_timestamp:
# read data from simulation
pose = self._root_physx_view.get_transforms().clone()
pose[:, 3:7] = math_utils.convert_quat(pose[:, 3:7], to="wxyz")
velocity = self._root_physx_view.get_velocities()
# set the buffer data and timestamp
self._root_state_w.data = torch.cat((pose, velocity), dim=-1)
self._root_state_w.update_timestamp = self._time_stamp
self._root_state_w.timestamp = self._sim_timestamp
return self._root_state_w.data
@property
......@@ -68,18 +103,25 @@ class RigidObjectData:
@property
def body_acc_w(self):
"""Acceleration of all bodies. Shape is (num_instances, 1, 6)."""
if self._body_acc_w.update_timestamp < self._time_stamp:
if self._body_acc_w.timestamp < self._sim_timestamp:
# note: we use finite differencing to compute acceleration
self._body_acc_w.data = (self.body_vel_w - self._previous_body_vel_w) / (
self._time_stamp - self._body_acc_w.update_timestamp
self._sim_timestamp - self._body_acc_w.timestamp
)
self._body_acc_w.timestamp = self._sim_timestamp
# update the previous velocity
self._previous_body_vel_w[:] = self.body_vel_w
self._body_acc_w.update_timestamp = self._time_stamp
return self._body_acc_w.data
@property
def projected_gravity_b(self):
"""Projection of the gravity direction on base frame. Shape is (num_instances, 3)."""
return math_utils.quat_rotate_inverse(self.root_quat_w, self.gravity_vec_w)
"""Projection of the gravity direction on base frame. Shape is (num_instances, 3).
Note:
This quantity is computed by assuming that the gravity direction is along the z-direction,
i.e. :math:`(0, 0, -1)`.
"""
return math_utils.quat_rotate_inverse(self.root_quat_w, self.GRAVITY_VEC_W)
@property
def heading_w(self):
......@@ -89,7 +131,7 @@ class RigidObjectData:
This quantity is computed by assuming that the forward-direction of the base
frame is along x-direction, i.e. :math:`(1, 0, 0)`.
"""
forward_w = math_utils.quat_apply(self.root_quat_w, self.forward_vec_b)
forward_w = math_utils.quat_apply(self.root_quat_w, self.FORWARD_VEC_B)
return torch.atan2(forward_w[:, 1], forward_w[:, 0])
@property
......
......@@ -9,6 +9,6 @@ from .array import *
from .buffers import *
from .configclass import configclass
from .dict import *
from .linear_interpolation import *
from .interpolation import *
from .string import *
from .timer import Timer
......@@ -5,6 +5,6 @@
"""Sub-module containing different buffers."""
from .circular_buffer import BatchedCircularBuffer
from .circular_buffer import CircularBuffer
from .delay_buffer import DelayBuffer
from .timestamped_buffer import TimestampedBuffer
......@@ -3,99 +3,147 @@
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import torch
from collections.abc import Sequence
class BatchedCircularBuffer:
"""Circular buffer for storing a history of batched tensor data."""
class CircularBuffer:
"""Circular buffer for storing a history of batched tensor data.
This class implements a circular buffer for storing a history of batched tensor data. The buffer is
initialized with a maximum length and a batch size. The data is stored in a circular fashion, and the
data can be retrieved in a LIFO (Last-In-First-Out) fashion. The buffer is designed to be used in
multi-environment settings, where each environment has its own data.
The shape of the appended data is expected to be (batch_size, ...), where the first dimension is the
batch dimension. Correspondingly, the shape of the ring buffer is (max_len, batch_size, ...).
"""
def __init__(self, max_len: int, batch_size: int, device: str):
"""Initialize the circular buffer.
Args:
max_len: The maximum length of the circular buffer. The minimum value is one.
max_len: The maximum length of the circular buffer. The minimum allowed value is 1.
batch_size: The batch dimension of the data.
device: Device used for processing.
device: The device used for processing.
Raises:
ValueError: If the buffer size is less than one.
"""
if max_len < 1:
raise ValueError(f"The buffer size should be greater than zero. However, it is set to {max_len}!")
self._max_len = max_len
# set the parameters
self._batch_size = batch_size
self._device = device
self._ALL_INDICES = torch.arange(batch_size, device=device)
# max length tensor for comparisons
self._max_len = torch.full((batch_size,), max_len, dtype=torch.int, device=device)
# number of data pushes passed since the last call to :meth:`reset`
self._num_pushes = torch.zeros(batch_size, dtype=torch.long, device=device)
# the pointer to the current head of the circular buffer (-1 means not initialized)
self._pointer: int = -1
# the circular buffer for data storage
self._buffer: torch.Tensor | None = None # the data buffer
# the actual buffer for data storage
# note: this is initialized on the first call to :meth:`append`
self._buffer: torch.Tensor = None # type: ignore
"""
Properties.
"""
@property
def batch_size(self) -> int:
"""The batch size of the ring buffer."""
return self._batch_size
@property
def device(self) -> str:
"""The device used for processing."""
return self._device
@property
def max_length(self) -> int:
"""The maximum length of the ring buffer."""
return int(self._max_len[0].item())
@property
def current_length(self) -> torch.Tensor:
"""The current length of the buffer. Shape is (batch_size,).
Since the buffer is circular, the current length is the minimum of the number of pushes
and the maximum length.
"""
return torch.minimum(self._num_pushes, self._max_len)
"""
Operations.
"""
def reset(self, batch_ids: Sequence[int] | None = None):
"""Reset the circular buffer.
"""Reset the circular buffer at the specified batch indices.
Args:
batch_ids: Elements to reset in the batch dimension.
batch_ids: Elements to reset in the batch dimension. Default is None, which resets all the batch indices.
"""
# resolve all indices
if batch_ids is None:
batch_ids = self._ALL_INDICES
batch_ids = slice(None)
# reset the number of pushes for the specified batch indices
# note: we don't need to reset the buffer since it will be overwritten. The pointer handles this.
self._num_pushes[batch_ids] = 0
def append(self, data: torch.Tensor):
"""Append the data to the circular buffer.
Args:
data: The data to be appended, where `len(data) == self.batch_size`.
data: The data to append to the circular buffer. The first dimension should be the batch dimension.
Shape is (batch_size, ...).
Raises:
ValueError: If the input data has a different batch size than the buffer.
"""
# check the batch size
if data.shape[0] != self.batch_size:
raise ValueError(f"The input data has {data.shape[0]} environments while expecting {self.batch_size}")
# at the fist call, initialize the buffer
if self._buffer is None:
self._pointer = -1
self._buffer = torch.empty((self.max_len, *data.shape), dtype=data.dtype, device=self._device)
self._buffer = torch.empty((self.max_length, *data.shape), dtype=data.dtype, device=self._device)
# move the head to the next slot
self._pointer = (self._pointer + 1) % self.max_len
self._pointer = (self._pointer + 1) % self.max_length
# add the new data to the last layer
self._buffer[self._pointer] = data
self._buffer[self._pointer] = data.to(self._device)
# increment number of number of pushes
self._num_pushes += 1
def __getitem__(self, key: torch.Tensor) -> torch.Tensor:
"""Get the data from the circular buffer in LIFO fashion.
"""Retrieve the data from the circular buffer in last-in-first-out (LIFO) fashion.
If the requested index is larger than the number of pushes since the last call to :meth:`reset`,
the oldest stored data is returned.
Args:
key: The index of the data to be retrieved. It can be a single integer or a tensor of integers.
key: The index to retrieve from the circular buffer. The index should be less than the number of pushes
since the last call to :meth:`reset`. Shape is (batch_size,).
Returns:
The data from the circular buffer. Shape is (batch_size, ...).
Raises:
ValueError: If the input key has a different batch size than the buffer.
RuntimeError: If the buffer is empty.
"""
# check the batch size
if len(key) != self.batch_size:
raise ValueError(f"The key has length {key.shape[0]} while expecting {self.batch_size}")
raise ValueError(f"The argument 'key' has length {key.shape[0]}, while expecting {self.batch_size}")
# check if the buffer is empty
if torch.any(self._num_pushes == 0) or self._buffer is None:
raise ValueError("Attempting to get data on an empty circular buffer.")
raise RuntimeError("Attempting to retrieve data on an empty circular buffer. Please append data first.")
# admissible lag
valid_keys = torch.minimum(key, self._num_pushes - 1)
# the index in the circular buffer (pointer points to the last+1 index)
index_in_buffer = torch.remainder(self._pointer - valid_keys, self.max_len)
index_in_buffer = torch.remainder(self._pointer - valid_keys, self.max_length)
# return output
return self._buffer[index_in_buffer, self._ALL_INDICES, :]
"""
Properties.
"""
@property
def batch_size(self) -> int:
"""The batch size in the ring buffer."""
return self._batch_size
@property
def device(self) -> str:
"""Device used for processing."""
return self._device
@property
def max_len(self) -> int:
"""The maximum length of the ring buffer."""
return self._max_len
return self._buffer[index_in_buffer, self._ALL_INDICES]
......@@ -9,13 +9,20 @@ from dataclasses import dataclass
@dataclass
class TimestampedBuffer:
"""Buffer to hold timestamped data.
"""A buffer class containing data and its timestamp.
Such a buffer is useful to check whether data is outdated and needs to be refreshed to create lazy buffers.
This class is a simple data container that stores a tensor and its timestamp. The timestamp is used to
track the last update of the buffer. The timestamp is set to -1.0 by default, indicating that the buffer
has not been updated yet. The timestamp should be updated whenever the data in the buffer is updated. This
way the buffer can be used to check whether the data is outdated and needs to be refreshed.
The buffer is useful for creating lazy buffers that only update the data when it is outdated. This can be
useful when the data is expensive to compute or retrieve. For example usage, refer to the data classes in
the :mod:`omni.isaac.lab.assets` module.
"""
data: torch.Tensor = None
"""Data stored in the buffer."""
data: torch.Tensor = None # type: ignore
"""The data stored in the buffer. Default is None, indicating that the buffer is empty."""
update_timestamp: float = -1.0
"""Timestamp of the last update of the buffer."""
timestamp: float = -1.0
"""Timestamp at the last update of the buffer. Default is -1.0, indicating that the buffer has not been updated."""
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Submodule for different interpolation methods.
"""
from .linear_interpolation import LinearInterpolation
......@@ -3,64 +3,83 @@
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import torch
class LinearInterpolation:
"""Linearly interpolates a sampled scalar function ``y = f(x)`` where :math:`f: R -> R`.
"""Linearly interpolates a sampled scalar function for arbitrary query points.
This class implements a linear interpolation for a scalar function. The function maps from real values, x, to
real values, y. It expects a set of samples from the function's domain, x, and the corresponding values, y.
The class allows querying the function's values at any arbitrary point.
It assumes that the function's domain, X, is sampled in an ascending order. For the query points out of
the sampling range of X, the class does a zero-order-hold extrapolation based on the boundary values.
The interpolation is done by finding the two closest points in x to the query point and then linearly
interpolating between the corresponding y values. For the query points that are outside the input points,
the class does a zero-order-hold extrapolation based on the boundary values. This means that the class
returns the value of the closest point in x.
"""
def __init__(self, x: torch.Tensor, y: torch.Tensor, device: str):
"""Initialize the Linear Interpolation.
"""Initializes the linear interpolation.
The scalar function maps from real values, x, to real values, y. The input to the class is a set of samples
from the function's domain, x, and the corresponding values, y.
The scalar function maps from real values, x, to real values, y.
Note:
The input tensor x should be sorted in ascending order.
Args:
x: An ascending vector of samples from the function's domain.
y: The function's values associated to x.
device: Device used for processing.
x: An vector of samples from the function's domain. The values should be sorted in ascending order.
Shape is (num_samples,)
y: The function's values associated to the input x. Shape is (num_samples,)
device: The device used for processing.
Raises:
ValueError: If the input tensors are empty or have different sizes.
ValueError: If the input tensor x is not sorted in ascending order.
"""
# make sure that input tensors are 1D of size (num_samples,)
self._x = x.view(-1).clone().to(device=device)
self._y = y.view(-1).clone().to(device=device)
# make sure sizes are correct
if self._x.numel() == 0:
raise ValueError("Input tensor x is empty!")
if self._x.numel() != self._y.numel():
raise ValueError("Tensor x and y should have the same size!")
raise ValueError(f"Input tensors x and y have different sizes: {self._x.numel()} != {self._y.numel()}")
# make sure that x is sorted
if torch.any(self._x[1:] < self._x[:-1]):
raise ValueError("x is not sorted!")
raise ValueError("Input tensor x is not sorted in ascending order!")
def compute(self, q: torch.Tensor) -> torch.Tensor:
"""Calculates a linearly interpolated values for the query points.
Args:
q: The query points. It can have any arbitrary shape.
Returns:
The interpolation values. It has the same shape as the input tensor.
The interpolated values at query points. It has the same shape as the input tensor.
"""
# serialized q
q_1d = q.view(-1)
# Number of elements in the x that are strictly smaller than query points (use int32 instead of int64)
num_smaller_elements = torch.sum(self._x.unsqueeze(1) < q_1d.unsqueeze(0), dim=0, dtype=torch.int)
# The index pointing to the first element in x such that x[lower_bound_i] < q_i
# If a point is smaller that all x elements, it will assign 0
lower_bound = torch.clamp(num_smaller_elements - 1, min=0)
# The index pointing to the first element in x such that x[upper_bound_i] >= q_i
# If a point is greater than all x elements, it will assign the last elements' index
upper_bound = torch.clamp(num_smaller_elements, max=self._x.numel() - 1)
# compute the weight as: (q_i - x_lb) / (x_ub - x_lb)
weight = (q_1d - self._x[lower_bound]) / (self._x[upper_bound] - self._x[lower_bound])
# If a point is out of bounds assign weight 0.0
weight[upper_bound == lower_bound] = 0.0
# Perform linear interpolation
fq = self._y[lower_bound] + weight * (self._y[upper_bound] - self._y[lower_bound])
# deserialized fq
fq = fq.view(q.shape)
return fq
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import torch
import unittest
"""Launch Isaac Sim Simulator first."""
from omni.isaac.lab.app import AppLauncher, run_tests
# launch omniverse app in headless mode
simulation_app = AppLauncher(headless=True).app
"""Rest everything follows from here."""
from omni.isaac.lab.utils import CircularBuffer
class TestCircularBuffer(unittest.TestCase):
"""Test fixture for checking the circular buffer implementation."""
def setUp(self):
self.max_len = 5
self.batch_size = 3
self.device = "cpu"
self.buffer = CircularBuffer(self.max_len, self.batch_size, self.device)
"""
Test cases for CircularBuffer class.
"""
def test_initialization(self):
"""Test initialization of the circular buffer."""
self.assertEqual(self.buffer.max_length, self.max_len)
self.assertEqual(self.buffer.batch_size, self.batch_size)
self.assertEqual(self.buffer.device, self.device)
self.assertEqual(self.buffer.current_length.tolist(), [0, 0, 0])
def test_reset(self):
"""Test resetting the circular buffer."""
# append some data
data = torch.ones((self.batch_size, 2), device=self.device)
self.buffer.append(data)
# reset the buffer
self.buffer.reset()
# check if the buffer is empty
self.assertEqual(self.buffer.current_length.tolist(), [0, 0, 0])
def test_append_and_retrieve(self):
"""Test appending and retrieving data from the circular buffer."""
# append some data
data1 = torch.tensor([[1, 1], [1, 1], [1, 1]], device=self.device)
data2 = torch.tensor([[2, 2], [2, 2], [2, 2]], device=self.device)
self.buffer.append(data1)
self.buffer.append(data2)
self.assertEqual(self.buffer.current_length.tolist(), [2, 2, 2])
retrieved_data = self.buffer[torch.tensor([0, 0, 0], device=self.device)]
self.assertTrue(torch.equal(retrieved_data, data2))
retrieved_data = self.buffer[torch.tensor([1, 1, 1], device=self.device)]
self.assertTrue(torch.equal(retrieved_data, data1))
def test_buffer_overflow(self):
"""Test buffer overflow.
If the buffer is full, the oldest data should be overwritten.
"""
# add data in ascending order
for count in range(self.max_len + 2):
data = torch.full((self.batch_size, 4), count, device=self.device)
self.buffer.append(data)
# check buffer length is correct
self.assertEqual(self.buffer.current_length.tolist(), [self.max_len, self.max_len, self.max_len])
# retrieve most recent data
key = torch.tensor([0, 0, 0], device=self.device)
retrieved_data = self.buffer[key]
expected_data = torch.full_like(data, self.max_len + 1)
self.assertTrue(torch.equal(retrieved_data, expected_data))
# retrieve the oldest data
key = torch.tensor([self.max_len - 1, self.max_len - 1, self.max_len - 1], device=self.device)
retrieved_data = self.buffer[key]
expected_data = torch.full_like(data, 2)
self.assertTrue(torch.equal(retrieved_data, expected_data))
def test_empty_buffer_access(self):
"""Test accessing an empty buffer."""
with self.assertRaises(RuntimeError):
self.buffer[torch.tensor([0, 0, 0], device=self.device)]
def test_invalid_batch_size(self):
"""Test appending data with an invalid batch size."""
data = torch.ones((self.batch_size + 1, 2), device=self.device)
with self.assertRaises(ValueError):
self.buffer.append(data)
with self.assertRaises(ValueError):
self.buffer[torch.tensor([0, 0], device=self.device)]
def test_key_greater_than_pushes(self):
"""Test retrieving data with a key greater than the number of pushes.
In this case, the oldest data should be returned.
"""
data1 = torch.tensor([[1, 1], [1, 1], [1, 1]], device=self.device)
data2 = torch.tensor([[2, 2], [2, 2], [2, 2]], device=self.device)
self.buffer.append(data1)
self.buffer.append(data2)
retrieved_data = self.buffer[torch.tensor([5, 5, 5], device=self.device)]
self.assertTrue(torch.equal(retrieved_data, data1))
if __name__ == "__main__":
run_tests()
......@@ -3,9 +3,6 @@
#
# SPDX-License-Identifier: BSD-3-Clause
import torch
import unittest
"""Launch Isaac Sim Simulator first."""
from omni.isaac.lab.app import AppLauncher, run_tests
......@@ -13,52 +10,54 @@ from omni.isaac.lab.app import AppLauncher, run_tests
# launch omniverse app in headless mode
simulation_app = AppLauncher(headless=True).app
"""Rest everything follows from here."""
import torch
import unittest
from collections.abc import Generator
from omni.isaac.lab.utils import DelayBuffer
class TestDelayBuffer(unittest.TestCase):
"""Test fixture for checking Delay Buffer utilities in Orbit."""
"""Test fixture for checking the delay buffer implementation."""
device: str = "cpu"
num_envs: int = 10
max_num_histories: int = 4
def generate_data(self, length: int) -> torch.Tensor:
for step in range(length):
yield torch.ones((self.num_envs, 1), dtype=int, device=self.device) * step
def setUp(self):
self.device: str = "cpu"
self.batch_size: int = 10
self.history_length: int = 4
# create the buffer
self.buffer = DelayBuffer(self.history_length, batch_size=self.batch_size, device=self.device)
def test_constant_time_lags(self):
"""Test constant delay."""
const_lag: int = 3
delay_buffer = DelayBuffer(self.max_num_histories, num_envs=self.num_envs, device=self.device)
delay_buffer.set_time_lag(const_lag)
self.buffer.set_time_lag(const_lag)
all_data = []
for i, data in enumerate(self.generate_data(20)):
for i, data in enumerate(self._generate_data(20)):
all_data.append(data)
# apply delay
delayed_data = delay_buffer.compute(data)
delayed_data = self.buffer.compute(data)
error = delayed_data - all_data[max(0, i - const_lag)]
self.assertTrue(torch.all(error == 0))
def test_reset(self):
"""Test resetting the last two environments after iteration `reset_itr`."""
"""Test resetting the last two batch indices after iteration `reset_itr`."""
const_lag: int = 2
reset_itr = 10
delay_buffer = DelayBuffer(self.max_num_histories, num_envs=self.num_envs, device=self.device)
delay_buffer.set_time_lag(const_lag)
self.buffer.set_time_lag(const_lag)
all_data = []
for i, data in enumerate(self.generate_data(20)):
for i, data in enumerate(self._generate_data(20)):
all_data.append(data)
# from 'reset_itr' iteration reset the last and second-to-last environments
if i == reset_itr:
delay_buffer.reset([-2, -1])
self.buffer.reset([-2, -1])
# apply delay
delayed_data = delay_buffer.compute(data)
delayed_data = self.buffer.compute(data)
# before 'reset_itr' is is similar to test_constant_time_lags
# after that indices [-2, -1] should be treated separately
if i < reset_itr:
......@@ -71,24 +70,30 @@ class TestDelayBuffer(unittest.TestCase):
self.assertTrue(torch.all(error2_reset == 0))
def test_random_time_lags(self):
"""Test random delay."""
"""Test random delays."""
max_lag: int = 3
time_lags = torch.randint(low=0, high=max_lag + 1, size=(self.num_envs,), dtype=torch.int, device=self.device)
time_lags = torch.randint(low=0, high=max_lag + 1, size=(self.batch_size,), dtype=torch.int, device=self.device)
delay_buffer = DelayBuffer(self.max_num_histories, num_envs=self.num_envs, device=self.device)
delay_buffer.set_time_lag(time_lags)
self.buffer.set_time_lag(time_lags)
all_data = []
for i, data in enumerate(self.generate_data(20)):
for i, data in enumerate(self._generate_data(20)):
all_data.append(data)
# apply delay
delayed_data = delay_buffer.compute(data)
true_delayed_index = torch.maximum(i - delay_buffer.time_lags, torch.zeros_like(delay_buffer.time_lags))
delayed_data = self.buffer.compute(data)
true_delayed_index = torch.maximum(i - self.buffer.time_lags, torch.zeros_like(self.buffer.time_lags))
true_delayed_index = true_delayed_index.tolist()
for i in range(self.num_envs):
for i in range(self.batch_size):
error = delayed_data[i] - all_data[true_delayed_index[i]][i]
self.assertTrue(torch.all(error == 0))
"""Helper functions."""
def _generate_data(self, length: int) -> Generator[torch.Tensor]:
"""Data generator for testing the buffer."""
for step in range(length):
yield torch.full((self.batch_size, 1), step, dtype=torch.int, device=self.device)
if __name__ == "__main__":
run_tests()
......@@ -4,9 +4,12 @@
# SPDX-License-Identifier: BSD-3-Clause
"""Configuration for the Boston Dynamics Spot robot."""
"""Configuration for the Boston Dynamics robot.
The following configuration parameters are available:
from __future__ import annotations
* :obj:`SPOT_CFG`: The Spot robot with delay PD and remote PD actuators.
"""
import torch
......@@ -119,8 +122,10 @@ joint_parameter_lookup = torch.tensor([
[-0.272558, -22.528547, 33.792821],
[-0.247100, -20.401667, 30.602500],
])
"""Describes relationship between the joint angle (rad), the transmission ratio (in/out), and the output torque (N*m)
for the knees of the Boston Dynamics Spot robot.
"""The lookup table for the knee joint parameters of the Boston Dynamics Spot robot.
This table describes the relationship between the joint angle (rad), the transmission ratio (in/out),
and the output torque (N*m). It is used to interpolate the output torque based on the joint angle.
"""
##
......@@ -162,17 +167,17 @@ SPOT_CFG = ArticulationCfg(
effort_limit=45.0,
stiffness=60.0,
damping=1.5,
min_num_time_lags=0, # physics time steps (min: 2.0*0=0.0ms)
max_num_time_lags=4, # physics time steps (max: 2.0*4=8.0ms)
min_delay=0, # physics time steps (min: 2.0*0=0.0ms)
max_delay=4, # physics time steps (max: 2.0*4=8.0ms)
),
"spot_knee": RemotizedPDActuatorCfg(
joint_names_expr=[".*_kn"],
joint_parameter_lookup=joint_parameter_lookup,
effort_limit=None, # torque limits are handled based experimental data (:meth:`RemotizedPDActuatorCfg.data`)
effort_limit=None, # torque limits are handled based experimental data (`RemotizedPDActuatorCfg.data`)
stiffness=60.0,
damping=1.5,
min_num_time_lags=0, # physics time steps (min: 2.0*0=0.0ms)
max_num_time_lags=4, # physics time steps (max: 2.0*4=8.0ms)
min_delay=0, # physics time steps (min: 2.0*0=0.0ms)
max_delay=4, # physics time steps (max: 2.0*4=8.0ms)
),
},
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment