Unverified Commit 84b2d2d8 authored by David Hoeller's avatar David Hoeller Committed by GitHub

Adds a rigid body collection class (#1288)

# Description

Adds a rigid body collection class, which allows to spawn multiple
objects in each environment and access/modify the quantities with a
unified (env_ids, object_ids) API.

## Type of change

- New feature (non-breaking change which adds functionality)

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there
parent b9a49cae
......@@ -12,6 +12,9 @@
RigidObject
RigidObjectData
RigidObjectCfg
RigidObjectCollection
RigidObjectCollectionData
RigidObjectCollectionCfg
Articulation
ArticulationData
ArticulationCfg
......@@ -51,6 +54,26 @@ Rigid Object
:show-inheritance:
:exclude-members: __init__, class_type
Rigid Object Collection
-----------------------
.. autoclass:: RigidObjectCollection
:members:
:inherited-members:
:show-inheritance:
.. autoclass:: RigidObjectCollectionData
:members:
:inherited-members:
:show-inheritance:
:exclude-members: __init__
.. autoclass:: RigidObjectCollectionCfg
:members:
:inherited-members:
:show-inheritance:
:exclude-members: __init__, class_type
Articulation
------------
......
Spawning Multiple Assets
========================
.. currentmodule:: omni.isaac.lab
Typical, spawning configurations (introduced in the :ref:`tutorial-spawn-prims` tutorial) copy the same
Typical spawning configurations (introduced in the :ref:`tutorial-spawn-prims` tutorial) copy the same
asset (or USD primitive) across the different resolved prim paths from the expressions.
For instance, if the user specifies to spawn the asset at "/World/Table\_.*/Object", the same
asset is created at the paths "/World/Table_0/Object", "/World/Table_1/Object" and so on.
However, at times, it might be desirable to spawn different assets under the prim paths to
ensure a diversity in the simulation. This guide describes how to create different assets under
each prim path using the spawning functionality.
However, we also support multi-asset spawning with two mechanisms:
1. Rigid object collections. This allows the user to spawn multiple rigid objects in each environment and access/modify
them with a unified API, improving performance.
2. Spawning different assets under the same prim path. This allows the user to create diverse simulations, where each
environment has a different asset.
This guide describes how to use these two mechanisms.
The sample script ``multi_asset.py`` is used as a reference, located in the
``IsaacLab/source/standalone/demos`` directory.
......@@ -20,20 +27,41 @@ The sample script ``multi_asset.py`` is used as a reference, located in the
.. literalinclude:: ../../../source/standalone/demos/multi_asset.py
:language: python
:emphasize-lines: 101-123, 130-149
:emphasize-lines: 109-131, 135-179, 184-203
:linenos:
This script creates multiple environments, where each environment has a rigid object that is either a cone,
a cube, or a sphere, and an articulation that is either the ANYmal-C or ANYmal-D robot.
This script creates multiple environments, where each environment has:
* a rigid object collection containing a cone, a cube, and a sphere
* a rigid object that is either a cone, a cube, or a sphere, chosen at random
* an articulation that is either the ANYmal-C or ANYmal-D robot, chosen at random
.. image:: ../_static/demos/multi_asset.jpg
:width: 100%
:alt: result of multi_asset.py
Using Multi-Asset Spawning Functions
------------------------------------
It is possible to spawn different assets and USDs in each environment using the spawners
Rigid Object Collections
------------------------
Multiple rigid objects can be spawned in each environment and accessed/modified with a unified ``(env_ids, obj_ids)`` API.
While the user could also create multiple rigid objects by spawning them individually, the API is more user-friendly and
more efficient since it uses a single physics view under the hood to handle all the objects.
.. literalinclude:: ../../../source/standalone/demos/multi_asset.py
:language: python
:lines: 135-179
:dedent:
The configuration :class:`~assets.RigidObjectCollectionCfg` is used to create the collection. It's attribute :attr:`~assets.RigidObjectCollectionCfg.rigid_objects`
is a dictionary containing :class:`~assets.RigidObjectCfg` objects. The keys serve as unique identifiers for each
rigid object in the collection.
Spawning different assets under the same prim path
--------------------------------------------------
It is possible to spawn different assets and USDs under the same prim path in each environment using the spawners
:class:`~sim.spawners.wrappers.MultiAssetSpawnerCfg` and :class:`~sim.spawners.wrappers.MultiUsdFileCfg`:
* We set the spawn configuration in :class:`~assets.RigidObjectCfg` to be
......@@ -41,7 +69,7 @@ It is possible to spawn different assets and USDs in each environment using the
.. literalinclude:: ../../../source/standalone/demos/multi_asset.py
:language: python
:lines: 99-125
:lines: 107-133
:dedent:
This function allows you to define a list of different assets that can be spawned as rigid objects.
......@@ -53,14 +81,14 @@ It is possible to spawn different assets and USDs in each environment using the
.. literalinclude:: ../../../source/standalone/demos/multi_asset.py
:language: python
:lines: 128-161
:lines: 182-215
:dedent:
Similar to before, this configuration allows the selection of different USD files representing articulated assets.
Things to Note
--------------
~~~~~~~~~~~~~~
Similar asset structuring
~~~~~~~~~~~~~~~~~~~~~~~~~
......@@ -85,7 +113,7 @@ anymore. Hence the flag :attr:`scene.InteractiveScene.replicate_physics` must be
.. literalinclude:: ../../../source/standalone/demos/multi_asset.py
:language: python
:lines: 221-224
:lines: 280-283
:dedent:
The Code Execution
......
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.27.13"
version = "0.27.14"
# Description
title = "Isaac Lab framework for Robot Learning"
......
Changelog
---------
0.27.14 (2024-10-23)
~~~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added the class :class:`~omni.isaac.lab.assets.RigidObjectCollection` which allows to spawn
multiple objects in each environment and access/modify the quantities with a unified (env_ids, object_ids) API.
0.27.13 (2024-10-30)
~~~~~~~~~~~~~~~~~~~~
......@@ -13,7 +23,7 @@ Added
0.27.12 (2024-01-04)
~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~
Removed
^^^^^^^
......
......@@ -43,3 +43,4 @@ from .asset_base import AssetBase
from .asset_base_cfg import AssetBaseCfg
from .deformable_object import DeformableObject, DeformableObjectCfg, DeformableObjectData
from .rigid_object import RigidObject, RigidObjectCfg, RigidObjectData
from .rigid_object_collection import RigidObjectCollection, RigidObjectCollectionCfg, RigidObjectCollectionData
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Sub-module for rigid object collection."""
from .rigid_object_collection import RigidObjectCollection
from .rigid_object_collection_cfg import RigidObjectCollectionCfg
from .rigid_object_collection_data import RigidObjectCollectionData
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import re
import torch
import weakref
from collections.abc import Sequence
from typing import TYPE_CHECKING
import omni.kit.app
import omni.log
import omni.physics.tensors.impl.api as physx
import omni.timeline
from pxr import UsdPhysics
import omni.isaac.lab.sim as sim_utils
import omni.isaac.lab.utils.math as math_utils
import omni.isaac.lab.utils.string as string_utils
from ..asset_base import AssetBase
from .rigid_object_collection_data import RigidObjectCollectionData
if TYPE_CHECKING:
from .rigid_object_collection_cfg import RigidObjectCollectionCfg
class RigidObjectCollection(AssetBase):
"""A rigid object collection class.
This class represents a collection of rigid objects in the simulation, where the state of the rigid objects can be
accessed and modified using a batched ``(env_ids, object_ids)`` API.
For each rigid body in the collection, the root prim of the asset must have the `USD RigidBodyAPI`_
applied to it. This API is used to define the simulation properties of the rigid bodies. On playing the
simulation, the physics engine will automatically register the rigid bodies and create a corresponding
rigid body handle. This handle can be accessed using the :attr:`root_physx_view` attribute.
.. note::
Rigid objects in the collection are uniquely identified via the key of the dictionary
:attr:`~omni.isaac.lab.assets.RigidObjectCollectionCfg.rigid_objects` in :class:`~omni.isaac.lab.assets.RigidObjectCollectionCfg`.
This differs from the class :class:`~omni.isaac.lab.assets.RigidObject`, where a rigid object is identified by
the name of the Xform where the `USD RigidBodyAPI`_ is applied. This would not be possible for the rigid object
collection since the :attr:`~omni.isaac.lab.assets.RigidObjectCollectionCfg.rigid_objects` dictionary could
contain the same rigid object multiple times, leading to ambiguity.
.. _`USD RigidBodyAPI`: https://openusd.org/dev/api/class_usd_physics_rigid_body_a_p_i.html
"""
cfg: RigidObjectCollectionCfg
"""Configuration instance for the rigid object collection."""
def __init__(self, cfg: RigidObjectCollectionCfg):
"""Initialize the rigid object collection.
Args:
cfg: A configuration instance.
"""
# check that the config is valid
cfg.validate()
# store inputs
self.cfg = cfg
# flag for whether the asset is initialized
self._is_initialized = False
for rigid_object_cfg in self.cfg.rigid_objects.values():
# check if the rigid object path is valid
# note: currently the spawner does not work if there is a regex pattern in the leaf
# For example, if the prim path is "/World/Object_[1,2]" since the spawner will not
# know which prim to spawn. This is a limitation of the spawner and not the asset.
asset_path = rigid_object_cfg.prim_path.split("/")[-1]
asset_path_is_regex = re.match(r"^[a-zA-Z0-9/_]+$", asset_path) is None
# spawn the asset
if rigid_object_cfg.spawn is not None and not asset_path_is_regex:
rigid_object_cfg.spawn.func(
rigid_object_cfg.prim_path,
rigid_object_cfg.spawn,
translation=rigid_object_cfg.init_state.pos,
orientation=rigid_object_cfg.init_state.rot,
)
# check that spawn was successful
matching_prims = sim_utils.find_matching_prims(rigid_object_cfg.prim_path)
if len(matching_prims) == 0:
raise RuntimeError(f"Could not find prim with path {rigid_object_cfg.prim_path}.")
# stores object names
self._object_names_list = []
# note: Use weakref on all callbacks to ensure that this object can be deleted when its destructor is called.
# add callbacks for stage play/stop
# The order is set to 10 which is arbitrary but should be lower priority than the default order of 0
timeline_event_stream = omni.timeline.get_timeline_interface().get_timeline_event_stream()
self._initialize_handle = timeline_event_stream.create_subscription_to_pop_by_type(
int(omni.timeline.TimelineEventType.PLAY),
lambda event, obj=weakref.proxy(self): obj._initialize_callback(event),
order=10,
)
self._invalidate_initialize_handle = timeline_event_stream.create_subscription_to_pop_by_type(
int(omni.timeline.TimelineEventType.STOP),
lambda event, obj=weakref.proxy(self): obj._invalidate_initialize_callback(event),
order=10,
)
self._debug_vis_handle = None
"""
Properties
"""
@property
def data(self) -> RigidObjectCollectionData:
return self._data
@property
def num_instances(self) -> int:
"""Number of instances of the collection."""
return self.root_physx_view.count // self.num_objects
@property
def num_objects(self) -> int:
"""Number of objects in the collection.
This corresponds to the distinct number of rigid bodies in the collection.
"""
return len(self.object_names)
@property
def object_names(self) -> list[str]:
"""Ordered names of objects in the rigid object collection."""
return self._object_names_list
@property
def root_physx_view(self) -> physx.RigidBodyView:
"""Rigid body view for the rigid body collection (PhysX).
Note:
Use this view with caution. It requires handling of tensors in a specific way.
"""
return self._root_physx_view
"""
Operations.
"""
def reset(self, env_ids: torch.Tensor | None = None, object_ids: slice | torch.Tensor | None = None):
"""Resets all internal buffers of selected environments and objects.
Args:
env_ids: The indices of the object to reset. Defaults to None (all instances).
object_ids: The indices of the object to reset. Defaults to None (all objects).
"""
# resolve all indices
if env_ids is None:
env_ids = self._ALL_ENV_INDICES
if object_ids is None:
object_ids = self._ALL_OBJ_INDICES
# reset external wrench
self._external_force_b[env_ids[:, None], object_ids] = 0.0
self._external_torque_b[env_ids[:, None], object_ids] = 0.0
def write_data_to_sim(self):
"""Write external wrench to the simulation.
Note:
We write external wrench to the simulation here since this function is called before the simulation step.
This ensures that the external wrench is applied at every simulation step.
"""
# write external wrench
if self.has_external_wrench:
self.root_physx_view.apply_forces_and_torques_at_position(
force_data=self.reshape_data_to_view(self._external_force_b),
torque_data=self.reshape_data_to_view(self._external_torque_b),
position_data=None,
indices=self._env_obj_ids_to_view_ids(self._ALL_ENV_INDICES, self._ALL_OBJ_INDICES),
is_global=False,
)
def update(self, dt: float):
self._data.update(dt)
"""
Operations - Finders.
"""
def find_objects(
self, name_keys: str | Sequence[str], preserve_order: bool = False
) -> tuple[torch.Tensor, list[str]]:
"""Find objects in the collection based on the name keys.
Please check the :meth:`omni.isaac.lab.utils.string_utils.resolve_matching_names` function for more
information on the name matching.
Args:
name_keys: A regular expression or a list of regular expressions to match the object names.
preserve_order: Whether to preserve the order of the name keys in the output. Defaults to False.
Returns:
A tuple containing the object indices and names.
"""
obj_ids, obj_names = string_utils.resolve_matching_names(name_keys, self.object_names, preserve_order)
return torch.tensor(obj_ids, device=self.device), obj_names
"""
Operations - Write to simulation.
"""
def write_object_state_to_sim(
self,
object_state: torch.Tensor,
env_ids: torch.Tensor | None = None,
object_ids: slice | torch.Tensor | None = None,
):
"""Set the object state over selected environment and object indices into the simulation.
The object state comprises of the cartesian position, quaternion orientation in (w, x, y, z), and linear
and angular velocity. All the quantities are in the simulation frame.
Args:
object_state: Object state in simulation frame. Shape is (len(env_ids), len(object_ids), 13).
env_ids: Environment indices. If None, then all indices are used.
object_ids: Object indices. If None, then all indices are used.
"""
# set into simulation
self.write_object_pose_to_sim(object_state[..., :7], env_ids=env_ids, object_ids=object_ids)
self.write_object_velocity_to_sim(object_state[..., 7:], env_ids=env_ids, object_ids=object_ids)
def write_object_pose_to_sim(
self,
object_pose: torch.Tensor,
env_ids: torch.Tensor | None = None,
object_ids: slice | torch.Tensor | None = None,
):
"""Set the object pose over selected environment and object indices into the simulation.
The object pose comprises of the cartesian position and quaternion orientation in (w, x, y, z).
Args:
object_pose: Object poses in simulation frame. Shape is (len(env_ids), len(object_ids), 7).
env_ids: Environment indices. If None, then all indices are used.
object_ids: Object indices. If None, then all indices are used.
"""
# resolve all indices
# -- env_ids
if env_ids is None:
env_ids = self._ALL_ENV_INDICES
# -- object_ids
if object_ids is None:
object_ids = self._ALL_OBJ_INDICES
# note: we need to do this here since tensors are not set into simulation until step.
# set into internal buffers
self._data.object_state_w[env_ids[:, None], object_ids, :7] = object_pose.clone()
# convert the quaternion from wxyz to xyzw
poses_xyzw = self._data.object_state_w[..., :7].clone()
poses_xyzw[..., 3:] = math_utils.convert_quat(poses_xyzw[..., 3:], to="xyzw")
# set into simulation
view_ids = self._env_obj_ids_to_view_ids(env_ids, object_ids)
self.root_physx_view.set_transforms(self.reshape_data_to_view(poses_xyzw), indices=view_ids)
def write_object_velocity_to_sim(
self,
object_velocity: torch.Tensor,
env_ids: torch.Tensor | None = None,
object_ids: slice | torch.Tensor | None = None,
):
"""Set the object velocity over selected environment and object indices into the simulation.
Args:
object_velocity: Object velocities in simulation frame. Shape is (len(env_ids), len(object_ids), 6).
env_ids: Environment indices. If None, then all indices are used.
object_ids: Object indices. If None, then all indices are used.
"""
# resolve all indices
# -- env_ids
if env_ids is None:
env_ids = self._ALL_ENV_INDICES
# -- object_ids
if object_ids is None:
object_ids = self._ALL_OBJ_INDICES
self._data.object_state_w[env_ids[:, None], object_ids, 7:] = object_velocity.clone()
self._data.object_acc_w[env_ids[:, None], object_ids] = 0.0
# set into simulation
view_ids = self._env_obj_ids_to_view_ids(env_ids, object_ids)
self.root_physx_view.set_velocities(
self.reshape_data_to_view(self._data.object_state_w[..., 7:]), indices=view_ids
)
"""
Operations - Setters.
"""
def set_external_force_and_torque(
self,
forces: torch.Tensor,
torques: torch.Tensor,
object_ids: slice | torch.Tensor | None = None,
env_ids: torch.Tensor | None = None,
):
"""Set external force and torque to apply on the objects' bodies in their local frame.
For many applications, we want to keep the applied external force on rigid bodies constant over a period of
time (for instance, during the policy control). This function allows us to store the external force and torque
into buffers which are then applied to the simulation at every step.
.. caution::
If the function is called with empty forces and torques, then this function disables the application
of external wrench to the simulation.
.. code-block:: python
# example of disabling external wrench
asset.set_external_force_and_torque(forces=torch.zeros(0, 0, 3), torques=torch.zeros(0, 0, 3))
.. note::
This function does not apply the external wrench to the simulation. It only fills the buffers with
the desired values. To apply the external wrench, call the :meth:`write_data_to_sim` function
right before the simulation step.
Args:
forces: External forces in bodies' local frame. Shape is (len(env_ids), len(object_ids), 3).
torques: External torques in bodies' local frame. Shape is (len(env_ids), len(object_ids), 3).
object_ids: Object indices to apply external wrench to. Defaults to None (all objects).
env_ids: Environment indices to apply external wrench to. Defaults to None (all instances).
"""
if forces.any() or torques.any():
self.has_external_wrench = True
# resolve all indices
# -- env_ids
if env_ids is None:
env_ids = self._ALL_ENV_INDICES
# -- object_ids
if object_ids is None:
object_ids = self._ALL_OBJ_INDICES
# set into internal buffers
self._external_force_b[env_ids[:, None], object_ids] = forces
self._external_torque_b[env_ids[:, None], object_ids] = torques
else:
self.has_external_wrench = False
"""
Internal helper.
"""
def _initialize_impl(self):
# create simulation view
self._physics_sim_view = physx.create_simulation_view(self._backend)
self._physics_sim_view.set_subspace_roots("/")
root_prim_path_exprs = []
for name, rigid_object_cfg in self.cfg.rigid_objects.items():
# obtain the first prim in the regex expression (all others are assumed to be a copy of this)
template_prim = sim_utils.find_first_matching_prim(rigid_object_cfg.prim_path)
if template_prim is None:
raise RuntimeError(f"Failed to find prim for expression: '{rigid_object_cfg.prim_path}'.")
template_prim_path = template_prim.GetPath().pathString
# find rigid root prims
root_prims = sim_utils.get_all_matching_child_prims(
template_prim_path, predicate=lambda prim: prim.HasAPI(UsdPhysics.RigidBodyAPI)
)
if len(root_prims) == 0:
raise RuntimeError(
f"Failed to find a rigid body when resolving '{rigid_object_cfg.prim_path}'."
" Please ensure that the prim has 'USD RigidBodyAPI' applied."
)
if len(root_prims) > 1:
raise RuntimeError(
f"Failed to find a single rigid body when resolving '{rigid_object_cfg.prim_path}'."
f" Found multiple '{root_prims}' under '{template_prim_path}'."
" Please ensure that there is only one rigid body in the prim path tree."
)
# check that no rigid object has an articulation root API, which decreases simulation performance
articulation_prims = sim_utils.get_all_matching_child_prims(
template_prim_path, predicate=lambda prim: prim.HasAPI(UsdPhysics.ArticulationRootAPI)
)
if len(articulation_prims) != 0:
if articulation_prims[0].GetAttribute("physxArticulation:articulationEnabled").Get():
raise RuntimeError(
f"Found an articulation root when resolving '{rigid_object_cfg.prim_path}' in the rigid object"
f" collection. These are located at: '{articulation_prims}' under '{template_prim_path}'."
" Please disable the articulation root in the USD or from code by setting the parameter"
" 'ArticulationRootPropertiesCfg.articulation_enabled' to False in the spawn configuration."
)
# resolve root prim back into regex expression
root_prim_path = root_prims[0].GetPath().pathString
root_prim_path_expr = rigid_object_cfg.prim_path + root_prim_path[len(template_prim_path) :]
root_prim_path_exprs.append(root_prim_path_expr.replace(".*", "*"))
self._object_names_list.append(name)
# -- object view
self._root_physx_view = self._physics_sim_view.create_rigid_body_view(root_prim_path_exprs)
# check if the rigid body was created
if self._root_physx_view._backend is None:
raise RuntimeError("Failed to create rigid body collection. Please check PhysX logs.")
# log information about the rigid body
omni.log.info(f"Number of instances: {self.num_instances}")
omni.log.info(f"Number of distinct objects: {self.num_objects}")
omni.log.info(f"Object names: {self.object_names}")
# container for data access
self._data = RigidObjectCollectionData(self.root_physx_view, self.num_objects, self.device)
# create buffers
self._create_buffers()
# process configuration
self._process_cfg()
# update the rigid body data
self.update(0.0)
def _create_buffers(self):
"""Create buffers for storing data."""
# constants
self._ALL_ENV_INDICES = torch.arange(self.num_instances, dtype=torch.long, device=self.device)
self._ALL_OBJ_INDICES = torch.arange(self.num_objects, dtype=torch.long, device=self.device)
# external forces and torques
self.has_external_wrench = False
self._external_force_b = torch.zeros((self.num_instances, self.num_objects, 3), device=self.device)
self._external_torque_b = torch.zeros_like(self._external_force_b)
# set information about rigid body into data
self._data.object_names = self.object_names
self._data.default_mass = self.reshape_view_to_data(self.root_physx_view.get_masses().clone())
self._data.default_inertia = self.reshape_view_to_data(self.root_physx_view.get_inertias().clone())
def _process_cfg(self):
"""Post processing of configuration parameters."""
# default state
# -- object state
default_object_states = []
for rigid_object_cfg in self.cfg.rigid_objects.values():
default_object_state = (
tuple(rigid_object_cfg.init_state.pos)
+ tuple(rigid_object_cfg.init_state.rot)
+ tuple(rigid_object_cfg.init_state.lin_vel)
+ tuple(rigid_object_cfg.init_state.ang_vel)
)
default_object_state = (
torch.tensor(default_object_state, dtype=torch.float, device=self.device)
.repeat(self.num_instances, 1)
.unsqueeze(1)
)
default_object_states.append(default_object_state)
# concatenate the default state for each object
default_object_states = torch.cat(default_object_states, dim=1)
self._data.default_object_state = default_object_states
def reshape_view_to_data(self, data: torch.Tensor) -> torch.Tensor:
"""Reshapes and arranges the data coming from the :attr:`root_physx_view` to (num_instances, num_objects, data_size).
Args:
data: The data coming from the :attr:`root_physx_view`. Shape is (num_instances*num_objects, data_size).
Returns:
The reshaped data. Shape is (num_instances, num_objects, data_size).
"""
return torch.einsum("ijk -> jik", data.reshape(self.num_objects, self.num_instances, -1))
def reshape_data_to_view(self, data: torch.Tensor) -> torch.Tensor:
"""Reshapes and arranges the data to the be consistent with data from the :attr:`root_physx_view`.
Args:
data: The data to be reshaped. Shape is (num_instances, num_objects, data_size).
Returns:
The reshaped data. Shape is (num_instances*num_objects, data_size).
"""
return torch.einsum("ijk -> jik", data).reshape(self.num_objects * self.num_instances, *data.shape[2:])
def _env_obj_ids_to_view_ids(
self, env_ids: torch.Tensor, object_ids: Sequence[int] | slice | torch.Tensor
) -> torch.Tensor:
"""Converts environment and object indices to indices consistent with data from :attr:`root_physx_view`.
Args:
env_ids: Environment indices.
object_ids: Object indices.
Returns:
The view indices.
"""
# the order is env_0/object_0, env_0/object_1, env_0/object_..., env_1/object_0, env_1/object_1, ...
# return a flat tensor of indices
if isinstance(object_ids, slice):
object_ids = self._ALL_OBJ_INDICES
elif isinstance(object_ids, Sequence):
object_ids = torch.tensor(object_ids, device=self.device)
return (object_ids.unsqueeze(1) * self.num_instances + env_ids).flatten()
"""
Internal simulation callbacks.
"""
def _invalidate_initialize_callback(self, event):
"""Invalidates the scene elements."""
# call parent
super()._invalidate_initialize_callback(event)
# set all existing views to None to invalidate them
self._physics_sim_view = None
self._root_physx_view = None
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from dataclasses import MISSING
from omni.isaac.lab.assets.rigid_object import RigidObjectCfg
from omni.isaac.lab.utils import configclass
from .rigid_object_collection import RigidObjectCollection
@configclass
class RigidObjectCollectionCfg:
"""Configuration parameters for a rigid object collection."""
class_type: type = RigidObjectCollection
"""The associated asset class.
The class should inherit from :class:`omni.isaac.lab.assets.asset_base.AssetBase`.
"""
rigid_objects: dict[str, RigidObjectCfg] = MISSING
"""Dictionary of rigid object configurations to spawn.
The keys are the names for the objects, which are used as unique identifiers throughout the code.
"""
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import torch
import weakref
import omni.physics.tensors.impl.api as physx
import omni.isaac.lab.utils.math as math_utils
from omni.isaac.lab.utils.buffers import TimestampedBuffer
class RigidObjectCollectionData:
"""Data container for a rigid object collection.
This class contains the data for a rigid object collection in the simulation. The data includes the state of
all the bodies in the collection. The data is stored in the simulation world frame unless otherwise specified.
The data is in the order ``(num_instances, num_objects, data_size)``, where data_size is the size of the data.
For a rigid body, there are two frames of reference that are used:
- Actor frame: The frame of reference of the rigid body prim. This typically corresponds to the Xform prim
with the rigid body schema.
- Center of mass frame: The frame of reference of the center of mass of the rigid body.
Depending on the settings of the simulation, the actor frame and the center of mass frame may be the same.
This needs to be taken into account when interpreting the data.
The data is lazily updated, meaning that the data is only updated when it is accessed. This is useful
when the data is expensive to compute or retrieve. The data is updated when the timestamp of the buffer
is older than the current simulation timestamp. The timestamp is updated whenever the data is updated.
"""
def __init__(self, root_physx_view: physx.RigidBodyView, num_objects: int, device: str):
"""Initializes the data.
Args:
root_physx_view: The root rigid body view.
num_objects: The number of objects in the collection.
device: The device used for processing.
"""
# Set the parameters
self.device = device
self.num_objects = num_objects
# Set the root rigid body view
# note: this is stored as a weak reference to avoid circular references between the asset class
# and the data container. This is important to avoid memory leaks.
self._root_physx_view: physx.RigidBodyView = weakref.proxy(root_physx_view)
self.num_instances = self._root_physx_view.count // self.num_objects
# Set initial time stamp
self._sim_timestamp = 0.0
# Obtain global physics sim view
physics_sim_view = physx.create_simulation_view("torch")
physics_sim_view.set_subspace_roots("/")
gravity = physics_sim_view.get_gravity()
# Convert to direction vector
gravity_dir = torch.tensor((gravity[0], gravity[1], gravity[2]), device=self.device)
gravity_dir = math_utils.normalize(gravity_dir.unsqueeze(0)).squeeze(0)
# Initialize constants
self.GRAVITY_VEC_W = gravity_dir.repeat(self.num_instances, self.num_objects, 1)
self.FORWARD_VEC_B = torch.tensor((1.0, 0.0, 0.0), device=self.device).repeat(
self.num_instances, self.num_objects, 1
)
# Initialize the lazy buffers.
self._object_state_w = TimestampedBuffer()
self._object_acc_w = TimestampedBuffer()
def update(self, dt: float):
"""Updates the data for the rigid object collection.
Args:
dt: The time step for the update. This must be a positive value.
"""
# update the simulation timestamp
self._sim_timestamp += dt
##
# Names.
##
object_names: list[str] = None
"""Object names in the order parsed by the simulation view."""
##
# Defaults.
##
default_object_state: torch.Tensor = None
"""Default object state ``[pos, quat, lin_vel, ang_vel]`` in local environment frame. Shape is (num_instances, num_objects, 13).
The position and quaternion are of each object's rigid body's actor frame. Meanwhile, the linear and angular velocities are
of the center of mass frame.
"""
default_mass: torch.Tensor = None
"""Default object mass read from the simulation. Shape is (num_instances, num_objects, 1)."""
default_inertia: torch.Tensor = None
"""Default object inertia tensor read from the simulation. Shape is (num_instances, num_objects, 9).
The inertia is the inertia tensor relative to the center of mass frame. The values are stored in
the order :math:`[I_{xx}, I_{xy}, I_{xz}, I_{yx}, I_{yy}, I_{yz}, I_{zx}, I_{zy}, I_{zz}]`.
"""
##
# Properties.
##
@property
def object_state_w(self):
"""Object state ``[pos, quat, lin_vel, ang_vel]`` in simulation world frame. Shape is (num_instances, num_objects, 13).
The position and orientation are of the rigid body's actor frame. Meanwhile, the linear and angular
velocities are of the rigid body's center of mass frame.
"""
if self._object_state_w.timestamp < self._sim_timestamp:
# read data from simulation
pose = self._reshape_view_to_data(self._root_physx_view.get_transforms().clone())
pose[..., 3:7] = math_utils.convert_quat(pose[..., 3:7], to="wxyz")
velocity = self._reshape_view_to_data(self._root_physx_view.get_velocities())
# set the buffer data and timestamp
self._object_state_w.data = torch.cat((pose, velocity), dim=-1)
self._object_state_w.timestamp = self._sim_timestamp
return self._object_state_w.data
@property
def object_acc_w(self):
"""Acceleration of all objects. Shape is (num_instances, num_objects, 6).
This quantity is the acceleration of the rigid bodies' center of mass frame.
"""
if self._object_acc_w.timestamp < self._sim_timestamp:
# note: we use finite differencing to compute acceleration
self._object_acc_w.data = self._reshape_view_to_data(self._root_physx_view.get_accelerations().clone())
self._object_acc_w.timestamp = self._sim_timestamp
return self._object_acc_w.data
@property
def projected_gravity_b(self):
"""Projection of the gravity direction on base frame. Shape is (num_instances, num_objects, 3)."""
return math_utils.quat_rotate_inverse(self.object_quat_w, self.GRAVITY_VEC_W)
@property
def heading_w(self):
"""Yaw heading of the base frame (in radians). Shape is (num_instances, num_objects,).
Note:
This quantity is computed by assuming that the forward-direction of the base
frame is along x-direction, i.e. :math:`(1, 0, 0)`.
"""
forward_w = math_utils.quat_apply(self.object_quat_w, self.FORWARD_VEC_B)
return torch.atan2(forward_w[..., 1], forward_w[..., 0])
##
# Derived properties.
##
@property
def object_pos_w(self) -> torch.Tensor:
"""Object position in simulation world frame. Shape is (num_instances, num_objects, 3).
This quantity is the position of the actor frame of the rigid bodies.
"""
return self.object_state_w[..., :3]
@property
def object_quat_w(self) -> torch.Tensor:
"""Object orientation (w, x, y, z) in simulation world frame. Shape is (num_instances, num_objects, 4).
This quantity is the orientation of the actor frame of the rigid bodies.
"""
return self.object_state_w[..., 3:7]
@property
def object_vel_w(self) -> torch.Tensor:
"""Object velocity in simulation world frame. Shape is (num_instances, num_objects, 6).
This quantity contains the linear and angular velocities of the rigid bodies' center of mass frame.
"""
return self.object_state_w[..., 7:13]
@property
def object_lin_vel_w(self) -> torch.Tensor:
"""Object linear velocity in simulation world frame. Shape is (num_instances, num_objects, 3).
This quantity is the linear velocity of the rigid bodies' center of mass frame.
"""
return self.object_state_w[..., 7:10]
@property
def object_ang_vel_w(self) -> torch.Tensor:
"""Object angular velocity in simulation world frame. Shape is (num_instances, num_objects, 3).
This quantity is the angular velocity of the rigid bodies' center of mass frame.
"""
return self.object_state_w[..., 10:13]
@property
def object_lin_vel_b(self) -> torch.Tensor:
"""Object linear velocity in base frame. Shape is (num_instances, num_objects, 3).
This quantity is the linear velocity of the rigid bodies' center of mass frame with respect to the
rigid body's actor frame.
"""
return math_utils.quat_rotate_inverse(self.object_quat_w, self.object_lin_vel_w)
@property
def object_ang_vel_b(self) -> torch.Tensor:
"""Object angular velocity in base world frame. Shape is (num_instances, num_objects, 3).
This quantity is the angular velocity of the rigid bodies' center of mass frame with respect to the
rigid body's actor frame.
"""
return math_utils.quat_rotate_inverse(self.object_quat_w, self.object_ang_vel_w)
@property
def object_lin_acc_w(self) -> torch.Tensor:
"""Linear acceleration of all bodies in simulation world frame. Shape is (num_instances, num_objects, 3).
This quantity is the linear acceleration of the rigid bodies' center of mass frame.
"""
return self.object_acc_w[..., 0:3]
@property
def object_ang_acc_w(self) -> torch.Tensor:
"""Angular acceleration of all bodies in simulation world frame. Shape is (num_instances, num_objects, 3).
This quantity is the angular acceleration of the rigid bodies' center of mass frame.
"""
return self.object_acc_w[..., 3:6]
##
# Helpers.
##
def _reshape_view_to_data(self, data: torch.Tensor) -> torch.Tensor:
"""Reshapes and arranges the data from the physics view to (num_instances, num_objects, data_size).
Args:
data: The data from the physics view. Shape is (num_instances*num_objects, data_size).
Returns:
The reshaped data. Shape is (num_objects, num_instances, data_size).
"""
return torch.einsum("ijk -> jik", data.reshape(self.num_objects, self.num_instances, -1))
......@@ -7,7 +7,7 @@
from dataclasses import MISSING
from omni.isaac.lab.assets import Articulation, RigidObject
from omni.isaac.lab.assets import Articulation, RigidObject, RigidObjectCollection
from omni.isaac.lab.scene import InteractiveScene
from omni.isaac.lab.utils import configclass
......@@ -78,16 +78,34 @@ class SceneEntityCfg:
manager.
"""
object_collection_names: str | list[str] | None = None
"""The names of the objects in the rigid object collection required by the term. Defaults to None.
The names can be either names or a regular expression matching the object names in the collection.
These are converted to object indices on initialization of the manager and passed to the term
function as a list of object indices under :attr:`object_collection_ids`.
"""
object_collection_ids: list[int] | slice = slice(None)
"""The indices of the objects from the rigid object collection required by the term. Defaults to slice(None),
which means all the objects in the collection.
If :attr:`object_collection_names` is specified, this is filled in automatically on initialization of the manager.
"""
preserve_order: bool = False
"""Whether to preserve indices ordering to match with that in the specified joint or body names. Defaults to False.
"""Whether to preserve indices ordering to match with that in the specified joint, body, or object collection names.
Defaults to False.
If False, the ordering of the indices are sorted in ascending order (i.e. the ordering in the entity's joints
or bodies). Otherwise, the indices are preserved in the order of the specified joint and body names.
If False, the ordering of the indices are sorted in ascending order (i.e. the ordering in the entity's joints,
bodies, or object in the object collection). Otherwise, the indices are preserved in the order of the specified
joint, body, or object collection names.
For more details, see the :meth:`omni.isaac.lab.utils.string.resolve_matching_names` function.
.. note::
This attribute is only used when :attr:`joint_names` or :attr:`body_names` are specified.
This attribute is only used when :attr:`joint_names`, :attr:`body_names`, or :attr:`object_collection_names` are specified.
"""
......@@ -106,6 +124,7 @@ class SceneEntityCfg:
ValueError: If both ``joint_names`` and ``joint_ids`` are specified and are not consistent.
ValueError: If both ``fixed_tendon_names`` and ``fixed_tendon_ids`` are specified and are not consistent.
ValueError: If both ``body_names`` and ``body_ids`` are specified and are not consistent.
ValueError: If both ``object_collection_names`` and ``object_collection_ids`` are specified and are not consistent.
"""
# check if the entity is valid
if self.name not in scene.keys():
......@@ -120,6 +139,9 @@ class SceneEntityCfg:
# convert body names to indices based on regex
self._resolve_body_names(scene)
# convert object collection names to indices based on regex
self._resolve_object_collection_names(scene)
def _resolve_joint_names(self, scene: InteractiveScene):
# convert joint names to indices based on regex
if self.joint_names is not None or self.joint_ids != slice(None):
......@@ -228,3 +250,36 @@ class SceneEntityCfg:
if isinstance(self.body_ids, int):
self.body_ids = [self.body_ids]
self.body_names = [entity.body_names[i] for i in self.body_ids]
def _resolve_object_collection_names(self, scene: InteractiveScene):
# convert object names to indices based on regex
if self.object_collection_names is not None or self.object_collection_ids != slice(None):
entity: RigidObjectCollection = scene[self.name]
# -- if both are not their default values, check if they are valid
if self.object_collection_names is not None and self.object_collection_ids != slice(None):
if isinstance(self.object_collection_names, str):
self.object_collection_names = [self.object_collection_names]
if isinstance(self.object_collection_ids, int):
self.object_collection_ids = [self.object_collection_ids]
object_ids, _ = entity.find_objects(self.object_collection_names, preserve_order=self.preserve_order)
object_names = [entity.object_names[i] for i in self.object_collection_ids]
if object_ids != self.object_collection_ids or object_names != self.object_collection_names:
raise ValueError(
"Both 'object_collection_names' and 'object_collection_ids' are specified, and are not"
" consistent.\n\tfrom object collection names:"
f" {self.object_collection_names} [{object_ids}]\n\tfrom object collection ids:"
f" {object_names} [{self.object_collection_ids}]\nHint: Use either 'object_collection_names' or"
" 'object_collection_ids' to avoid confusion."
)
# -- from object names to object indices
elif self.object_collection_names is not None:
if isinstance(self.object_collection_names, str):
self.object_collection_names = [self.object_collection_names]
self.object_collection_ids, _ = entity.find_objects(
self.object_collection_names, preserve_order=self.preserve_order
)
# -- from object indices to object names
elif self.object_collection_ids != slice(None):
if isinstance(self.object_collection_ids, int):
self.object_collection_ids = [self.object_collection_ids]
self.object_collection_names = [entity.object_names[i] for i in self.object_collection_ids]
......@@ -22,6 +22,8 @@ from omni.isaac.lab.assets import (
DeformableObjectCfg,
RigidObject,
RigidObjectCfg,
RigidObjectCollection,
RigidObjectCollectionCfg,
)
from omni.isaac.lab.sensors import ContactSensorCfg, FrameTransformerCfg, SensorBase, SensorBaseCfg
from omni.isaac.lab.terrains import TerrainImporter, TerrainImporterCfg
......@@ -113,6 +115,7 @@ class InteractiveScene:
self._articulations = dict()
self._deformable_objects = dict()
self._rigid_objects = dict()
self._rigid_object_collections = dict()
self._sensors = dict()
self._extras = dict()
# obtain the current stage
......@@ -309,6 +312,11 @@ class InteractiveScene:
"""A dictionary of rigid objects in the scene."""
return self._rigid_objects
@property
def rigid_object_collections(self) -> dict[str, RigidObjectCollection]:
"""A dictionary of rigid object collections in the scene."""
return self._rigid_object_collections
@property
def sensors(self) -> dict[str, SensorBase]:
"""A dictionary of the sensors in the scene, such as cameras and contact reporters."""
......@@ -351,6 +359,8 @@ class InteractiveScene:
deformable_object.reset(env_ids)
for rigid_object in self._rigid_objects.values():
rigid_object.reset(env_ids)
for rigid_object_collection in self._rigid_object_collections.values():
rigid_object_collection.reset(env_ids)
# -- sensors
for sensor in self._sensors.values():
sensor.reset(env_ids)
......@@ -364,6 +374,8 @@ class InteractiveScene:
deformable_object.write_data_to_sim()
for rigid_object in self._rigid_objects.values():
rigid_object.write_data_to_sim()
for rigid_object_collection in self._rigid_object_collections.values():
rigid_object_collection.write_data_to_sim()
def update(self, dt: float) -> None:
"""Update the scene entities.
......@@ -378,6 +390,8 @@ class InteractiveScene:
deformable_object.update(dt)
for rigid_object in self._rigid_objects.values():
rigid_object.update(dt)
for rigid_object_collection in self._rigid_object_collections.values():
rigid_object_collection.update(dt)
# -- sensors
for sensor in self._sensors.values():
sensor.update(dt, force_recompute=not self.cfg.lazy_sensor_update)
......@@ -397,6 +411,7 @@ class InteractiveScene:
self._articulations,
self._deformable_objects,
self._rigid_objects,
self._rigid_object_collections,
self._sensors,
self._extras,
]:
......@@ -422,6 +437,7 @@ class InteractiveScene:
self._articulations,
self._deformable_objects,
self._rigid_objects,
self._rigid_object_collections,
self._sensors,
self._extras,
]:
......@@ -454,7 +470,8 @@ class InteractiveScene:
if asset_name in InteractiveSceneCfg.__dataclass_fields__ or asset_cfg is None:
continue
# resolve regex
asset_cfg.prim_path = asset_cfg.prim_path.format(ENV_REGEX_NS=self.env_regex_ns)
if hasattr(asset_cfg, "prim_path"):
asset_cfg.prim_path = asset_cfg.prim_path.format(ENV_REGEX_NS=self.env_regex_ns)
# create asset
if isinstance(asset_cfg, TerrainImporterCfg):
# terrains are special entities since they define environment origins
......@@ -467,6 +484,14 @@ class InteractiveScene:
self._deformable_objects[asset_name] = asset_cfg.class_type(asset_cfg)
elif isinstance(asset_cfg, RigidObjectCfg):
self._rigid_objects[asset_name] = asset_cfg.class_type(asset_cfg)
elif isinstance(asset_cfg, RigidObjectCollectionCfg):
for rigid_object_cfg in asset_cfg.rigid_objects.values():
rigid_object_cfg.prim_path = rigid_object_cfg.prim_path.format(ENV_REGEX_NS=self.env_regex_ns)
self._rigid_object_collections[asset_name] = asset_cfg.class_type(asset_cfg)
for rigid_object_cfg in asset_cfg.rigid_objects.values():
if hasattr(rigid_object_cfg, "collision_group") and rigid_object_cfg.collision_group == -1:
asset_paths = sim_utils.find_matching_prim_paths(rigid_object_cfg.prim_path)
self._global_prim_paths += asset_paths
elif isinstance(asset_cfg, SensorBaseCfg):
# Update target frame path(s)' regex name space for FrameTransformer
if isinstance(asset_cfg, FrameTransformerCfg):
......
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
# ignore private usage of variables warning
# pyright: reportPrivateUsage=none
"""Launch Isaac Sim Simulator first."""
from omni.isaac.lab.app import AppLauncher, run_tests
# Can set this to False to see the GUI for debugging
# This will also add lights to the scene
HEADLESS = True
# launch omniverse app
app_launcher = AppLauncher(headless=HEADLESS)
simulation_app = app_launcher.app
"""Rest everything follows."""
import ctypes
import torch
import unittest
import omni.isaac.core.utils.prims as prim_utils
import omni.isaac.lab.sim as sim_utils
from omni.isaac.lab.assets import RigidObjectCfg, RigidObjectCollection, RigidObjectCollectionCfg
from omni.isaac.lab.sim import build_simulation_context
from omni.isaac.lab.utils.assets import ISAAC_NUCLEUS_DIR
from omni.isaac.lab.utils.math import default_orientation, random_orientation
def generate_cubes_scene(
num_envs: int = 1,
num_cubes: int = 1,
height=1.0,
has_api: bool = True,
kinematic_enabled: bool = False,
device: str = "cuda:0",
) -> tuple[RigidObjectCollection, torch.Tensor]:
"""Generate a scene with the provided number of cubes.
Args:
num_envs: Number of envs to generate.
num_cubes: Number of cubes to generate.
height: Height of the cubes.
has_api: Whether the cubes have a rigid body API on them.
kinematic_enabled: Whether the cubes are kinematic.
device: Device to use for the simulation.
Returns:
A tuple containing the rigid object representing the cubes and the origins of the cubes.
"""
origins = torch.tensor([(i * 3.0, 0, height) for i in range(num_envs)]).to(device)
# Create Top-level Xforms, one for each cube
for i, origin in enumerate(origins):
prim_utils.create_prim(f"/World/Table_{i}", "Xform", translation=origin)
# Resolve spawn configuration
if has_api:
spawn_cfg = sim_utils.UsdFileCfg(
usd_path=f"{ISAAC_NUCLEUS_DIR}/Props/Blocks/DexCube/dex_cube_instanceable.usd",
rigid_props=sim_utils.RigidBodyPropertiesCfg(kinematic_enabled=kinematic_enabled),
)
else:
# since no rigid body properties defined, this is just a static collider
spawn_cfg = sim_utils.CuboidCfg(
size=(0.1, 0.1, 0.1),
collision_props=sim_utils.CollisionPropertiesCfg(),
)
# create the rigid object configs
cube_config_dict = {}
for i in range(num_cubes):
cube_object_cfg = RigidObjectCfg(
prim_path=f"/World/Table_.*/Object_{i}",
spawn=spawn_cfg,
init_state=RigidObjectCfg.InitialStateCfg(pos=(0.0, 3 * i, height)),
)
cube_config_dict[f"cube_{i}"] = cube_object_cfg
# create the rigid object collection
cube_object_collection_cfg = RigidObjectCollectionCfg(rigid_objects=cube_config_dict)
cube_object_colection = RigidObjectCollection(cfg=cube_object_collection_cfg)
return cube_object_colection, origins
class TestRigidObjectCollection(unittest.TestCase):
"""Test for rigid object collection class."""
"""
Tests
"""
def test_initialization(self):
"""Test initialization for prim with rigid body API at the provided prim path."""
for num_envs in (1, 2):
for num_cubes in (1, 3):
for device in ("cuda:0", "cpu"):
with self.subTest(num_envs=num_envs, num_cubes=num_cubes, device=device):
with build_simulation_context(device=device, auto_add_lighting=True) as sim:
# Generate cubes scene
object_collection, _ = generate_cubes_scene(
num_envs=num_envs, num_cubes=num_cubes, device=device
)
# Check that boundedness of rigid object is correct
self.assertEqual(ctypes.c_long.from_address(id(object_collection)).value, 1)
# Play sim
sim.reset()
# Check if object is initialized
self.assertTrue(object_collection.is_initialized)
self.assertEqual(len(object_collection.object_names), num_cubes)
# Check buffers that exists and have correct shapes
self.assertEqual(object_collection.data.object_pos_w.shape, (num_envs, num_cubes, 3))
self.assertEqual(object_collection.data.object_quat_w.shape, (num_envs, num_cubes, 4))
self.assertEqual(object_collection.data.default_mass.shape, (num_envs, num_cubes, 1))
self.assertEqual(object_collection.data.default_inertia.shape, (num_envs, num_cubes, 9))
# Simulate physics
for _ in range(2):
# perform rendering
sim.step()
# update object
object_collection.update(sim.cfg.dt)
def test_id_conversion(self):
"""Test environment and object index conversion to physics view indices."""
for device in ("cuda:0", "cpu"):
with self.subTest(num_envs=2, num_cubes=3, device=device):
with build_simulation_context(device=device, auto_add_lighting=True) as sim:
# Generate cubes scene
object_collection, _ = generate_cubes_scene(num_envs=2, num_cubes=3, device=device)
# Play sim
sim.reset()
expected = [
torch.tensor([4, 5], device=device, dtype=torch.long),
torch.tensor([4], device=device, dtype=torch.long),
torch.tensor([0, 2, 4], device=device, dtype=torch.long),
torch.tensor([1, 3, 5], device=device, dtype=torch.long),
]
view_ids = object_collection._env_obj_ids_to_view_ids(
object_collection._ALL_ENV_INDICES, object_collection._ALL_OBJ_INDICES[None, 2]
)
self.assertTrue((view_ids == expected[0]).all())
view_ids = object_collection._env_obj_ids_to_view_ids(
object_collection._ALL_ENV_INDICES[None, 0], object_collection._ALL_OBJ_INDICES[None, 2]
)
self.assertTrue((view_ids == expected[1]).all())
view_ids = object_collection._env_obj_ids_to_view_ids(
object_collection._ALL_ENV_INDICES[None, 0], object_collection._ALL_OBJ_INDICES
)
self.assertTrue((view_ids == expected[2]).all())
view_ids = object_collection._env_obj_ids_to_view_ids(
object_collection._ALL_ENV_INDICES[None, 1], object_collection._ALL_OBJ_INDICES
)
self.assertTrue((view_ids == expected[3]).all())
def test_initialization_with_kinematic_enabled(self):
"""Test that initialization for prim with kinematic flag enabled."""
for num_envs in (1, 2):
for num_cubes in (1, 3):
for device in ("cuda:0", "cpu"):
with self.subTest(num_envs=num_envs, num_cubes=num_cubes, device=device):
with build_simulation_context(device=device, auto_add_lighting=True) as sim:
# Generate cubes scene
object_collection, origins = generate_cubes_scene(
num_envs=num_envs, num_cubes=num_cubes, kinematic_enabled=True, device=device
)
# Check that boundedness of rigid object is correct
self.assertEqual(ctypes.c_long.from_address(id(object_collection)).value, 1)
# Play sim
sim.reset()
# Check if object is initialized
self.assertTrue(object_collection.is_initialized)
self.assertEqual(len(object_collection.object_names), num_cubes)
# Check buffers that exists and have correct shapes
self.assertEqual(object_collection.data.object_pos_w.shape, (num_envs, num_cubes, 3))
self.assertEqual(object_collection.data.object_quat_w.shape, (num_envs, num_cubes, 4))
# Simulate physics
for _ in range(2):
# perform rendering
sim.step()
# update object
object_collection.update(sim.cfg.dt)
# check that the object is kinematic
default_object_state = object_collection.data.default_object_state.clone()
default_object_state[..., :3] += origins.unsqueeze(1)
torch.testing.assert_close(object_collection.data.object_state_w, default_object_state)
def test_initialization_with_no_rigid_body(self):
"""Test that initialization fails when no rigid body is found at the provided prim path."""
for num_cubes in (1, 2):
for device in ("cuda:0", "cpu"):
with self.subTest(num_cubes=num_cubes, device=device):
with build_simulation_context(device=device, auto_add_lighting=True) as sim:
# Generate cubes scene
object_collection, _ = generate_cubes_scene(num_cubes=num_cubes, has_api=False, device=device)
# Check that boundedness of rigid object is correct
self.assertEqual(ctypes.c_long.from_address(id(object_collection)).value, 1)
# Play sim
sim.reset()
# Check if object is initialized
self.assertFalse(object_collection.is_initialized)
def test_external_force_on_single_body(self):
"""Test application of external force on the base of the object.
In this test, we apply a force equal to the weight of an object on the base of
one of the objects. We check that the object does not move. For the other object,
we do not apply any force and check that it falls down.
"""
for num_envs in (1, 2):
for num_cubes in (1, 4):
for device in ("cuda:0", "cpu"):
with self.subTest(num_envs=num_envs, num_cubes=num_cubes, device=device):
# Generate cubes scene
with build_simulation_context(
device=device, add_ground_plane=True, auto_add_lighting=True
) as sim:
object_collection, origins = generate_cubes_scene(
num_envs=num_envs, num_cubes=num_cubes, device=device
)
# Play the simulator
sim.reset()
# Find objects to apply the force
object_ids, object_names = object_collection.find_objects(".*")
# Sample a force equal to the weight of the object
external_wrench_b = torch.zeros(
object_collection.num_instances, len(object_ids), 6, device=sim.device
)
# Every 2nd cube should have a force applied to it
external_wrench_b[:, 0::2, 2] = 9.81 * object_collection.data.default_mass[:, 0::2, 0]
# Now we are ready!
for _ in range(5):
# reset object state
object_state = object_collection.data.default_object_state.clone()
# need to shift the position of the cubes otherwise they will be on top of each other
object_state[..., :2] += origins.unsqueeze(1)[..., :2]
object_collection.write_object_state_to_sim(object_state)
# reset object
object_collection.reset()
# apply force
object_collection.set_external_force_and_torque(
external_wrench_b[..., :3], external_wrench_b[..., 3:], object_ids=object_ids
)
# perform simulation
for _ in range(10):
# apply action to the object
object_collection.write_data_to_sim()
# perform step
sim.step()
# update buffers
object_collection.update(sim.cfg.dt)
# First object should still be at the same Z position (1.0)
torch.testing.assert_close(
object_collection.data.object_pos_w[:, 0::2, 2],
torch.ones_like(object_collection.data.object_pos_w[:, 0::2, 2]),
)
# Second object should have fallen, so it's Z height should be less than initial height of 1.0
self.assertTrue(torch.all(object_collection.data.object_pos_w[:, 1::2, 2] < 1.0))
def test_set_object_state(self):
"""Test setting the state of the object.
In this test, we set the state of the object to a random state and check
that the object is in that state after simulation. We set gravity to zero as
we don't want any external forces acting on the object to ensure state remains static.
"""
for num_envs in (1, 3):
for num_cubes in (1, 2):
for device in ("cuda:0", "cpu"):
with self.subTest(num_envs=num_envs, num_cubes=num_cubes, device=device):
# Turn off gravity for this test as we don't want any external forces acting on the object
# to ensure state remains static
with build_simulation_context(
device=device, gravity_enabled=False, auto_add_lighting=True
) as sim:
# Generate cubes scene
object_collection, origins = generate_cubes_scene(
num_envs=num_envs, num_cubes=num_cubes, device=device
)
# Play the simulator
sim.reset()
state_types = ["object_pos_w", "object_quat_w", "object_lin_vel_w", "object_ang_vel_w"]
# Set each state type individually as they are dependent on each other
for state_type_to_randomize in state_types:
state_dict = {
"object_pos_w": torch.zeros_like(
object_collection.data.object_pos_w, device=sim.device
),
"object_quat_w": default_orientation(
num=num_cubes * num_envs, device=sim.device
).view(num_envs, num_cubes, 4),
"object_lin_vel_w": torch.zeros_like(
object_collection.data.object_lin_vel_w, device=sim.device
),
"object_ang_vel_w": torch.zeros_like(
object_collection.data.object_ang_vel_w, device=sim.device
),
}
# Now we are ready!
for _ in range(5):
# reset object
object_collection.reset()
# Set random state
if state_type_to_randomize == "object_quat_w":
state_dict[state_type_to_randomize] = random_orientation(
num=num_cubes * num_envs, device=sim.device
).view(num_envs, num_cubes, 4)
else:
state_dict[state_type_to_randomize] = torch.randn(
num_envs, num_cubes, 3, device=sim.device
)
# make sure objects do not overlap
if state_type_to_randomize == "object_pos_w":
state_dict[state_type_to_randomize][..., :2] += origins.unsqueeze(1)[
..., :2
]
# perform simulation
for _ in range(5):
object_state = torch.cat(
[
state_dict["object_pos_w"],
state_dict["object_quat_w"],
state_dict["object_lin_vel_w"],
state_dict["object_ang_vel_w"],
],
dim=-1,
)
# reset object state
object_collection.write_object_state_to_sim(object_state=object_state)
sim.step()
# assert that set object quantities are equal to the ones set in the state_dict
for key, expected_value in state_dict.items():
value = getattr(object_collection.data, key)
torch.testing.assert_close(value, expected_value, rtol=1e-5, atol=1e-5)
object_collection.update(sim.cfg.dt)
def test_reset_object_collection(self):
"""Test resetting the state of the rigid object."""
for num_envs in (1, 3):
for num_cubes in (1, 2):
for device in ("cuda:0", "cpu"):
with self.subTest(num_envs=num_envs, num_cubes=num_cubes, device=device):
with build_simulation_context(
device=device, gravity_enabled=True, auto_add_lighting=True
) as sim:
# Generate cubes scene
object_collection, _ = generate_cubes_scene(
num_envs=num_envs, num_cubes=num_cubes, device=device
)
# Play the simulator
sim.reset()
for i in range(5):
# perform rendering
sim.step()
# update object
object_collection.update(sim.cfg.dt)
# Move the object to a random position
object_state = object_collection.data.default_object_state.clone()
object_state[..., :3] = torch.randn(num_envs, num_cubes, 3, device=sim.device)
# Random orientation
object_state[..., 3:7] = random_orientation(num=num_cubes, device=sim.device)
object_collection.write_object_state_to_sim(object_state)
if i % 2 == 0:
# reset object
object_collection.reset()
# Reset should zero external forces and torques
self.assertFalse(object_collection.has_external_wrench)
self.assertEqual(torch.count_nonzero(object_collection._external_force_b), 0)
self.assertEqual(torch.count_nonzero(object_collection._external_torque_b), 0)
def test_set_material_properties(self):
"""Test getting and setting material properties of rigid object."""
for num_envs in (1, 3):
for num_cubes in (1, 2):
for device in ("cuda:0", "cpu"):
with self.subTest(num_envs=num_envs, num_cubes=num_cubes, device=device):
with build_simulation_context(
device=device, gravity_enabled=True, add_ground_plane=True, auto_add_lighting=True
) as sim:
# Generate cubes scene
object_collection, _ = generate_cubes_scene(
num_envs=num_envs, num_cubes=num_cubes, device=device
)
# Play sim
sim.reset()
# Set material properties
static_friction = torch.FloatTensor(num_envs, num_cubes, 1).uniform_(0.4, 0.8)
dynamic_friction = torch.FloatTensor(num_envs, num_cubes, 1).uniform_(0.4, 0.8)
restitution = torch.FloatTensor(num_envs, num_cubes, 1).uniform_(0.0, 0.2)
materials = torch.cat([static_friction, dynamic_friction, restitution], dim=-1)
indices = torch.tensor(range(num_cubes * num_envs), dtype=torch.int)
# Add friction to cube
object_collection.root_physx_view.set_material_properties(
object_collection.reshape_data_to_view(materials), indices
)
# Simulate physics
sim.step()
# update object
object_collection.update(sim.cfg.dt)
# Get material properties
materials_to_check = object_collection.root_physx_view.get_material_properties()
# Check if material properties are set correctly
torch.testing.assert_close(
object_collection.reshape_view_to_data(materials_to_check), materials
)
def test_gravity_vec_w(self):
"""Test that gravity vector direction is set correctly for the rigid object."""
for num_envs in (1, 3):
for num_cubes in (1, 2):
for device in ("cuda:0", "cpu"):
for gravity_enabled in [True, False]:
with self.subTest(
num_envs=num_envs, num_cubes=num_cubes, device=device, gravity_enabled=gravity_enabled
):
with build_simulation_context(device=device, gravity_enabled=gravity_enabled) as sim:
# Create a scene with random cubes
object_collection, _ = generate_cubes_scene(
num_envs=num_envs, num_cubes=num_cubes, device=device
)
# Obtain gravity direction
if gravity_enabled:
gravity_dir = (0.0, 0.0, -1.0)
else:
gravity_dir = (0.0, 0.0, 0.0)
# Play sim
sim.reset()
# Check that gravity is set correctly
self.assertEqual(object_collection.data.GRAVITY_VEC_W[0, 0, 0], gravity_dir[0])
self.assertEqual(object_collection.data.GRAVITY_VEC_W[0, 0, 1], gravity_dir[1])
self.assertEqual(object_collection.data.GRAVITY_VEC_W[0, 0, 2], gravity_dir[2])
# Simulate physics
for _ in range(2):
sim.step()
# update object
object_collection.update(sim.cfg.dt)
# Expected gravity value is the acceleration of the body
gravity = torch.zeros(num_envs, num_cubes, 6, device=device)
if gravity_enabled:
gravity[..., 2] = -9.81
# Check the body accelerations are correct
torch.testing.assert_close(object_collection.data.object_acc_w, gravity)
if __name__ == "__main__":
run_tests()
......@@ -23,7 +23,7 @@ from omni.isaac.lab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Demo on spawning different objects in multiple environments.")
parser.add_argument("--num_envs", type=int, default=1024, help="Number of environments to spawn.")
parser.add_argument("--num_envs", type=int, default=512, help="Number of environments to spawn.")
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
......@@ -41,7 +41,15 @@ import omni.usd
from pxr import Gf, Sdf
import omni.isaac.lab.sim as sim_utils
from omni.isaac.lab.assets import ArticulationCfg, AssetBaseCfg, RigidObjectCfg
from omni.isaac.lab.assets import (
Articulation,
ArticulationCfg,
AssetBaseCfg,
RigidObject,
RigidObjectCfg,
RigidObjectCollection,
RigidObjectCollectionCfg,
)
from omni.isaac.lab.scene import InteractiveScene, InteractiveSceneCfg
from omni.isaac.lab.sim import SimulationContext
from omni.isaac.lab.utils import Timer, configclass
......@@ -124,6 +132,52 @@ class MultiObjectSceneCfg(InteractiveSceneCfg):
init_state=RigidObjectCfg.InitialStateCfg(pos=(0.0, 0.0, 2.0)),
)
# object collection
object_collection: RigidObjectCollectionCfg = RigidObjectCollectionCfg(
rigid_objects={
"object_A": RigidObjectCfg(
prim_path="/World/envs/env_.*/Object_A",
spawn=sim_utils.SphereCfg(
radius=0.1,
visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(1.0, 0.0, 0.0), metallic=0.2),
rigid_props=sim_utils.RigidBodyPropertiesCfg(
solver_position_iteration_count=4, solver_velocity_iteration_count=0
),
mass_props=sim_utils.MassPropertiesCfg(mass=1.0),
collision_props=sim_utils.CollisionPropertiesCfg(),
),
init_state=RigidObjectCfg.InitialStateCfg(pos=(0.0, -0.5, 2.0)),
),
"object_B": RigidObjectCfg(
prim_path="/World/envs/env_.*/Object_B",
spawn=sim_utils.CuboidCfg(
size=(0.1, 0.1, 0.1),
visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(1.0, 0.0, 0.0), metallic=0.2),
rigid_props=sim_utils.RigidBodyPropertiesCfg(
solver_position_iteration_count=4, solver_velocity_iteration_count=0
),
mass_props=sim_utils.MassPropertiesCfg(mass=1.0),
collision_props=sim_utils.CollisionPropertiesCfg(),
),
init_state=RigidObjectCfg.InitialStateCfg(pos=(0.0, 0.5, 2.0)),
),
"object_C": RigidObjectCfg(
prim_path="/World/envs/env_.*/Object_C",
spawn=sim_utils.ConeCfg(
radius=0.1,
height=0.3,
visual_material=sim_utils.PreviewSurfaceCfg(diffuse_color=(1.0, 0.0, 0.0), metallic=0.2),
rigid_props=sim_utils.RigidBodyPropertiesCfg(
solver_position_iteration_count=4, solver_velocity_iteration_count=0
),
mass_props=sim_utils.MassPropertiesCfg(mass=1.0),
collision_props=sim_utils.CollisionPropertiesCfg(),
),
init_state=RigidObjectCfg.InitialStateCfg(pos=(0.5, 0.0, 2.0)),
),
}
)
# articulation
robot: ArticulationCfg = ArticulationCfg(
prim_path="/World/envs/env_.*/Robot",
......@@ -170,15 +224,16 @@ def run_simulator(sim: SimulationContext, scene: InteractiveScene):
"""Runs the simulation loop."""
# Extract scene entities
# note: we only do this here for readability.
rigid_object = scene["object"]
robot = scene["robot"]
rigid_object: RigidObject = scene["object"]
rigid_object_collection: RigidObjectCollection = scene["object_collection"]
robot: Articulation = scene["robot"]
# Define simulation stepping
sim_dt = sim.get_physics_dt()
count = 0
# Simulation loop
while simulation_app.is_running():
# Reset
if count % 500 == 0:
if count % 250 == 0:
# reset counter
count = 0
# reset the scene entities
......@@ -186,6 +241,10 @@ def run_simulator(sim: SimulationContext, scene: InteractiveScene):
root_state = rigid_object.data.default_root_state.clone()
root_state[:, :3] += scene.env_origins
rigid_object.write_root_state_to_sim(root_state)
# object collection
object_state = rigid_object_collection.data.default_object_state.clone()
object_state[..., :3] += scene.env_origins.unsqueeze(1)
rigid_object_collection.write_object_state_to_sim(object_state)
# robot
# -- root state
root_state = robot.data.default_root_state.clone()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment