Unverified Commit 6442cef5 authored by Pascal Roth's avatar Pascal Roth Committed by GitHub

Updates sensors to follow structure similar to assets (#123)

# Description

This MR adapts all the sensor classes to follow a structure similar to
the `assets` classes. This removes the need to worry about the spawning
and initialization of sensors manually by the users. It removes the
`debug_vis` function since that is now handled by a render callback
automatically (based on the passed configuration for the `debug_vis`
flag).

Additionally, the camera poses can now be set and obtained different
conventions:

* `opengl`: the camera is looking down the -Z axis with the +Y axis
pointing up
* `ros`: the camera is looking down the +Z axis with the +Y axis
pointing down
* `world`: the camera is looking along the +X axis with the -Z axis
pointing down

## Type of change

- New feature (non-breaking change which adds functionality)
- Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- This change requires a documentation update

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./orbit.sh --format`
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file

---------
Co-authored-by: 's avatarMayank Mittal <mittalma@leggedrobotics.com>
parent 5802697c
BSD License
For PyTorch3D software
Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name Meta nor the names of its contributors may be used to
endorse or promote products derived from this software without specific
prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.9.0"
version = "0.9.1"
# Description
title = "ORBIT framework for Robot Learning"
......
Changelog
---------
0.9.1 (2023-08-18)
~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Introduced three different rotation conventions in the :class:`omni.isaac.orbit.sensors.Camera` class. These
conventions are:
* ``opengl``: the camera is looking down the -Z axis with the +Y axis pointing up
* ``ros``: the camera is looking down the +Z axis with the +Y axis pointing down
* ``world``: the camera is looking along the +X axis with the -Z axis pointing down
These can be used to declare the camera offset in :class:`omni.isaac.orbit.sensors.CameraCfg.OffsetCfg` class
and in :meth:`omni.isaac.orbit.sensors.Camera.set_world_pose` method. Additionally, all conventions are
saved to :class:`omni.isaac.orbit.sensors.CameraData` class for easy access.
Changed
^^^^^^^
* Adapted all the sensor classes to follow a structure similar to the :class:`omni.issac.orbit.assets.AssetBase`.
Hence, the spawning and initialization of sensors manually by the users is avoided.
* Removed the :meth:`debug_vis` function since that this functionality is handled by a render callback automatically
(based on the passed configuration for the :class:`omni.isaac.orbit.sensors.SensorBaseCfg.debug_vis` flag).
0.9.0 (2023-08-18)
~~~~~~~~~~~~~~~~~~
......
......@@ -8,7 +8,8 @@ Camera wrapper around USD camera prim to provide an interface that follows the r
"""
from .camera import Camera
from .camera_cfg import FisheyeCameraCfg, PinholeCameraCfg
from .camera_cfg import CameraCfg
from .camera_data import CameraData
from .utils import * # noqa: F401, F403
__all__ = ["Camera", "CameraData", "PinholeCameraCfg", "FisheyeCameraCfg"]
__all__ = ["Camera", "CameraData", "CameraCfg"]
......@@ -3,34 +3,34 @@
#
# SPDX-License-Identifier: BSD-3-Clause
"""Camera class in Omniverse workflows."""
from __future__ import annotations
import builtins
import math
import numpy as np
import scipy.spatial.transform as tf
import torch
from tensordict import TensorDict
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union
from typing import TYPE_CHECKING, Any, Sequence
from typing_extensions import Literal
import omni.isaac.core.utils.prims as prim_utils
import omni.isaac.core.utils.stage as stage_utils
import omni.kit.commands
import omni.usd
from omni.isaac.core.prims import XFormPrimView
from omni.isaac.core.simulation_context import SimulationContext
from omni.isaac.core.utils.rotations import gf_quat_to_np_array
from pxr import Gf, Sdf, Usd, UsdGeom
from pxr import Gf, Usd, UsdGeom
# omni-isaac-orbit
from omni.isaac.orbit.utils import class_to_dict, to_camel_case
from omni.isaac.orbit.utils.array import TensorData, convert_to_torch
from omni.isaac.orbit.utils.math import convert_quat
from omni.isaac.orbit.utils import to_camel_case
from omni.isaac.orbit.utils.array import convert_to_torch
from omni.isaac.orbit.utils.math import quat_from_matrix
from ..sensor_base import SensorBase
from .camera_cfg import FisheyeCameraCfg, PinholeCameraCfg
from .camera_data import CameraData
from .utils import convert_orientation_convention
if TYPE_CHECKING:
from .camera_cfg import CameraCfg
class Camera(SensorBase):
......@@ -49,15 +49,6 @@ class Camera(SensorBase):
- ``"instance_segmentation"``: The instance segmentation data.
- ``"semantic_segmentation"``: The semantic segmentation data.
The camera sensor supports the following projection types:
- ``"pinhole"``: Standard pinhole camera model (disables fisheye parameters).
- ``"fisheye_orthographic"``: Fisheye camera model using orthographic correction.
- ``"fisheye_equidistant"``: Fisheye camera model using equidistant correction.
- ``"fisheye_equisolid"``: Fisheye camera model using equisolid correction.
- ``"fisheye_polynomial"``: Fisheye camera model with :math:`360^{\circ}` spherical projection.
- ``"fisheye_spherical"``: Fisheye camera model with :math:`360^{\circ}` full-frame projection.
.. note::
Currently the following sensor types are not supported in a "view" format:
......@@ -73,26 +64,40 @@ class Camera(SensorBase):
"""
def __init__(self, cfg: Union[PinholeCameraCfg, FisheyeCameraCfg]):
cfg: CameraCfg
"""The configuration parameters."""
def __init__(self, cfg: CameraCfg):
"""Initializes the camera sensor.
Args:
cfg (Union[PinholeCameraCfg, FisheyeCameraCfg]): The configuration parameters.
cfg (CameraCfg): The configuration parameters.
Raises:
ValueError: If the sensor types intersect with in the unsupported list.
"""
# store inputs
self.cfg = cfg
# initialize base class
super().__init__(cfg)
# spawn the asset
if self.cfg.spawn is not None:
# compute the rotation offset
rot = torch.tensor(self.cfg.offset.rot, dtype=torch.float32).unsqueeze(0)
rot_offset = convert_orientation_convention(rot, origin=self.cfg.offset.convention, target="opengl")
rot_offset = rot_offset.squeeze(0).numpy()
# spawn the asset
self.cfg.spawn.func(
self.cfg.prim_path, self.cfg.spawn, translation=self.cfg.offset.pos, orientation=rot_offset
)
# check that spawn was successful
matching_prim_paths = prim_utils.find_matching_prim_paths(self.cfg.prim_path)
if len(matching_prim_paths) == 0:
raise RuntimeError(f"Could not find prim with path {self.cfg.prim_path}.")
# UsdGeom Camera prim for the sensor
self._sensor_prims: List[UsdGeom.Camera] = list()
self._sensor_prims: list[UsdGeom.Camera] = list()
# Create empty variables for storing output data
self._data = CameraData()
# Flag to check that sensor is spawned.
self._is_spawned = False
# check if there is any intersection in unsupported types
# reason: these use np structured data types which we can't yet convert to torch tensor
unsupported_types = {"bounding_box_2d_tight", "bounding_box_2d_loose", "bounding_box_3d"}
......@@ -110,11 +115,11 @@ class Camera(SensorBase):
"""Returns: A string containing information about the instance."""
# message for class
return (
f"Camera @ '{self._view._regex_prim_paths}': \n"
f"\tdata types : {list(self.data.output.keys())} \n"
f"Camera @ '{self.cfg.prim_path}': \n"
f"\tdata types : {self.data.output.sorted_keys} \n"
f"\tupdate period (s): {self.cfg.update_period}\n"
f"\tshape : {self.image_shape}\n"
f"\tnumber of sensors : {self.count}"
f"\tnumber of sensors : {self._view.count}"
)
"""
......@@ -122,12 +127,19 @@ class Camera(SensorBase):
"""
@property
def frame(self) -> torch.Tensor:
def data(self) -> CameraData:
# update sensors if needed
self._update_outdated_buffers()
# return the data
return self._data
@property
def frame(self) -> torch.tensor:
"""Frame number when the measurement took place."""
return self._frame
@property
def render_product_paths(self) -> List[str]:
def render_product_paths(self) -> list[str]:
"""The path of the render products for the cameras.
This can be used via replicator interfaces to attach to writes or external annotator registry.
......@@ -135,7 +147,7 @@ class Camera(SensorBase):
return self._render_product_paths
@property
def image_shape(self) -> Tuple[int, int]:
def image_shape(self) -> tuple[int, int]:
"""A tuple containing (height, width) of the camera sensor."""
return (self.cfg.height, self.cfg.width)
......@@ -144,7 +156,7 @@ class Camera(SensorBase):
"""
def set_intrinsic_matrices(
self, matrices: TensorData, focal_length: float = 1.0, indices: Optional[Sequence[int]] = None
self, matrices: torch.Tensor, focal_length: float = 1.0, indices: Sequence[int] | None = None
):
"""Set parameters of the USD camera from its intrinsic matrix.
......@@ -163,7 +175,7 @@ class Camera(SensorBase):
is not true in the input intrinsic matrix, then the camera will not set up correctly.
Args:
matrices (TensorData): The intrinsic matrices for the camera. Shape: :math:`(N, 3, 3)`.
matrices (torch.Tensor): The intrinsic matrices for the camera. Shape: :math:`(N, 3, 3)`.
focal_length (float, optional): Focal length to use when computing aperture values. Defaults to 1.0.
indices (Sequence[int], optional): A list of indices of length :obj:`N` to specify the prims to manipulate.
Defaults to None, which means all prims will be manipulated.
......@@ -171,7 +183,7 @@ class Camera(SensorBase):
# resolve indices
# check camera prim exists
if not self._is_initialized:
raise RuntimeError("Camera is not initialized. Please call 'initialize(...)' first.")
raise RuntimeError("Camera is not initialized. Please call 'sim.play()' first.")
# resolve indices
if indices is None:
indices = self._ALL_INDICES
......@@ -212,61 +224,68 @@ class Camera(SensorBase):
Operations - Set pose.
"""
def set_world_poses_ros(
self, positions: TensorData = None, orientations: TensorData = None, indices: Optional[Sequence[int]] = None
def set_world_poses(
self,
positions: torch.Tensor | None = None,
orientations: torch.Tensor | None = None,
indices: Sequence[int] | None = None,
convention: Literal["opengl", "ros", "world"] = "ros",
):
r"""Set the pose of the camera w.r.t. world frame using ROS convention.
r"""Set the pose of the camera w.r.t. the world frame using specified convention.
In USD, the camera is always in **Y up** convention. This means that the camera is looking down the -Z axis
with the +Y axis pointing up , and +X axis pointing right. However, in ROS, the camera is looking down
the +Z axis with the +Y axis pointing down, and +X axis pointing right. Thus, the camera needs to be rotated
by :math:`180^{\circ}` around the X axis to follow the ROS convention.
Since different fields use different conventions for camera orientations, the method allows users to
set the camera poses in the specified convention. Possible conventions are:
.. math::
- :obj:"opengl" - forward axis: -Z - up axis +Y - Offset is applied in the OpenGL (Usd.Camera) convention
- :obj:"ros" - forward axis: +Z - up axis -Y - Offset is applied in the ROS convention
- :obj:"world" - forward axis: +X - up axis +Z - Offset is applied in the World Frame convention
T_{ROS} = \begin{bmatrix} 1 & 0 & 0 & 0 \\ 0 & -1 & 0 & 0 \\ 0 & 0 & -1 & 0 \\ 0 & 0 & 0 & 1 \end{bmatrix} T_{USD}
See :meth:`omni.isaac.orbit.sensors.camera.utils.convert_orientation_convention` for more details
on the conventions.
Args:
positions (TensorData, optional): The cartesian coordinates (in meters). Shape: :math:`(N, 3)`. Defaults to None.
orientations (TensorData, optional): The quaternion orientation in (w, x, y, z). Shape: :math:`(N, 4)`. Defaults to None.
positions (torch.Tensor | None, optional): The cartesian coordinates (in meters).
Shape: :math:`(N, 3)`. Defaults to None, in which case the camera position in not changed.
orientations (torch.Tensor | None, optional): The quaternion orientation in (w, x, y, z).
Shape: :math:`(N, 4)`. Defaults to None, in which case the camera orientation in not changed.
indices (Sequence[int], optional): A list of indices of length :obj:`N` to specify the prims to manipulate.
Defaults to None, which means all prims will be manipulated.
convention (Literal["opengl", "ros", "world"], optional): The convention in which the poses are fed.
Defaults to "ros".
Raises:
RuntimeError: If the camera prim is not set. Need to call :meth:`initialize` method first.
"""
# check camera prim exists
if not self._is_initialized:
raise RuntimeError("Camera is not initialized. Please call 'initialize(...)' first.")
raise RuntimeError("Camera is not initialized. Please call 'sim.play()' first.")
# resolve indices
if indices is None:
indices = self._ALL_INDICES
# convert to backend tensor
if positions is not None:
if isinstance(positions, np.ndarray):
positions = torch.from_numpy(positions).to(device=self._device)
elif not isinstance(positions, torch.Tensor):
positions = torch.tensor(positions, device=self._device)
# convert rotation matrix from ROS convention to OpenGL
# convert rotation matrix from input convention to OpenGL
if orientations is not None:
# TODO: Make this more efficient
for index, quat in enumerate(orientations):
rotm = tf.Rotation.from_quat(convert_quat(quat, "xyzw")).as_matrix()
rotm[:, 2] = -rotm[:, 2]
rotm[:, 1] = -rotm[:, 1]
# convert to isaac-sim convention
quat_gl = tf.Rotation.from_matrix(rotm).as_quat()
orientations[index] = convert_quat(quat_gl, "wxyz")
# convert to backend tensor
if isinstance(orientations, np.ndarray):
orientations = torch.from_numpy(orientations).to(device=self._device)
elif not isinstance(orientations, torch.tensor):
orientations = torch.tensor(orientations, device=self._device)
else:
orientations = None
orientations = convert_orientation_convention(orientations, origin=convention, target="opengl")
# set the pose
self._view.set_world_poses(positions, orientations, indices)
def set_world_poses_from_view(self, eyes: TensorData, targets: TensorData, indices: Optional[Sequence[int]] = None):
def set_world_poses_from_view(
self, eyes: torch.Tensor, targets: torch.Tensor, indices: Sequence[int] | None = None
):
"""Set the poses of the camera from the eye position and look-at target position.
Args:
eyes (TensorData): The positions of the camera's eye. Shape: :math:`(N, 3)`.
targets (TensorData): The target locations to look at. Shape: :math:`(N, 3)`.
eyes (torch.Tensor): The positions of the camera's eye. Shape is :math:`(N, 3)`.
targets (torch.Tensor): The target locations to look at. Shape is :math:`(N, 3)`.
indices (Sequence[int], optional): A list of indices of length :math:`N` to specify the prims to manipulate.
Defaults to None, which means all prims will be manipulated.
......@@ -276,7 +295,7 @@ class Camera(SensorBase):
"""
# check camera prim exists
if not self._is_initialized:
raise RuntimeError("Camera is not initialized. Please call 'initialize(...)' first.")
raise RuntimeError("Camera is not initialized. Please 'sim.play()' first.")
# resolve indices
if indices is None:
indices = self._ALL_INDICES
......@@ -327,67 +346,45 @@ class Camera(SensorBase):
Operations
"""
def spawn(self, prim_path: str, translation: Sequence[float] = None, orientation: Sequence[float] = None):
"""Spawns the sensor into the stage.
The USD Camera prim is spawned under the parent prim at the path ``parent_prim_path`` with the provided input
translation and orientation.
Args:
prim_path (str): The path of the prim to attach sensor to.
translation (Sequence[float], optional): The local position offset w.r.t. parent prim. Defaults to None.
orientation (Sequence[float], optional): The local rotation offset in ``(w, x, y, z)`` w.r.t.
parent prim. Defaults to None.
def reset(self, env_ids: Sequence[int] | None = None):
# reset the timestamps
super().reset(env_ids)
# resolve None
# note: cannot do smart indexing here since we do a for loop over data.
if env_ids is None:
env_ids = self._ALL_INDICES
# reset the data
# note: this recomputation is useful if one performs randomization on the camera poses.
self._update_poses(env_ids)
self._update_intrinsic_matrices(env_ids)
# Set all reset sensors to not outdated since their value won't be updated till next sim step.
self._is_outdated[env_ids] = False
# Reset the frame count
self._frame[env_ids] = 0
Raises:
RuntimeError: If a prim already exists at the path.
"""
# Create sensor prim
if not prim_utils.is_prim_path_valid(prim_path):
prim_utils.create_prim(prim_path, "Camera", translation=translation, orientation=orientation)
else:
raise RuntimeError(f"Unable to spawn camera. A prim already exists at path '{prim_path}'.")
# Add replicator camera attributes
self._define_usd_camera_attributes(prim_path)
# Save prim path for later use
self._spawn_prim_path = prim_path
# Set spawning to true
self._is_spawned = True
def initialize(self, prim_paths_expr: str = None):
Implementation.
"""
def _initialize_impl(self):
"""Initializes the sensor handles and internal buffers.
This function creates handles and registers the provided data types with the replicator registry to
be able to access the data from the sensor. It also initializes the internal buffers to store the data.
The function also allows initializing to a camera not spawned by using the :meth:`spawn` method.
For instance, cameras that already exist in the USD stage. In such cases, the USD settings present on
the camera prim is used instead of the settings passed in the configuration object.
Args:
prim_paths_expr (str, optional): The prim path expression to cameras. Defaults to None.
Raises:
RuntimeError: When input `cam_prim_path` is :obj:`None`, the method defaults to using the last
camera prim path set when calling the :meth:`spawn` function. In case, the camera was not spawned
and no valid `cam_prim_path` is provided, the function throws an error.
"""
import omni.replicator.core as rep
# Check that sensor has been spawned
if prim_paths_expr is None:
if not self._is_spawned:
raise RuntimeError(
"Initialize the camera failed! Please provide a valid argument for `prim_paths_expr`."
)
# use the prim path from spawning
prim_paths_expr = self._spawn_prim_path
# Initialize parent class
super()._initialize_impl()
# Create a view for the sensor
self._view = XFormPrimView(prim_paths_expr, reset_xform_properties=False)
self._view = XFormPrimView(self.cfg.prim_path, reset_xform_properties=False)
self._view.initialize()
# Initialize parent class
super().initialize(prim_paths_expr)
# Check that sizes are correct
if self._view.count != self._num_envs:
raise RuntimeError(
f"Number of camera prims in the view ({self._view.count}) does not match the number of environments "
f"({self._num_envs})."
)
# Create all indices buffer
self._ALL_INDICES = torch.arange(self._view.count, device=self._device, dtype=torch.long)
......@@ -395,8 +392,8 @@ class Camera(SensorBase):
self._frame = torch.zeros(self._view.count, device=self._device, dtype=torch.long)
# Attach the sensor data types to render node
self._render_product_paths: List[str] = list()
self._rep_registry: Dict[str, List[rep.annotators.Annotator]] = {name: list() for name in self.cfg.data_types}
self._render_product_paths: list[str] = list()
self._rep_registry: dict[str, list[rep.annotators.Annotator]] = {name: list() for name in self.cfg.data_types}
# Resolve device name
if "cuda" in self._device:
device_name = self._device.split(":")[0]
......@@ -438,52 +435,20 @@ class Camera(SensorBase):
self._rep_registry[name].append(rep_annotator)
# Create internal buffers
self._create_buffers()
# When running in standalone mode, need to render a few times to fill all the buffers
# FIXME: Check with simulation team to get rid of this. What if someone has render or other callbacks?
if builtins.ISAAC_LAUNCHED_FROM_TERMINAL is False:
# acquire simulation context
sim = SimulationContext.instance()
# render a few times
if sim is not None:
for _ in range(4):
sim.render()
def reset_buffers(self, env_ids: Optional[Sequence[int]] = None):
# reset the timestamps
super().reset_buffers(env_ids)
# resolve None
# note: cannot do smart indexing here since we do a for loop over data.
if env_ids is None:
env_ids = self._ALL_INDICES
# reset the data
# note: this recomputation is useful if one performs randomization on the camera poses.
self._update_ros_poses(env_ids)
self._update_intrinsic_matrices(env_ids)
# Set all reset sensors to not outdated since their value won't be updated till next sim step.
self._is_outdated[env_ids] = False
# Reset the frame count
self._frame[env_ids] = 0
"""
Implementation.
"""
def _update_buffers(self, env_ids: Optional[Sequence[int]] = None):
def _update_buffers_impl(self, env_ids: Sequence[int]):
# check camera prim exists
if not self._is_initialized:
raise RuntimeError("Camera is not initialized. Please call 'initialize(...)' first.")
# Resolve sensor ids
if env_ids is None:
env_ids = self._ALL_INDICES
raise RuntimeError("Camera is not initialized. Please call 'sim.play()' first.")
# Increment frame count
self._frame[env_ids] += 1
# -- intrinsic matrix
self._update_intrinsic_matrices(env_ids)
# -- pose
self._update_ros_poses(env_ids)
self._update_poses(env_ids)
# -- read the data from annotator registry
# check if buffer is called for the first time. If so then, allocate the memory
if self._data.output is None:
if len(self._data.output.sorted_keys) == 0:
# this is the first time buffer is called
# it allocates memory for all the sensors
self._create_annotator_data()
......@@ -509,75 +474,21 @@ class Camera(SensorBase):
"""Create buffers for storing data."""
# create the data object
# -- pose of the cameras
self._data.position = torch.zeros((self._num_envs, 3), device=self._device)
self._data.orientation = torch.zeros((self._num_envs, 4), device=self._device)
self._data.pos_w = torch.zeros((self._view.count, 3), device=self._device)
self._data.quat_w_ros = torch.zeros((self._view.count, 4), device=self._device)
self._data.quat_w_world = torch.zeros((self._view.count, 4), device=self._device)
self._data.quat_w_opengl = torch.zeros((self._view.count, 4), device=self._device)
# -- intrinsic matrix
self._data.intrinsic_matrices = torch.zeros((self._num_envs, 3, 3), device=self._device)
self._data.intrinsic_matrices = torch.zeros((self._view.count, 3, 3), device=self._device)
self._data.image_shape = self.image_shape
# -- output data
# lazy allocation of data dictionary
# since the size of the output data is not known in advance, we leave it as None
# the memory will be allocated when the buffer() function is called for the first time.
self._data.output = None
def _define_usd_camera_attributes(self, prim_path: str):
"""Creates and sets USD camera attributes.
self._data.output = TensorDict({}, batch_size=self._view.count, device=self.device)
self._data.info = [{name: None for name in self.cfg.data_types}] * self._view.count
This function creates additional attributes on the camera prim used by Replicator.
It also sets the default values for these attributes based on the camera configuration.
Args:
prim_path (str): The prim path to the camera.
"""
# lock camera from viewport (this disables viewport movement for camera)
kwargs = {
"prop_path": Sdf.Path(f"{prim_path}.omni:kit:cameraLock"),
"value": True,
"prev": None,
"type_to_create_if_not_exist": Sdf.ValueTypeNames.Bool,
}
omni.kit.commands.execute("ChangePropertyCommand", **kwargs)
# camera attributes
# reference: omni.replicator.core.scripts.create.py: camera()
attribute_types = {
"cameraProjectionType": "token",
"fthetaWidth": "float",
"fthetaHeight": "float",
"fthetaCx": "float",
"fthetaCy": "float",
"fthetaMaxFov": "float",
"fthetaPolyA": "float",
"fthetaPolyB": "float",
"fthetaPolyC": "float",
"fthetaPolyD": "float",
"fthetaPolyE": "float",
}
# get camera prim
prim = prim_utils.get_prim_at_path(prim_path)
# create attributes
for attr_name, attr_type in attribute_types.items():
# check if attribute does not exist
if prim.GetAttribute(attr_name).Get() is None:
# create attribute based on type
if attr_type == "token":
prim.CreateAttribute(attr_name, Sdf.ValueTypeNames.Token)
elif attr_type == "float":
prim.CreateAttribute(attr_name, Sdf.ValueTypeNames.Float)
# set attribute values
# -- projection type
projection_type = to_camel_case(self.cfg.projection_type, to="cC")
prim.GetAttribute("cameraProjectionType").Set(projection_type)
# -- other attributes
for param_name, param_value in class_to_dict(self.cfg.usd_params).items():
# check if value is valid
if param_value is None:
continue
# convert to camel case (CC)
param = to_camel_case(param_name, to="cC")
# get attribute from the class
prim.GetAttribute(param).Set(param_value)
def _update_intrinsic_matrices(self, env_ids: Iterable[int]):
def _update_intrinsic_matrices(self, env_ids: Sequence[int]):
"""Compute camera's matrix of intrinsic parameters.
Also called calibration matrix. This matrix works for linear depth images. We assume square pixels.
......@@ -606,7 +517,7 @@ class Camera(SensorBase):
self._data.intrinsic_matrices[i, 1, 2] = height * 0.5
self._data.intrinsic_matrices[i, 2, 2] = 1
def _update_ros_poses(self, env_ids: Iterable[int]):
def _update_poses(self, env_ids: Sequence[int]):
"""Computes the pose of the camera in the world frame with ROS convention.
This methods uses the ROS convention to resolve the input pose. In this convention,
......@@ -617,9 +528,10 @@ class Camera(SensorBase):
"""
# check camera prim exists
if len(self._sensor_prims) == 0:
raise RuntimeError("Camera prim is None. Please call 'initialize(...)' first.")
raise RuntimeError("Camera prim is None. Please call 'sim.play()' first.")
# iterate over all cameras
prim_tf_all = np.zeros((len(env_ids), 4, 4))
for i in env_ids:
# obtain corresponding sensor prim
sensor_prim = self._sensor_prims[i]
......@@ -629,19 +541,22 @@ class Camera(SensorBase):
# which implies, for example, that it is the fourth row of a GfMatrix4d that specifies
# the translation of the transformation. Thus, we take transpose here to make it post multiply.
prim_tf = np.transpose(prim_tf)
prim_tf_all[i] = prim_tf
# extract the position (convert it to SI units-- assumed that stage units is 1.0)
self._data.position[i] = torch.tensor(prim_tf[0:3, 3], device=self._device)
# extract rotation
# Note: OpenGL camera transform is such that camera faces along -z axis and +y is up.
# In robotics, we need to rotate it such that the camera is along +z axis and -y is up.
cam_rotm = prim_tf[0:3, 0:3]
# make +z forward
cam_rotm[:, 2] = -cam_rotm[:, 2]
# make +y down
cam_rotm[:, 1] = -cam_rotm[:, 1]
# convert rotation to quaternion
quat = tf.Rotation.from_matrix(cam_rotm).as_quat()
self._data.orientation[i] = torch.tensor(convert_quat(quat, "wxyz"), device=self._device)
self._data.pos_w[env_ids] = torch.tensor(prim_tf_all[:, 0:3, 3], device=self._device, dtype=torch.float32)
# save opengl convention
quat_opengl = quat_from_matrix(torch.tensor(prim_tf_all[:, 0:3, 0:3], device=self._device, dtype=torch.float32))
self._data.quat_w_opengl[env_ids] = quat_opengl
# save world and ros convention
self._data.quat_w_world[env_ids] = convert_orientation_convention(
quat_opengl.clone(), origin="opengl", target="world"
)
self._data.quat_w_ros[env_ids] = convert_orientation_convention(
quat_opengl.clone(), origin="opengl", target="ros"
)
def _create_annotator_data(self):
"""Create the buffers to store the annotator data.
......@@ -651,9 +566,6 @@ class Camera(SensorBase):
This is an expensive operation and should be called only once.
"""
# lazy allocation of data dictionary
self._data.output = TensorDict({}, batch_size=self.count, device=self.device)
self._data.info = [{name: None for name in self.cfg.data_types}] * self.count
# add data from the annotators
for name, annotators in self._rep_registry.items():
# create a list to store the data for each annotator
......@@ -671,7 +583,7 @@ class Camera(SensorBase):
# concatenate the data along the batch dimension
self._data.output[name] = torch.stack(data_all_cameras, dim=0)
def _process_annotator_output(self, output: Any) -> Tuple[TensorData, dict]:
def _process_annotator_output(self, output: Any) -> tuple[torch.tensor, dict]:
"""Process the annotator output.
This function is called after the data has been collected from all the cameras.
......
......@@ -3,66 +3,68 @@
#
# SPDX-License-Identifier: BSD-3-Clause
"""Configuration for the camera sensor."""
from __future__ import annotations
from dataclasses import MISSING
from typing import List, Tuple
from typing_extensions import Literal
from omni.isaac.orbit.sim import FisheyeCameraCfg, PinholeCameraCfg
from omni.isaac.orbit.utils import configclass
from ..sensor_base_cfg import SensorBaseCfg
__all__ = ["PinholeCameraCfg", "FisheyeCameraCfg"]
from .camera import Camera
@configclass
class PinholeCameraCfg(SensorBaseCfg):
"""Configuration for a pinhole camera sensor."""
class CameraCfg(SensorBaseCfg):
"""Configuration for a camera sensor."""
@configclass
class UsdCameraCfg:
"""USD related configuration of the sensor.
class OffsetCfg:
"""The offset pose of the sensor's frame from the sensor's parent frame."""
The parameter is kept default from USD if it is set to :obj:`None`. This includes the default
parameters (in case the sensor is created) or the ones set by the user (in case the sensor is
loaded from existing USD stage).
pos: tuple[float, float, float] = (0.0, 0.0, 0.0)
"""Translation w.r.t. the parent frame. Defaults to (0.0, 0.0, 0.0)."""
Reference:
* https://docs.omniverse.nvidia.com/prod_extensions/prod_extensions/ext_replicator/annotators_details.html
* https://graphics.pixar.com/usd/docs/api/class_usd_geom_camera.html
"""
rot: tuple[float, float, float, float] = (1.0, 0.0, 0.0, 0.0)
"""Quaternion rotation ``(w, x, y, z)`` w.r.t. the parent frame. Defaults to (1.0, 0.0, 0.0, 0.0)."""
clipping_range: Tuple[float, float] = None
"""Near and far clipping distances (in stage units)."""
focal_length: float = None
"""Perspective focal length (in mm). Longer lens lengths narrower FOV, shorter lens lengths wider FOV."""
focus_distance: float = None
"""Distance from the camera to the focus plane (in stage units).
convention: Literal["opengl", "ros", "world"] = "ros"
"""The convention in which the frame offset is applied. Defaults to "ros".
- ``"opengl"`` - forward axis: ``-Z`` - up axis: ``+Y`` - Offset is applied in the OpenGL (Usd.Camera) convention.
- ``"ros"`` - forward axis: ``+Z`` - up axis: ``-Y`` - Offset is applied in the ROS convention.
- ``"world"`` - forward axis: ``+X`` - up axis: ``+Z`` - Offset is applied in the World Frame convention.
The distance at which perfect sharpness is achieved.
"""
f_stop: float = None
"""Lens aperture. Defaults to 0.0, which turns off focusing.
Controls Distance Blurring. Lower Numbers decrease focus range, larger numbers increase it.
cls_name = Camera
offset: OffsetCfg = OffsetCfg()
"""The offset pose of the sensor's frame from the sensor's parent frame. Defaults to identity.
Note:
The parent frame is the frame the sensor attaches to. For example, the parent frame of a
camera at path ``/World/envs/env_0/Robot/Camera`` is ``/World/envs/env_0/Robot``.
"""
horizontal_aperture: float = None
"""Horizontal aperture (in mm). Emulates sensor/film width on a camera."""
horizontal_aperture_offset: float = None
"""Offsets Resolution/Film gate horizontally."""
vertical_aperture_offset: float = None
"""Offsets Resolution/Film gate vertically."""
cls_name = "Camera"
spawn: PinholeCameraCfg | FisheyeCameraCfg | None = MISSING
"""Spawn configuration for the asset.
If :obj:`None`, then the prim is not spawned by the asset. Instead, it is assumed that the
asset is already present in the scene.
"""
data_types: List[str] = ["rgb"]
data_types: list[str] = ["rgb"]
"""List of sensor names/types to enable for the camera. Defaults to ["rgb"]."""
width: int = MISSING
"""Width of the image in pixels."""
height: int = MISSING
"""Height of the image in pixels."""
semantic_types: List[str] = ["class"]
semantic_types: list[str] = ["class"]
"""List of allowed semantic types the types. Defaults to ["class"].
For example, if semantic types is [“class”], only the bounding boxes for prims with semantics of
......@@ -71,6 +73,7 @@ class PinholeCameraCfg(SensorBaseCfg):
More information available at:
https://docs.omniverse.nvidia.com/app_code/prod_extensions/ext_replicator/semantic_schema_editor.html
"""
colorize: bool = False
"""whether to output colorized semantic information or non-colorized one. Defaults to False.
......@@ -80,42 +83,3 @@ class PinholeCameraCfg(SensorBaseCfg):
If False, the semantic images will be a 2D array of integers, where each pixel is an integer representing
the semantic ID. Accordingly, the information output will contain mapping from semantic ID to semantic labels.
"""
projection_type: str = "pinhole"
"""Type of projection to use for the camera. Defaults to "pinhole"."""
usd_params: UsdCameraCfg = UsdCameraCfg()
"""Parameters for setting USD camera settings."""
@configclass
class FisheyeCameraCfg(PinholeCameraCfg):
"""Configuration for a fisheye camera sensor."""
@configclass
class UsdCameraCfg(PinholeCameraCfg.UsdCameraCfg):
"""USD related configuration of the sensor for the fisheye model."""
fisheye_nominal_width: float = None
"""Nominal width of fisheye lens model."""
fisheye_nominal_height: float = None
"""Nominal height of fisheye lens model."""
fisheye_optical_centre_x: float = None
"""Horizontal optical centre position of fisheye lens model."""
fisheye_optical_centre_y: float = None
"""Vertical optical centre position of fisheye lens model."""
fisheye_max_fov: float = None
"""Maximum field of view of fisheye lens model."""
fisheye_polynomial_a: float = None
"""First component of fisheye polynomial."""
fisheye_polynomial_b: float = None
"""Second component of fisheye polynomial."""
fisheye_polynomial_c: float = None
"""Third component of fisheye polynomial."""
fisheye_polynomial_d: float = None
"""Fourth component of fisheye polynomial."""
fisheye_polynomial_e: float = None
"""Fifth component of fisheye polynomial."""
projection_type: str = "fisheye_polynomial"
"""Type of projection to use for the camera. Defaults to "fisheye_polynomial"."""
usd_params: UsdCameraCfg = UsdCameraCfg()
"""Parameters for setting USD camera settings."""
import numpy as np
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import torch
from dataclasses import dataclass
from tensordict import TensorDict
from typing import Any, Dict, List, Tuple, Union
from omni.isaac.orbit.utils.array import TensorData
from typing import Any
@dataclass
class CameraData:
"""Data container for the camera sensor."""
position: TensorData = None
##
# Frame state.
##
pos_w: torch.Tensor = None
"""Position of the sensor origin in world frame, following ROS convention.
Shape is (N, 3) where ``N`` is the number of sensors.
"""
orientation: TensorData = None
quat_w_ros: torch.Tensor = None
"""Quaternion orientation `(w, x, y, z)` of the sensor origin in world frame, following ROS convention.
.. note::
ROS convention follows the camera aligned with forward axis +Z and up axis -Y.
Shape: (N, 4) where ``N`` is the number of sensors.
"""
intrinsic_matrices: TensorData = None
quat_w_world: torch.Tensor = None
"""Quaternion orientation `(w, x, y, z)` of the sensor origin in world frame, following the world coordinate frame
.. note::
World frame convention follows the camera aligned with forward axis +X and up axis +Z.
Shape: (N, 4) where ``N`` is the number of sensors.
"""
quat_w_opengl: torch.Tensor = None
"""Quaternion orientation `(w, x, y, z)` of the sensor origin in world frame, following Opengl / Usd.Camera convention
.. note::
OpenGL convention follows the camera aligned with forward axis -Z and up axis +Y.
Shape: (N, 4) where ``N`` is the number of sensors.
"""
##
# Camera data
##
image_shape: tuple[int, int] = None
"""A tuple containing (height, width) of the camera sensor."""
intrinsic_matrices: torch.Tensor = None
"""The intrinsic matrices for the camera.
Shape is (N, 3, 3) where ``N`` is the number of sensors.
"""
image_shape: Tuple[int, int] = None
"""A tuple containing (height, width) of the camera sensor."""
output: Union[Dict[str, np.ndarray], TensorDict] = None
output: TensorDict = None
"""The retrieved sensor data with sensor types as key.
The format of the data is available in the `Replicator Documentation`_. For semantic-based data,
......@@ -35,7 +73,8 @@ class CameraData:
.. _Replicator Documentation: https://docs.omniverse.nvidia.com/prod_extensions/prod_extensions/ext_replicator/annotators_details.html#annotator-output
"""
info: List[Dict[str, Any]] = None
info: list[dict[str, Any]] = None
"""The retrieved sensor info with sensor types as key.
This contains extra information provided by the sensor such as semantic segmentation label mapping, prim paths.
......
......@@ -4,17 +4,25 @@
# SPDX-License-Identifier: BSD-3-Clause
"""Helper functions to project between pointcloud and depth images."""
from __future__ import annotations
import math
import numpy as np
import torch
from typing import Optional, Sequence, Tuple, Union
from typing import Sequence
from typing_extensions import Literal
import warp as wp
import omni.isaac.orbit.utils.math as math_utils
from omni.isaac.orbit.utils.array import TensorData, convert_to_torch
__all__ = ["transform_points", "create_pointcloud_from_depth", "create_pointcloud_from_rgbd"]
__all__ = [
"transform_points",
"create_pointcloud_from_depth",
"create_pointcloud_from_rgbd",
"convert_orientation_convention",
]
"""
......@@ -24,10 +32,10 @@ Depth <-> Pointcloud conversions.
def transform_points(
points: TensorData,
position: Optional[Sequence[float]] = None,
orientation: Optional[Sequence[float]] = None,
device: Union[torch.device, str, None] = None,
) -> Union[np.ndarray, torch.Tensor]:
position: Sequence[float] | None = None,
orientation: Sequence[float] | None = None,
device: torch.device | str | None = None,
) -> np.ndarray | torch.Tensor:
r"""Transform input points in a given frame to a target frame.
This function transform points from a source frame to a target frame. The transformation is defined by the
......@@ -78,13 +86,13 @@ def transform_points(
def create_pointcloud_from_depth(
intrinsic_matrix: Union[np.ndarray, torch.Tensor, wp.array],
depth: Union[np.ndarray, torch.Tensor, wp.array],
intrinsic_matrix: np.ndarray | torch.Tensor | wp.array,
depth: np.ndarray | torch.Tensor | wp.array,
keep_invalid: bool = False,
position: Optional[Sequence[float]] = None,
orientation: Optional[Sequence[float]] = None,
device: Optional[Union[torch.device, str]] = None,
) -> Union[np.ndarray, torch.Tensor]:
position: Sequence[float] | None = None,
orientation: Sequence[float] | None = None,
device: torch.device | str | None = None,
) -> np.ndarray | torch.Tensor:
r"""Creates pointcloud from input depth image and camera intrinsic matrix.
This function creates a pointcloud from a depth image and camera intrinsic matrix. The pointcloud is
......@@ -162,15 +170,15 @@ def create_pointcloud_from_depth(
def create_pointcloud_from_rgbd(
intrinsic_matrix: Union[torch.Tensor, np.ndarray, wp.array],
depth: Union[torch.Tensor, np.ndarray, wp.array],
rgb: Union[torch.Tensor, wp.array, np.ndarray, Tuple[float, float, float]] = None,
intrinsic_matrix: torch.Tensor | np.ndarray | wp.array,
depth: torch.Tensor | np.ndarray | wp.array,
rgb: torch.Tensor | wp.array | np.ndarray | tuple[float, float, float] = None,
normalize_rgb: bool = False,
position: Optional[Sequence[float]] = None,
orientation: Optional[Sequence[float]] = None,
device: Optional[Union[torch.device, str]] = None,
position: Sequence[float] | None = None,
orientation: Sequence[float] | None = None,
device: torch.device | str | None = None,
num_channels: int = 3,
) -> Union[Tuple[torch.Tensor, torch.Tensor], Tuple[np.ndarray, np.ndarray]]:
) -> tuple[torch.Tensor, torch.Tensor] | tuple[np.ndarray, np.ndarray]:
"""Creates pointcloud from input depth image and camera transformation matrix.
This function provides the same functionality as :meth:`create_pointcloud_from_depth` but also allows
......@@ -243,7 +251,7 @@ def create_pointcloud_from_rgbd(
# convert the matrix to (W, H, 3) from (H, W, 3) since depth processing
# is done in the order (u, v) where u: (0, W-1) and v: (0 - H-1)
points_rgb = rgb.permute(1, 0, 2).reshape(-1, 3)
elif isinstance(rgb, Tuple):
elif isinstance(rgb, (tuple, list)):
# same color for all points
points_rgb = torch.Tensor((rgb,) * num_points, device=device, dtype=torch.uint8)
else:
......@@ -269,3 +277,90 @@ def create_pointcloud_from_rgbd(
return points_xyz.cpu().numpy(), points_rgb.cpu().numpy()
else:
return points_xyz, points_rgb
def convert_orientation_convention(
orientation: torch.Tensor,
origin: Literal["opengl", "ros", "world"] = "opengl",
target: Literal["opengl", "ros", "world"] = "ros",
) -> torch.Tensor:
r"""Converts a quaternion representing a rotation from one convention to another.
In USD, the camera follows the ``"opengl"`` convention. Thus, it is always in **Y up** convention.
This means that the camera is looking down the -Z axis with the +Y axis pointing up , and +X axis pointing right.
However, in ROS, the camera is looking down the +Z axis with the +Y axis pointing down, and +X axis pointing right.
Thus, the camera needs to be rotated by :math:`180^{\circ}` around the X axis to follow the ROS convention.
.. math::
T_{ROS} = \begin{bmatrix} 1 & 0 & 0 & 0 \\ 0 & -1 & 0 & 0 \\ 0 & 0 & -1 & 0 \\ 0 & 0 & 0 & 1 \end{bmatrix} T_{USD}
On the other hand, the typical world coordinate system is with +X pointing forward, +Y pointing left,
and +Z pointing up. The camera can also be set in this convention by rotating the camera by :math:`90^{\circ}`
around the X axis and :math:`-90^{\circ}` around the Y axis.
.. math::
T_{WORLD} = \begin{bmatrix} 0 & 0 & -1 & 0 \\ -1 & 0 & 0 & 0 \\ 0 & 1 & 0 & 0 \\ 0 & 0 & 0 & 1 \end{bmatrix} T_{USD}
Thus, based on their application, cameras follow different conventions for their orientation. This function
converts a quaternion from one convention to another.
Possible conventions are:
- :obj:"opengl" - forward axis: -Z - up axis +Y - Offset is applied in the OpenGL (Usd.Camera) convention
- :obj:"ros" - forward axis: +Z - up axis -Y - Offset is applied in the ROS convention
- :obj:"world" - forward axis: +X - up axis +Z - Offset is applied in the World Frame convention
Args:
orientation torch.Tensor: Quaternion of form `(w, x, y, z)` with shape (..., 4) in source convention
origin (Literal["opengl", "ros", "world"], optional): Convention to convert to. Defaults to "ros".
target (Literal["opengl", "ros", "world"], optional): Convention to convert from. Defaults to "opengl".
Returns:
torch.Tensor: Quaternion of form `(w, x, y, z)` with shape (..., 4) in target convention
"""
if target == origin:
return orientation
# -- unify input type
if origin == "ros":
# convert from ros to opengl convention
rotm = math_utils.matrix_from_quat(orientation)
rotm[:, :, 2] = -rotm[:, :, 2]
rotm[:, :, 1] = -rotm[:, :, 1]
# convert to opengl convention
quat_gl = math_utils.quat_from_matrix(rotm)
elif origin == "world":
# convert from world (x forward and z up) to opengl convention
rotm = math_utils.matrix_from_quat(orientation)
rotm = torch.matmul(
rotm,
math_utils.matrix_from_euler(
torch.tensor([math.pi / 2, -math.pi / 2, 0], device=orientation.device), "XYZ"
),
)
# convert to isaac-sim convention
quat_gl = math_utils.quat_from_matrix(rotm)
else:
quat_gl = orientation
# -- convert to target convention
if target == "ros":
# convert from opengl to ros convention
rotm = math_utils.matrix_from_quat(quat_gl)
rotm[:, :, 2] = -rotm[:, :, 2]
rotm[:, :, 1] = -rotm[:, :, 1]
return math_utils.quat_from_matrix(rotm)
elif target == "world":
# convert from opengl to world (x forward and z up) convention
rotm = math_utils.matrix_from_quat(quat_gl)
rotm = torch.matmul(
rotm,
math_utils.matrix_from_euler(
torch.tensor([math.pi / 2, -math.pi / 2, 0], device=orientation.device), "XYZ"
).T,
)
return math_utils.quat_from_matrix(rotm)
else:
return quat_gl
......@@ -7,22 +7,44 @@
from __future__ import annotations
import torch
from typing import Sequence
from typing import TYPE_CHECKING, Sequence
import omni.isaac.core.utils.prims as prim_utils
import omni.isaac.core.utils.stage as stage_utils
from omni.isaac.core.prims import RigidPrimView
import omni.physics.tensors.impl.api as physx
from omni.isaac.core.prims import RigidContactView, RigidPrimView
from pxr import PhysxSchema
import omni.isaac.orbit.utils.string as string_utils
from omni.isaac.orbit.markers import VisualizationMarkers
from omni.isaac.orbit.markers.config import CONTACT_SENSOR_MARKER_CFG
from ..sensor_base import SensorBase
from .contact_sensor_cfg import ContactSensorCfg
from .contact_sensor_data import ContactSensorData
if TYPE_CHECKING:
from .contact_sensor_cfg import ContactSensorCfg
class ContactSensor(SensorBase):
"""A contact reporting sensor."""
"""A contact reporting sensor.
The contact sensor reports the normal contact forces on a rigid body in the world frame.
It relies on the `PhysX ContactReporter`_ API to be activated on the rigid bodies.
To enable the contact reporter on a rigid body, please make sure to enable the
:attr:`omni.isaac.orbit.sim.spawner.RigidObjectSpawnerCfg.activate_contact_sensors` on your
asset spawner configuration. This will enable the contact reporter on all the rigid bodies
in the asset.
The sensor can be configured to report the contact forces on a set of bodies with a given
filter pattern. Please check the documentation on `RigidContactView`_ for more details.
.. _PhysX ContactReporter: https://docs.omniverse.nvidia.com/kit/docs/omni_usd_schema_physics/104.2/class_physx_schema_physx_contact_report_a_p_i.html
.. _RigidContactView: https://docs.omniverse.nvidia.com/py/isaacsim/source/extensions/omni.isaac.core/docs/index.html#omni.isaac.core.prims.RigidContactView
"""
cfg: ContactSensorCfg
"""The configuration parameters."""
def __init__(self, cfg: ContactSensorCfg):
"""Initializes the contact sensor object.
......@@ -30,11 +52,6 @@ class ContactSensor(SensorBase):
Args:
cfg (ContactSensorCfg): The configuration parameters.
"""
# store config
self.cfg = cfg
# check that config is valid
if self.cfg.history_length < 0:
raise ValueError("History length must be greater than 0.")
# initialize base class
super().__init__(cfg)
# Create empty variables for storing output data
......@@ -45,11 +62,11 @@ class ContactSensor(SensorBase):
def __str__(self) -> str:
"""Returns: A string containing information about the instance."""
return (
f"Contact sensor @ '{self._view._regex_prim_paths}': \n"
f"Contact sensor @ '{self.cfg.prim_path}': \n"
f"\tview type : {self._view.__class__}\n"
f"\tupdate period (s) : {self.cfg.update_period}\n"
f"\tnumber of sensors: {self.count}\n"
f"\tnumber of bodies : {self.num_bodies}"
f"\tnumber of bodies : {self.num_bodies}\n"
f"\tbody names : {self.body_names}\n"
)
"""
......@@ -57,24 +74,117 @@ class ContactSensor(SensorBase):
"""
@property
def count(self) -> int:
"""Number of prims encapsulated."""
return self._count
def data(self) -> ContactSensorData:
# update sensors if needed
self._update_outdated_buffers()
# return the data
return self._data
@property
def num_bodies(self) -> int:
"""Number of prims encapsulated."""
"""Number of bodies with contact sensors attached."""
return self._num_bodies
@property
def body_names(self) -> list[str]:
"""Ordered names of bodies with contact sensors attached."""
prim_paths = self._view.prim_paths[: self.num_bodies]
return [path.split("/")[-1] for path in prim_paths]
@property
def body_view(self) -> RigidPrimView:
"""View for the rigid bodies captured (Isaac Sim)."""
return self._view
@property
def contact_view(self) -> RigidContactView:
"""Contact reporter view for the bodies (Isaac Sim)."""
return self._view._contact_view # pyright: ignore [reportPrivateUsage]
@property
def body_physx_view(self) -> physx.RigidBodyView:
"""View for the rigid bodies captured (PhysX).
Note:
Use this view with caution! It requires handling of tensors in a specific way and is exposed for
advanced users who have a deep understanding of PhysX SDK. Prefer using the Isaac Sim view when possible.
"""
return self._view._physics_view # pyright: ignore [reportPrivateUsage]
@property
def contact_physx_view(self) -> physx.RigidContactView:
"""Contact reporter view for the bodies (PhysX).
Note:
Use this view with caution! It requires handling of tensors in a specific way and is exposed for
advanced users who have a deep understanding of PhysX SDK. Prefer using the Isaac Sim view when possible.
"""
return self._view._contact_view._physics_view # pyright: ignore [reportPrivateUsage]
"""
Operations
"""
def initialize(self, env_prim_path: str):
def set_debug_vis(self, debug_vis: bool):
super().set_debug_vis(debug_vis)
if self.contact_visualizer is not None:
self.contact_visualizer.set_visibility(debug_vis)
def reset(self, env_ids: Sequence[int] | None = None):
# reset the timers and counters
super().reset(env_ids)
# resolve None
if env_ids is None:
env_ids = ...
# reset accumulative data buffers
self._data.current_air_time[env_ids].zero_()
self._data.last_air_time[env_ids].zero_()
self._data.net_forces_w[env_ids].zero_()
# reset the data history
self._data.net_forces_w_history[env_ids].zero_()
# Set all reset sensors to not outdated since their value won't be updated till next sim step.
self._is_outdated[env_ids] = False
def find_bodies(self, name_keys: str | Sequence[str]) -> tuple[list[int], list[str]]:
"""Find bodies in the articulation based on the name keys.
Args:
name_keys (Union[str, Sequence[str]]): A regular expression or a list of regular expressions
to match the body names.
Returns:
Tuple[List[int], List[str]]: A tuple of lists containing the body indices and names.
"""
return string_utils.resolve_matching_names(name_keys, self.body_names)
"""
Implementation.
"""
def _initialize_impl(self):
super()._initialize_impl()
# check that only rigid bodies are selected
matching_prim_paths = prim_utils.find_matching_prim_paths(self.cfg.prim_path)
num_prim_matches = len(matching_prim_paths) // self._num_envs
body_names = list()
for prim_path in matching_prim_paths[:num_prim_matches]:
prim = prim_utils.get_prim_at_path(prim_path)
# check if prim has contact reporter API
if prim.HasAPI(PhysxSchema.PhysxContactReportAPI):
body_names.append(prim_path.rsplit("/", 1)[-1])
# check that there is at least one body with contact reporter API
if not body_names:
raise RuntimeError(
f"Sensor at path '{self.cfg.prim_path}' could not find any bodies with contact reporter API."
"\nHINT: Make sure to enable 'activate_contact_sensors' in the corresponding asset spawn configuration."
)
# construct regex expression for the body names
body_names_regex = r"(" + "|".join(body_names) + r")"
body_names_regex = f"{self.cfg.prim_path.rsplit('/', 1)[0]}/{body_names_regex}"
# construct a new regex expression
# create a rigid prim view for the sensor
prim_paths_expr = f"{env_prim_path}/{self.cfg.prim_path_expr}"
self._view = RigidPrimView(
prim_paths_expr=prim_paths_expr,
prim_paths_expr=body_names_regex,
reset_xform_properties=False,
track_contact_forces=True,
contact_filter_prim_paths_expr=self.cfg.filter_prim_paths_expr,
......@@ -83,88 +193,56 @@ class ContactSensor(SensorBase):
)
self._view.initialize()
# resolve the true count of bodies
self._count = len(prim_utils.find_matching_prim_paths(f"{env_prim_path}"))
self._num_bodies = self._view.count // self.count
# initialize the base class
super().initialize(env_prim_path)
self._num_bodies = self._view.count // self._num_envs
# check that contact reporter succeeded
if self._num_bodies != len(body_names):
raise RuntimeError(
f"Failed to initialize contact reporter for specified bodies."
f"\n\tInput prim path : {self.cfg.prim_path}"
f"\n\tResolved prim paths: {body_names_regex}"
)
# fill the data buffer
self._data.pos_w = torch.zeros(self.count, self._num_bodies, 3, device=self._device)
self._data.quat_w = torch.zeros(self.count, self._num_bodies, 4, device=self._device)
self._data.last_air_time = torch.zeros(self.count, self._num_bodies, device=self._device)
self._data.current_air_time = torch.zeros(self.count, self._num_bodies, device=self._device)
self._data.net_forces_w = torch.zeros(self.count, self._num_bodies, 3, device=self._device)
self._data.pos_w = torch.zeros(self._num_envs, self._num_bodies, 3, device=self._device)
self._data.quat_w = torch.zeros(self._num_envs, self._num_bodies, 4, device=self._device)
self._data.last_air_time = torch.zeros(self._num_envs, self._num_bodies, device=self._device)
self._data.current_air_time = torch.zeros(self._num_envs, self._num_bodies, device=self._device)
self._data.net_forces_w = torch.zeros(self._num_envs, self._num_bodies, 3, device=self._device)
self._data.net_forces_w_history = torch.zeros(
self.count, self.cfg.history_length + 1, self._num_bodies, 3, device=self._device
self._num_envs, self.cfg.history_length + 1, self._num_bodies, 3, device=self._device
)
# force matrix: (num_sensors, num_bodies, num_shapes, num_filter_shapes, 3)
if len(self.cfg.filter_prim_paths_expr) != 0:
num_shapes = self._view._contact_view.num_shapes // self._num_bodies
num_filters = self._view._contact_view.num_filters
num_shapes = self.contact_physx_view.sensor_count // self._num_bodies
num_filters = self.contact_physx_view.filter_count
self._data.force_matrix_w = torch.zeros(
self.count, self._num_bodies, num_shapes, num_filters, 3, device=self._device
)
def reset_buffers(self, env_ids: Sequence[int] | None = None):
"""Resets the sensor internals.
Args:
env_ids (Sequence[int], optional): The sensor ids to reset. Defaults to None.
"""
# reset the timers and counters
super().reset_buffers(env_ids)
# resolve None
if env_ids is None:
env_ids = ...
# reset accumulative data buffers
self._data.current_air_time[env_ids] = 0.0
self._data.last_air_time[env_ids] = 0.0
self._data.net_forces_w[env_ids] = 0.0
# reset the data history
self._data.net_forces_w_history[env_ids] = 0.0
# Set all reset sensors to not outdated since their value won't be updated till next sim step.
self._is_outdated[env_ids] = False
def debug_vis(self):
# visualize the contacts
if self.cfg.debug_vis:
if self.contact_visualizer is None:
prim_path = stage_utils.get_next_free_path("/Visuals/ContactSensor")
self.contact_visualizer = VisualizationMarkers(prim_path, cfg=CONTACT_SENSOR_MARKER_CFG)
# marker indices
# 0: contact, 1: no contact
net_contact_force_w = torch.norm(self._data.net_forces_w, dim=-1)
marker_indices = torch.where(net_contact_force_w > 1.0, 0, 1)
# check if prim is visualized
self.contact_visualizer.visualize(self._data.pos_w.view(-1, 3), marker_indices=marker_indices.view(-1))
"""
Implementation.
"""
def _update_buffers(self, env_ids: Sequence[int]):
def _update_buffers_impl(self, env_ids: Sequence[int]):
"""Fills the buffers of the sensor data."""
# default to all sensors
if len(env_ids) == self.count:
if len(env_ids) == self._num_envs:
env_ids = ...
# obtain the poses of the sensors TODO decide if we really to track poses
pose = self._view._physics_view.get_transforms()
# obtain the poses of the sensors:
# TODO decide if we really to track poses -- This is the body's CoM. Not contact location.
pose = self.body_physx_view.get_transforms()
self._data.pos_w[env_ids] = pose.view(-1, self._num_bodies, 7)[env_ids, :, :3]
self._data.quat_w[env_ids] = pose.view(-1, self._num_bodies, 7)[env_ids, :, 3:]
# obtain the contact forces
# TODO: We are handling the indexing ourself because of the shape; (N, B) vs expected (N * B).
# This isn't the most efficient way to do this, but it's the easiest to implement.
net_forces_w = self._view._contact_view._physics_view.get_net_contact_forces(dt=self._sim_physics_dt)
net_forces_w = self.contact_physx_view.get_net_contact_forces(dt=self._sim_physics_dt)
self._data.net_forces_w[env_ids, :, :] = net_forces_w.view(-1, self._num_bodies, 3)[env_ids]
# obtain the contact force matrix
if len(self.cfg.filter_prim_paths_expr) != 0:
num_shapes = self._view._contact_view.num_shapes // self._num_bodies
num_filters = self._view._contact_view.num_filters
force_matrix_w = self._view.get_contact_force_matrix(dt=self._sim_physics_dt, clone=False)
# shape of the filtering matrix: (num_sensors, num_bodies, num_shapes, num_filter_shapes, 3)
num_shapes = self.contact_physx_view.sensor_count // self._num_bodies
num_filters = self.contact_physx_view.filter_count
# acquire and shape the force matrix
force_matrix_w = self.contact_physx_view.get_contact_force_matrix(dt=self._sim_physics_dt)
force_matrix_w = force_matrix_w.view(-1, self._num_bodies, num_shapes, num_filters, 3)
self._data.force_matrix_w[env_ids] = force_matrix_w[env_ids]
......@@ -187,3 +265,14 @@ class ContactSensor(SensorBase):
self._data.last_air_time[env_ids] = self._data.current_air_time[env_ids] * is_first_contact
# -- increment timers for bodies that are not in contact
self._data.current_air_time[env_ids] *= ~is_contact
def _debug_vis_impl(self):
# visualize the contacts
if self.contact_visualizer is None:
self.contact_visualizer = VisualizationMarkers("/Visuals/ContactSensor", cfg=CONTACT_SENSOR_MARKER_CFG)
# marker indices
# 0: contact, 1: no contact
net_contact_force_w = torch.norm(self._data.net_forces_w, dim=-1)
marker_indices = torch.where(net_contact_force_w > 1.0, 0, 1)
# check if prim is visualized
self.contact_visualizer.visualize(self._data.pos_w.view(-1, 3), marker_indices=marker_indices.view(-1))
......@@ -3,21 +3,20 @@
#
# SPDX-License-Identifier: BSD-3-Clause
"""Configuration for the contact sensor."""
from __future__ import annotations
from omni.isaac.orbit.utils import configclass
from ..sensor_base_cfg import SensorBaseCfg
from .contact_sensor import ContactSensor
@configclass
class ContactSensorCfg(SensorBaseCfg):
"""Configuration for the contact sensor."""
cls_name = "ContactSensor"
cls_name = ContactSensor
filter_prim_paths_expr: list[str] = list()
"""The list of primitive paths to filter contacts with.
......@@ -29,7 +28,3 @@ class ContactSensorCfg(SensorBaseCfg):
If an empty list is provided, then only net contact forces are reported.
"""
history_length: int = 0
"""Number of past frames to store in the sensor buffers. Defaults to 0, which means that only
the current data is stored."""
......@@ -8,18 +8,16 @@ Ray-caster based on warp.
"""
from .patterns_cfg import BpearlPatternCfg, GridPatternCfg, PatternBaseCfg, PinholeCameraPatternCfg
from . import patterns
from .ray_caster import RayCaster
from .ray_caster_cfg import RayCasterCfg
from .ray_caster_data import RayCasterData
__all__ = [
# sensor
"RayCaster",
"RayCasterData",
"RayCasterCfg",
# patterns
"PatternBaseCfg",
"GridPatternCfg",
"PinholeCameraPatternCfg",
"BpearlPatternCfg",
"patterns",
]
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Utility functions for different ray-casting patterns that are used by the ray-caster.
"""
from .patterns import bpearl_pattern, grid_pattern, pinhole_camera_pattern
from .patterns_cfg import BpearlPatternCfg, GridPatternCfg, PatternBaseCfg, PinholeCameraPatternCfg
__all__ = [
"PatternBaseCfg",
# grid pattern
"GridPatternCfg",
"grid_pattern",
# pinhole camera pattern
"PinholeCameraPatternCfg",
"pinhole_camera_pattern",
# bpearl pattern
"BpearlPatternCfg",
"bpearl_pattern",
]
......@@ -11,10 +11,10 @@ import torch
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .patterns_cfg import BpearlPatternCfg, GridPatternCfg, PinholeCameraPatternCfg
from . import patterns_cfg
def grid_pattern(cfg: GridPatternCfg, device: str) -> tuple[torch.Tensor, torch.Tensor]:
def grid_pattern(cfg: patterns_cfg.GridPatternCfg, device: str) -> tuple[torch.Tensor, torch.Tensor]:
"""A regular grid pattern for ray casting.
The grid pattern is made from rays that are parallel to each other. They span a 2D grid in the sensor's
......@@ -41,7 +41,7 @@ def grid_pattern(cfg: GridPatternCfg, device: str) -> tuple[torch.Tensor, torch.
return ray_starts, ray_directions
def pinhole_camera_pattern(cfg: PinholeCameraPatternCfg, device: str) -> tuple[torch.Tensor, torch.Tensor]:
def pinhole_camera_pattern(cfg: patterns_cfg.PinholeCameraPatternCfg, device: str) -> tuple[torch.Tensor, torch.Tensor]:
"""The depth-image pattern for ray casting.
Args:
......@@ -64,7 +64,7 @@ def pinhole_camera_pattern(cfg: PinholeCameraPatternCfg, device: str) -> tuple[t
return ray_starts, ray_directions
def bpearl_pattern(cfg: BpearlPatternCfg, device: str) -> tuple[torch.Tensor, torch.Tensor]:
def bpearl_pattern(cfg: patterns_cfg.BpearlPatternCfg, device: str) -> tuple[torch.Tensor, torch.Tensor]:
"""The RS-Bpearl pattern for ray casting.
The `Robosense RS-Bpearl`_ is a short-range LiDAR that has a 360 degrees x 90 degrees super wide
......
......@@ -8,24 +8,26 @@ from __future__ import annotations
import numpy as np
import torch
from typing import Sequence
from typing import TYPE_CHECKING, Sequence
import carb
import omni.isaac.core.utils.prims as prim_utils
import omni.isaac.core.utils.stage as stage_utils
from omni.isaac.core.articulations import ArticulationView
from omni.isaac.core.prims import RigidPrimView, XFormPrimView
from pxr import UsdGeom, UsdPhysics
from omni.isaac.orbit.markers import VisualizationMarkers
from omni.isaac.orbit.markers.config import RAY_CASTER_MARKER_CFG
from omni.isaac.orbit.terrains.trimesh.utils import make_plane
from omni.isaac.orbit.utils.math import quat_apply, quat_apply_yaw
from omni.isaac.orbit.utils.warp import convert_to_warp_mesh, raycast_mesh
from ..sensor_base import SensorBase
from .ray_caster_cfg import RayCasterCfg
from .ray_caster_data import RayCasterData
if TYPE_CHECKING:
from .ray_caster_cfg import RayCasterCfg
class RayCaster(SensorBase):
"""A ray-casting sensor.
......@@ -43,14 +45,15 @@ class RayCaster(SensorBase):
is a work in progress.
"""
cfg: RayCasterCfg
"""The configuration parameters."""
def __init__(self, cfg: RayCasterCfg):
"""Initializes the ray-caster object.
Args:
cfg (RayCasterCfg): The configuration parameters.
"""
# store config
self.cfg = cfg
# initialize base class
super().__init__(cfg)
# Create empty variables for storing output data
......@@ -63,26 +66,46 @@ class RayCaster(SensorBase):
def __str__(self) -> str:
"""Returns: A string containing information about the instance."""
return (
f"Ray-caster @ '{self._view._regex_prim_paths}': \n"
f"Ray-caster @ '{self.cfg.prim_path}': \n"
f"\tview type : {self._view.__class__}\n"
f"\tupdate period (s) : {self.cfg.update_period}\n"
f"\tnumber of meshes : {len(self.warp_meshes)}\n"
f"\tnumber of sensors: {self._view.count}\n"
f"\tnumber of rays/sensor : {self.num_rays}\n"
f"\tnumber of sensors : {self._view.count}\n"
f"\tnumber of rays/sensor: {self.num_rays}\n"
f"\ttotal number of rays : {self.num_rays * self._view.count}"
)
"""
Operations
Properties
"""
@property
def data(self) -> RayCasterData:
# update sensors if needed
self._update_outdated_buffers()
# return the data
return self._data
"""
Operations.
"""
def initialize(self, env_prim_path: str):
# check if the prim at path is a articulated or rigid prim
def set_debug_vis(self, debug_vis: bool):
super().set_debug_vis(debug_vis)
if self.ray_visualizer is not None:
self.ray_visualizer.set_visibility(debug_vis)
"""
Implementation.
"""
def _initialize_impl(self):
super()._initialize_impl()
# check if the prim at path is an articulated or rigid prim
# we do this since for physics-based view classes we can access their data directly
# otherwise we need to use the xform view class which is slower
prim_view_class = None
prim_paths_expr = f"{env_prim_path}/{self.cfg.prim_path_expr}"
for prim_path in prim_utils.find_matching_prim_paths(prim_paths_expr):
for prim_path in prim_utils.find_matching_prim_paths(self.cfg.prim_path):
# get prim at path
prim = prim_utils.get_prim_at_path(prim_path)
# check if it is a rigid prim
......@@ -92,17 +115,15 @@ class RayCaster(SensorBase):
prim_view_class = RigidPrimView
else:
prim_view_class = XFormPrimView
carb.log_warn(f"The prim at path {prim_path} is not a rigid prim! Using XFormPrimView.")
carb.log_warn(f"The prim at path {prim_path} is not a physics prim! Using XFormPrimView.")
# break the loop
break
# check if prim view class is found
if prim_view_class is None:
raise RuntimeError(f"Failed to find a valid prim view class for the prim paths: {prim_paths_expr}")
raise RuntimeError(f"Failed to find a valid prim view class for the prim paths: {self.cfg.prim_path}")
# create a rigid prim view for the sensor
self._view = prim_view_class(prim_paths_expr, reset_xform_properties=False)
self._view = prim_view_class(self.cfg.prim_path, reset_xform_properties=False)
self._view.initialize()
# initialize the base class
super().initialize(env_prim_path)
# check number of mesh prims provided
if len(self.cfg.mesh_prim_paths) != 1:
......@@ -111,6 +132,13 @@ class RayCaster(SensorBase):
)
# read prims to ray-cast
for mesh_prim_path in self.cfg.mesh_prim_paths:
# check if the prim is a plane - handle PhysX plane as a special case
# if a plane exists then we need to create an infinite mesh that is a plane
mesh_prim = prim_utils.get_first_matching_child_prim(
mesh_prim_path, lambda p: prim_utils.get_prim_type_name(p) == "Plane"
)
# if we did not find a plane then we need to read the mesh
if mesh_prim is None:
# obtain the mesh prim
mesh_prim = prim_utils.get_first_matching_child_prim(
mesh_prim_path, lambda p: prim_utils.get_prim_type_name(p) == "Mesh"
......@@ -124,18 +152,29 @@ class RayCaster(SensorBase):
points = np.asarray(mesh_prim.GetPointsAttr().Get())
indices = np.asarray(mesh_prim.GetFaceVertexIndicesAttr().Get())
wp_mesh = convert_to_warp_mesh(points, indices, device=self.device)
# print info
carb.log_info(
f"Read mesh prim: {mesh_prim.GetPath()} with {len(points)} vertices and {len(indices)} faces."
)
else:
mesh = make_plane(size=(2e6, 2e6), height=0.0, center_zero=True)
wp_mesh = convert_to_warp_mesh(mesh.vertices, mesh.faces, device=self.device)
# print info
carb.log_info(f"Created infinite plane mesh prim: {mesh_prim.GetPath()}.")
# add the warp mesh to the list
self.warp_meshes.append(wp_mesh)
# throw an error if no meshes are found
if len(self.warp_meshes) == 0:
raise RuntimeError("No meshes found for ray-casting! Please check the mesh prim paths.")
raise RuntimeError(
f"No meshes found for ray-casting! Please check the mesh prim paths: {self.cfg.mesh_prim_paths}"
)
# compute ray stars and directions
self.ray_starts, self.ray_directions = self.cfg.pattern_cfg.func(self.cfg.pattern_cfg, self._device)
self.num_rays = len(self.ray_directions)
# apply offset transformation to the rays
offset_pos = torch.tensor(list(self.cfg.pos_offset), device=self._device)
offset_quat = torch.tensor(list(self.cfg.quat_offset), device=self._device)
offset_pos = torch.tensor(list(self.cfg.offset.pos), device=self._device)
offset_quat = torch.tensor(list(self.cfg.offset.rot), device=self._device)
self.ray_directions = quat_apply(offset_quat.repeat(len(self.ray_directions), 1), self.ray_directions)
self.ray_starts += offset_pos
# repeat the rays for each sensor
......@@ -147,24 +186,8 @@ class RayCaster(SensorBase):
self._data.quat_w = torch.zeros(self._view.count, 4, device=self._device)
self._data.ray_hits_w = torch.zeros(self._view.count, self.num_rays, 3, device=self._device)
def debug_vis(self):
# visualize the point hits
if self.cfg.debug_vis:
if self.ray_visualizer is None:
prim_path = stage_utils.get_next_free_path("/Visuals/RayCaster")
self.ray_visualizer = VisualizationMarkers(prim_path, cfg=RAY_CASTER_MARKER_CFG)
# check if prim is visualized
self.ray_visualizer.visualize(self._data.ray_hits_w.view(-1, 3))
"""
Implementation.
"""
def _update_buffers(self, env_ids: Sequence[int] | None = None):
def _update_buffers_impl(self, env_ids: Sequence[int]):
"""Fills the buffers of the sensor data."""
# default to all sensors
if env_ids is None:
env_ids = self._ALL_INDICES
# obtain the poses of the sensors
pos_w, quat_w = self._view.get_world_poses(env_ids, clone=False)
self._data.pos_w[env_ids] = pos_w
......@@ -184,3 +207,10 @@ class RayCaster(SensorBase):
# ray cast and store the hits
# TODO: Make this work for multiple meshes?
self._data.ray_hits_w[env_ids] = raycast_mesh(ray_starts_w, ray_directions_w, self.warp_meshes[0])
def _debug_vis_impl(self):
# visualize the point hits
if self.ray_visualizer is None:
self.ray_visualizer = VisualizationMarkers("/Visuals/RayCaster", cfg=RAY_CASTER_MARKER_CFG)
# check if prim is visualized
self.ray_visualizer.visualize(self._data.ray_hits_w.view(-1, 3))
......@@ -13,14 +13,24 @@ from dataclasses import MISSING
from omni.isaac.orbit.utils import configclass
from ..sensor_base_cfg import SensorBaseCfg
from .patterns_cfg import PatternBaseCfg
from .patterns.patterns_cfg import PatternBaseCfg
from .ray_caster import RayCaster
@configclass
class RayCasterCfg(SensorBaseCfg):
"""Configuration for the ray-cast sensor."""
cls_name = "RayCaster"
@configclass
class OffsetCfg:
"""The offset pose of the sensor's frame from the sensor's parent frame."""
pos: tuple[float, float, float] = (0.0, 0.0, 0.0)
"""Translation w.r.t. the parent frame. Defaults to (0.0, 0.0, 0.0)."""
rot: tuple[float, float, float, float] = (1.0, 0.0, 0.0, 0.0)
"""Quaternion rotation ``(w, x, y, z)`` w.r.t. the parent frame. Defaults to (1.0, 0.0, 0.0, 0.0)."""
cls_name = RayCaster
mesh_prim_paths: list[str] = MISSING
"""The list of mesh primitive paths to ray cast against.
......@@ -30,11 +40,8 @@ class RayCasterCfg(SensorBaseCfg):
static meshes and dynamic meshes.
"""
pos_offset: tuple[float, float, float] = (0.0, 0.0, 0.0)
"""The position offset from the frame the sensor is attached to. Defaults to (0.0, 0.0, 0.0)."""
quat_offset: tuple[float, float, float, float] = (1.0, 0.0, 0.0, 0.0)
"""The quaternion offset (w, x, y, z) from the frame the sensor is attached to. Defaults to (1.0, 0.0, 0.0, 0.0)."""
offset: OffsetCfg = OffsetCfg()
"""The offset pose of the sensor's frame from the sensor's parent frame. Defaults to identity."""
attach_yaw_only: bool = MISSING
"""Whether the rays' starting positions and directions only track the yaw orientation.
......
......@@ -9,122 +9,118 @@ This class defines an interface for sensors similar to how the :class:`omni.isaa
Each sensor class should inherit from this class and implement the abstract methods.
"""
from __future__ import annotations
import torch
from abc import ABC, abstractmethod
from typing import Any, List, Optional, Sequence
from typing import Any, Sequence
import omni.isaac.core.utils.prims as prim_utils
import omni.kit.app
import omni.physx
from omni.isaac.core.simulation_context import SimulationContext
from .sensor_base_cfg import SensorBaseCfg
class SensorBase(ABC):
"""The base class for implementing a sensor."""
"""The base class for implementing a sensor.
The implementation is based on lazy evaluation. The sensor data is only updated when the user
tries accessing the data through the :attr:`data` property or sets ``force_compute=True`` in
the :meth:`update` method. This is done to avoid unnecessary computation when the sensor data
is not used.
The sensor is updated at the specified update period. If the update period is zero, then the
sensor is updated at every simulation step.
"""
def __init__(self, cfg: SensorBaseCfg):
"""Initialize the sensor class.
The sensor tick is the time between two sensor buffers. If the sensor tick is zero, then the sensor
buffers are filled at every simulation step.
Args:
cfg (SensorBaseCfg): The configuration parameters for the sensor.
"""
# Store the config
# check that config is valid
if cfg.history_length < 0:
raise ValueError(f"History length must be greater than 0! Received: {cfg.history_length}")
# store inputs
self.cfg = cfg
# Current timestamp (in seconds)
self._timestamp: torch.Tensor
# Timestamp from last update
self._timestamp_last_update: torch.Tensor
# ids of envs with outdated data
self._is_outdated: torch.Tensor
# Flag for whether the sensor is initialized
self._is_initialized: bool = False
# data object
self._data: object = None
# flag for whether the sensor is initialized
self._is_initialized = False
# add callbacks for stage play/stop
physx_interface = omni.physx.acquire_physx_interface()
self._initialize_handle = physx_interface.get_simulation_event_stream_v2().create_subscription_to_pop_by_type(
int(omni.physx.bindings._physx.SimulationEvent.RESUMED), self._initialize_callback
)
self._invalidate_initialize_handle = (
physx_interface.get_simulation_event_stream_v2().create_subscription_to_pop_by_type(
int(omni.physx.bindings._physx.SimulationEvent.STOPPED), self._invalidate_initialize_callback
)
)
# add callback for debug visualization
if self.cfg.debug_vis:
app_interface = omni.kit.app.get_app_interface()
self._debug_visualization_handle = app_interface.get_post_update_event_stream().create_subscription_to_pop(
self._debug_vis_callback
)
else:
self._debug_visualization_handle = None
def __del__(self):
"""Unsubscribe from the callbacks."""
self._initialize_handle.unsubscribe()
self._invalidate_initialize_handle.unsubscribe()
if self._debug_visualization_handle is not None:
self._debug_visualization_handle.unsubscribe()
"""
Properties
"""
@property
def prim_paths(self) -> List[str]:
"""The paths to the sensor prims."""
return self._view.prim_paths
@property
def count(self) -> int:
"""Number of prims encapsulated."""
return self._view.count
@property
def device(self) -> str:
"""Device where the sensor is running."""
"""Memory device for computation."""
return self._device
@property
def timestamp(self) -> torch.Tensor:
"""Simulation time of the measurement (in seconds)."""
return self._timestamp
@property
@abstractmethod
def data(self) -> Any:
"""Gets the data from the simulated sensor after updating it if necessary."""
"""Data from the sensor.
This property is only updated when the user tries to access the data. This is done to avoid
unnecessary computation when the sensor data is not used.
For updating the sensor when this property is accessed, you can use the following
code snippet in your sensor implementation:
.. code-block:: python
# update sensors if needed
outdated_env_ids = self._is_outdated.nonzero().squeeze(-1)
if len(outdated_env_ids) > 0:
# obtain new data
self._update_buffers(outdated_env_ids)
# update the timestamp from last update
self._timestamp_last_update[outdated_env_ids] = self._timestamp[outdated_env_ids]
# set outdated flag to false for the updated sensors
# TODO (from mayank): Why all of them are false? It is unclear. Shouldn't just be the ones that were updated?
self._is_outdated[:] = False
self._update_outdated_buffers()
# return the data (where `_data` is the data for the sensor)
return self._data
"""
raise NotImplementedError
"""
Operations
"""
def spawn(self, prim_path: str, *args, **kwargs):
"""Spawns the sensor into the stage.
Args:
prim_path (str): The path of the prim to attach the sensor to.
"""
pass
def initialize(self, env_prim_path: str = None):
"""Initializes the sensor handles and internal buffers.
def set_debug_vis(self, debug_vis: bool):
"""Sets whether to visualize the sensor data.
Args:
env_prim_path (str, optional): The (templated) prim path expression to the environments where the sensors are created. Defaults to None.
debug_vis (bool): Whether to visualize the sensor data.
Raises:
RuntimeError: If the simulation context is not initialized.
RuntimeError: If no prims are found for the given prim path expression.
RuntimeError: If the asset debug visualization is not enabled.
"""
# Obtain Simulation Context
sim = SimulationContext.instance()
if sim is not None:
self._device = sim.device
self._sim_physics_dt = sim.get_physics_dt()
else:
raise RuntimeError("Simulation Context is not initialized!")
# Count number of environments
self._num_envs = len(prim_utils.find_matching_prim_paths(env_prim_path))
# Boolean tensor indicating whether the sensor data has to be refreshed
self._is_outdated = torch.ones(self._num_envs, dtype=torch.bool, device=self._device)
# Current timestamp (in seconds)
self._timestamp = torch.zeros(self._num_envs, device=self._device)
# Timestamp from last update
self._timestamp_last_update = torch.zeros_like(self._timestamp)
# Set the initialized flag
self._is_initialized = True
if not self.cfg.debug_vis:
raise RuntimeError("Debug visualization is not enabled for this sensor.")
def reset_buffers(self, env_ids: Optional[Sequence[int]] = None):
def reset(self, env_ids: Sequence[int] | None = None):
"""Resets the sensor internals.
Args:
......@@ -147,11 +143,45 @@ class SensorBase(ABC):
# TODO (from @mayank): Why is there a history length here when it doesn't mean anything in the sensor base?!?
# It is only for the contact sensor but there we should redefine the update function IMO.
if force_recompute or (self.cfg.history_length > 0):
# TODO (from @mayank): Why are we calling an attribute like this!? We should clean this up
# and make a function.
self.data
self._update_outdated_buffers()
"""
Implementation specific.
"""
@abstractmethod
def _initialize_impl(self):
"""Initializes the sensor-related handles and internal buffers."""
# Obtain Simulation Context
sim = SimulationContext.instance()
if sim is not None:
self._device = sim.device
self._sim_physics_dt = sim.get_physics_dt()
else:
raise RuntimeError("Simulation Context is not initialized!")
# Count number of environments
env_prim_path_expr = self.cfg.prim_path.rsplit("/", 1)[0]
self._num_envs = len(prim_utils.find_matching_prim_paths(env_prim_path_expr))
# Boolean tensor indicating whether the sensor data has to be refreshed
self._is_outdated = torch.ones(self._num_envs, dtype=torch.bool, device=self._device)
# Current timestamp (in seconds)
self._timestamp = torch.zeros(self._num_envs, device=self._device)
# Timestamp from last update
self._timestamp_last_update = torch.zeros_like(self._timestamp)
@abstractmethod
def _update_buffers_impl(self, env_ids: Sequence[int]):
"""Fills the sensor data for provided environment ids.
def debug_vis(self):
This function does not perform any time-based checks and directly fills the data into the
data container.
Args:
env_ids (Sequence[int]): The indices of the sensors that are ready to capture.
"""
raise NotImplementedError
def _debug_vis_impl(self):
"""Visualizes the sensor data.
This is an empty function that can be overridden by the derived class to visualize the sensor data.
......@@ -163,17 +193,39 @@ class SensorBase(ABC):
pass
"""
Implementation specific.
Simulation callbacks.
"""
@abstractmethod
def _update_buffers(self, env_ids: Optional[Sequence[int]] = None):
"""Fills the buffers of the sensor data.
def _initialize_callback(self, event):
"""Initializes the scene elements.
This function does not perform any time-based checks and directly fills the data into the data container.
Note:
PhysX handles are only enabled once the simulator starts playing. Hence, this function needs to be
called whenever the simulator "plays" from a "stop" state.
"""
if not self._is_initialized:
self._initialize_impl()
self._is_initialized = True
def _invalidate_initialize_callback(self, event):
"""Invalidates the scene elements."""
self._is_initialized = False
def _debug_vis_callback(self, event):
"""Visualizes the sensor data."""
self._debug_vis_impl()
Args:
env_ids (Optional[Sequence[int]]): The indices of the sensors that are ready to capture.
If None, then all sensors are ready to capture. Defaults to None.
"""
raise NotImplementedError
Helper functions.
"""
def _update_outdated_buffers(self):
"""Fills the sensor data for the outdated sensors."""
outdated_env_ids = self._is_outdated.nonzero().squeeze(-1)
if len(outdated_env_ids) > 0:
# obtain new data
self._update_buffers_impl(outdated_env_ids)
# update the timestamp from last update
self._timestamp_last_update[outdated_env_ids] = self._timestamp[outdated_env_ids]
# set outdated flag to false for the updated sensors
self._is_outdated[outdated_env_ids] = False
......@@ -4,7 +4,7 @@
# SPDX-License-Identifier: BSD-3-Clause
from dataclasses import MISSING
from typing import ClassVar, Optional
from typing import ClassVar
from omni.isaac.orbit.utils import configclass
......@@ -13,17 +13,25 @@ from omni.isaac.orbit.utils import configclass
class SensorBaseCfg:
"""Configuration parameters for a sensor."""
cls_name: ClassVar[str] = MISSING
"""Name of the associated sensor class."""
cls_name: ClassVar[type] = MISSING
"""The associated sensor class."""
prim_path_expr: Optional[str] = None
"""Relative path to the prim on which the sensor should be attached. Defaults to None."""
prim_path: str = MISSING
"""Prim path (or expression) to the asset.
.. note::
The expression can contain the environment namespace regex ``{ENV_REGEX_NS}`` which
will be replaced with the environment namespace.
Example: ``{ENV_REGEX_NS}/Robot/sensor`` will be replaced with ``/World/envs/env_.*/Robot/sensor`.
"""
update_period: float = 0.0
"""Update period of the sensor buffers (in seconds). Defaults to 0.0 (update every step)."""
history_length: int = 0
"""Number of past frames to store in the sensor buffers. Defaults to 0 (no history)."""
"""Number of past frames to store in the sensor buffers. Defaults to 0, which means that only
the current data is stored (no history)."""
debug_vis: bool = False
"""Whether to visualize the sensor. Defaults to False."""
......@@ -35,8 +35,10 @@ __all__ = [
"unscale_transform",
# Rotation
"matrix_from_quat",
"matrix_from_euler",
"quat_inv",
"quat_from_euler_xyz",
"quat_from_matrix",
"quat_apply_yaw",
"quat_box_minus",
"yaw_quat",
......@@ -160,14 +162,14 @@ def matrix_from_quat(quaternions: torch.Tensor) -> torch.Tensor:
return o.reshape(quaternions.shape[:-1] + (3, 3))
def convert_quat(quat: Union[torch.tensor, Sequence[float]], to: Optional[str] = "xyzw") -> torch.tensor:
def convert_quat(quat: Union[torch.Tensor, Sequence[float]], to: Optional[str] = "xyzw") -> torch.Tensor:
"""Converts quaternion from one convention to another.
The convention to convert TO is specified as an optional argument. If to == 'xyzw',
then the input is in 'wxyz' format, and vice-versa.
Args:
quat (Union[torch.tensor, Sequence[float]]): Input quaternion of shape (..., 4).
quat (Union[torch.Tensor, Sequence[float]]): Input quaternion of shape (..., 4).
to (Optional[str], optional): Convention to convert quaternion to.. Defaults to "xyzw".
Raises:
......@@ -175,7 +177,7 @@ def convert_quat(quat: Union[torch.tensor, Sequence[float]], to: Optional[str] =
ValueError: Invalid shape of input `quat`, i.e. not (..., 4,).
Returns:
torch.tensor: The converted quaternion in specified convention.
torch.Tensor: The converted quaternion in specified convention.
"""
# convert to torch (sanity check)
if not isinstance(quat, torch.Tensor):
......@@ -241,6 +243,148 @@ def quat_from_euler_xyz(roll: torch.Tensor, pitch: torch.Tensor, yaw: torch.Tens
return torch.stack([qw, qx, qy, qz], dim=-1)
@torch.jit.script
def _sqrt_positive_part(x: torch.Tensor) -> torch.Tensor:
"""
Returns torch.sqrt(torch.max(0, x))
but with a zero subgradient where x is 0.
Reference:
Based on PyTorch3D (https://github.com/facebookresearch/pytorch3d/blob/main/pytorch3d/transforms/rotation_conversions.py#L91-L99)
"""
ret = torch.zeros_like(x)
positive_mask = x > 0
ret[positive_mask] = torch.sqrt(x[positive_mask])
return ret
@torch.jit.script
def quat_from_matrix(matrix: torch.Tensor) -> torch.Tensor:
"""
Convert rotations given as rotation matrices to quaternions.
Args:
matrix: Rotation matrices as tensor of shape (..., 3, 3).
Returns:
quaternions with real part first, as tensor of shape (..., 4).
Reference:
Based on PyTorch3D (https://github.com/facebookresearch/pytorch3d/blob/main/pytorch3d/transforms/rotation_conversions.py#L102-L161)
"""
if matrix.size(-1) != 3 or matrix.size(-2) != 3:
raise ValueError(f"Invalid rotation matrix shape {matrix.shape}.")
batch_dim = matrix.shape[:-2]
m00, m01, m02, m10, m11, m12, m20, m21, m22 = torch.unbind(matrix.reshape(batch_dim + (9,)), dim=-1)
q_abs = _sqrt_positive_part(
torch.stack(
[
1.0 + m00 + m11 + m22,
1.0 + m00 - m11 - m22,
1.0 - m00 + m11 - m22,
1.0 - m00 - m11 + m22,
],
dim=-1,
)
)
# we produce the desired quaternion multiplied by each of r, i, j, k
quat_by_rijk = torch.stack(
[
# pyre-fixme[58]: `**` is not supported for operand types `Tensor` and
# `int`.
torch.stack([q_abs[..., 0] ** 2, m21 - m12, m02 - m20, m10 - m01], dim=-1),
# pyre-fixme[58]: `**` is not supported for operand types `Tensor` and
# `int`.
torch.stack([m21 - m12, q_abs[..., 1] ** 2, m10 + m01, m02 + m20], dim=-1),
# pyre-fixme[58]: `**` is not supported for operand types `Tensor` and
# `int`.
torch.stack([m02 - m20, m10 + m01, q_abs[..., 2] ** 2, m12 + m21], dim=-1),
# pyre-fixme[58]: `**` is not supported for operand types `Tensor` and
# `int`.
torch.stack([m10 - m01, m20 + m02, m21 + m12, q_abs[..., 3] ** 2], dim=-1),
],
dim=-2,
)
# We floor here at 0.1 but the exact level is not important; if q_abs is small,
# the candidate won't be picked.
flr = torch.tensor(0.1).to(dtype=q_abs.dtype, device=q_abs.device)
quat_candidates = quat_by_rijk / (2.0 * q_abs[..., None].max(flr))
# if not for numerical problems, quat_candidates[i] should be same (up to a sign),
# forall i; we pick the best-conditioned one (with the largest denominator)
return quat_candidates[torch.nn.functional.one_hot(q_abs.argmax(dim=-1), num_classes=4) > 0.5, :].reshape(
batch_dim + (4,)
)
def _axis_angle_rotation(axis: str, angle: torch.Tensor) -> torch.Tensor:
"""
Return the rotation matrices for one of the rotations about an axis
of which Euler angles describe, for each value of the angle given.
Args:
axis: Axis label "X" or "Y or "Z".
angle: any shape tensor of Euler angles in radians
Returns:
Rotation matrices as tensor of shape (..., 3, 3).
Reference:
Based on PyTorch3D (https://github.com/facebookresearch/pytorch3d/blob/main/pytorch3d/transforms/rotation_conversions.py#L164-L191)
"""
cos = torch.cos(angle)
sin = torch.sin(angle)
one = torch.ones_like(angle)
zero = torch.zeros_like(angle)
if axis == "X":
R_flat = (one, zero, zero, zero, cos, -sin, zero, sin, cos)
elif axis == "Y":
R_flat = (cos, zero, sin, zero, one, zero, -sin, zero, cos)
elif axis == "Z":
R_flat = (cos, -sin, zero, sin, cos, zero, zero, zero, one)
else:
raise ValueError("letter must be either X, Y or Z.")
return torch.stack(R_flat, -1).reshape(angle.shape + (3, 3))
def matrix_from_euler(euler_angles: torch.Tensor, convention: str) -> torch.Tensor:
"""
Convert rotations given as Euler angles in radians to rotation matrices.
Args:
euler_angles: Euler angles in radians as tensor of shape (..., 3).
convention: Convention string of three uppercase letters from
{"X", "Y", and "Z"}.
Returns:
Rotation matrices as tensor of shape (..., 3, 3).
Reference:
Based on PyTorch3D (https://github.com/facebookresearch/pytorch3d/blob/main/pytorch3d/transforms/rotation_conversions.py#L194-L220)
"""
if euler_angles.dim() == 0 or euler_angles.shape[-1] != 3:
raise ValueError("Invalid input euler angles.")
if len(convention) != 3:
raise ValueError("Convention must have 3 letters.")
if convention[1] in (convention[0], convention[2]):
raise ValueError(f"Invalid convention {convention}.")
for letter in convention:
if letter not in ("X", "Y", "Z"):
raise ValueError(f"Invalid letter {letter} in convention string.")
matrices = [_axis_angle_rotation(c, e) for c, e in zip(convention, torch.unbind(euler_angles, -1))]
# return functools.reduce(torch.matmul, matrices)
return torch.matmul(torch.matmul(matrices[0], matrices[1]), matrices[2])
@torch.jit.script
def euler_xyz_from_quat(quat: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
"""
......@@ -607,7 +751,7 @@ def unproject_depth(depth: torch.Tensor, intrinsics: torch.Tensor) -> torch.Tens
deal with many possible shapes of depth images and intrinsics matrices.
Args:
depth (torch.tensor): The depth measurement. Shape: (H, W) or or (H, W, 1) or (N, H, W) or (N, H, W, 1).
depth (torch.Tensor): The depth measurement. Shape: (H, W) or or (H, W, 1) or (N, H, W) or (N, H, W, 1).
intrinsics (torch.Tensor): A tensor providing camera's calibration matrix. Shape: (3, 3) or (N, 3, 3).
Returns:
......
......@@ -17,8 +17,8 @@ import argparse
from omni.isaac.orbit.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser("Welcome to Orbit: Omniverse Robotics Environments!")
parser.add_argument("--gpu", action="store_true", default=False, help="Use GPU device for camera rendering output.")
parser = argparse.ArgumentParser(description="Camera Sensor Test Script")
parser.add_argument("--gpu", action="store_false", default=False, help="Use GPU device for camera rendering output.")
args_cli = parser.parse_args()
# launch omniverse app
......@@ -27,50 +27,355 @@ simulation_app = app_launcher.app
"""Rest everything follows."""
import copy
import numpy as np
import os
import random
import torch
import scipy.spatial.transform as tf
import shutil
import traceback
import unittest
import carb
import omni.isaac.core.utils.prims as prim_utils
import omni.isaac.debug_draw._debug_draw as omni_debug_draw
import omni.replicator.core as rep
from omni.isaac.core.prims import RigidPrim
from omni.isaac.core.simulation_context import SimulationContext
from omni.isaac.core.utils.viewports import set_camera_view
from pxr import Gf, UsdGeom
from pxr import Gf, Usd, UsdGeom
import omni.isaac.orbit.utils.kit as kit_utils
from omni.isaac.orbit.sensors.camera import Camera, PinholeCameraCfg
from omni.isaac.orbit.sensors.camera import Camera, CameraCfg
from omni.isaac.orbit.sim import PinholeCameraCfg
from omni.isaac.orbit.utils import convert_dict_to_backend
from omni.isaac.orbit.utils.kit import create_ground_plane
from omni.isaac.orbit.utils.math import convert_quat
from omni.isaac.orbit.utils.timer import Timer
"""
Helpers
"""
# sample camera poses
POSITION = [2.5, 2.5, 2.5]
QUAT_ROS = [-0.17591989, 0.33985114, 0.82047325, -0.42470819]
QUAT_OPENGL = [0.33985113, 0.17591988, 0.42470818, 0.82047324]
QUAT_WORLD = [-0.3647052, -0.27984815, -0.1159169, 0.88047623]
class TestCamera(unittest.TestCase):
"""Test for orbit camera sensor"""
"""
Test Setup and Teardown
"""
def setUp(self):
"""Create a blank new stage for each test."""
self.camera_cfg = CameraCfg(
height=24,
width=32,
prim_path="/World/Camera",
update_period=0,
data_types=["distance_to_image_plane"],
spawn=PinholeCameraCfg(
focal_length=24.0, focus_distance=400.0, horizontal_aperture=20.955, clipping_range=(0.1, 1.0e5)
),
colorize=False,
)
# Simulation time-step
self.dt = 0.01
# Load kit helper
self.sim = SimulationContext(
physics_dt=self.dt, rendering_dt=self.dt, backend="torch", device="cuda" if args_cli.gpu else "cpu"
)
# populate scene
self._populate_scene()
def tearDown(self) -> None:
"""Stops simulator after each test."""
# close all the opened viewport from before.
rep.vp_manager.destroy_hydra_textures()
# stop simulation
self.sim.stop()
self.sim.clear()
"""
Tests
"""
def test_camera_init(self) -> None:
"""Test camera initialization."""
# Create camera
camera = Camera(self.camera_cfg)
# Play sim
self.sim.play()
# Check if camera is initialized
self.assertTrue(camera._is_initialized)
# Check if camera prim is set correctly and that it is a camera prim
self.assertTrue(camera._sensor_prims[0].GetPath().pathString == self.camera_cfg.prim_path)
self.assertTrue(isinstance(camera._sensor_prims[0], UsdGeom.Camera))
# Simulate for a few steps
# note: This is a workaround to ensure that the textures are loaded.
# Check "Known Issues" section in the documentation for more details.
for _ in range(5):
self.sim.step()
# Check buffers that exists and have correct shapes
self.assertTrue(camera.data.pos_w.shape == (1, 3))
self.assertTrue(camera.data.quat_w_ros.shape == (1, 4))
self.assertTrue(camera.data.quat_w_world.shape == (1, 4))
self.assertTrue(camera.data.quat_w_opengl.shape == (1, 4))
self.assertTrue(camera.data.intrinsic_matrices.shape == (1, 3, 3))
self.assertTrue(camera.data.image_shape == (self.camera_cfg.height, self.camera_cfg.width))
self.assertTrue(camera.data.info == [{self.camera_cfg.data_types[0]: None}])
# Simulate physics
for _ in range(10):
# perform rendering
self.sim.step()
# update camera
camera.update(self.dt)
# check image data
for im_data in camera.data.output.to_dict().values():
self.assertTrue(im_data.shape == (1, self.camera_cfg.height, self.camera_cfg.width))
def test_camera_resolution(self) -> None:
# Create camera
camera = Camera(self.camera_cfg)
# Play sim
self.sim.play()
# Simulate for a few steps
# note: This is a workaround to ensure that the textures are loaded.
# Check "Known Issues" section in the documentation for more details.
for _ in range(5):
self.sim.step()
camera.update(self.dt)
# access image data and compare shapes
for im_data in camera.data.output.to_dict().values():
self.assertTrue(im_data.shape == (1, self.camera_cfg.height, self.camera_cfg.width))
def test_camera_init_offset(self) -> None:
"""Test camera initialization with offset."""
# define the same offset in all conventions
cam_cfg_offset_ros = copy.copy(self.camera_cfg)
cam_cfg_offset_ros.offset = CameraCfg.OffsetCfg(
pos=POSITION,
rot=QUAT_ROS,
convention="ros",
)
cam_cfg_offset_ros.prim_path = "/World/CameraOffsetRos"
cam_cfg_offset_opengl = copy.copy(self.camera_cfg)
cam_cfg_offset_opengl.offset = CameraCfg.OffsetCfg(
pos=POSITION,
rot=QUAT_OPENGL,
convention="opengl",
)
cam_cfg_offset_opengl.prim_path = "/World/CameraOffsetOpengl"
cam_cfg_offset_world = copy.copy(self.camera_cfg)
cam_cfg_offset_world.offset = CameraCfg.OffsetCfg(
pos=POSITION,
rot=QUAT_WORLD,
convention="world",
)
cam_cfg_offset_world.prim_path = "/World/CameraOffsetWorld"
camera_ros = Camera(cam_cfg_offset_ros)
camera_opengl = Camera(cam_cfg_offset_opengl)
camera_world = Camera(cam_cfg_offset_world)
# play sim
self.sim.play()
# retrieve camera pose
prim_tf_ros = camera_ros._sensor_prims[0].ComputeLocalToWorldTransform(Usd.TimeCode.Default())
prim_tf_opengl = camera_opengl._sensor_prims[0].ComputeLocalToWorldTransform(Usd.TimeCode.Default())
prim_tf_world = camera_world._sensor_prims[0].ComputeLocalToWorldTransform(Usd.TimeCode.Default())
prim_tf_ros = np.transpose(prim_tf_ros)
prim_tf_opengl = np.transpose(prim_tf_opengl)
prim_tf_world = np.transpose(prim_tf_world)
# check that all transforms are set correctly
self.assertTrue(np.allclose(prim_tf_ros[0:3, 3], cam_cfg_offset_ros.offset.pos))
self.assertTrue(np.allclose(prim_tf_opengl[0:3, 3], cam_cfg_offset_opengl.offset.pos))
self.assertTrue(np.allclose(prim_tf_world[0:3, 3], cam_cfg_offset_world.offset.pos))
self.assertTrue(
np.allclose(
convert_quat(tf.Rotation.from_matrix(prim_tf_ros[:3, :3]).as_quat(), "wxyz"),
cam_cfg_offset_opengl.offset.rot,
)
)
self.assertTrue(
np.allclose(
convert_quat(tf.Rotation.from_matrix(prim_tf_opengl[:3, :3]).as_quat(), "wxyz"),
cam_cfg_offset_opengl.offset.rot,
)
)
self.assertTrue(
np.allclose(
convert_quat(tf.Rotation.from_matrix(prim_tf_world[:3, :3]).as_quat(), "wxyz"),
cam_cfg_offset_opengl.offset.rot,
)
)
# check if transform correctly set in output
self.assertTrue(np.allclose(camera_ros.data.pos_w[0], cam_cfg_offset_ros.offset.pos))
self.assertTrue(np.allclose(camera_ros.data.quat_w_ros[0], QUAT_ROS))
self.assertTrue(np.allclose(camera_ros.data.quat_w_opengl[0], QUAT_OPENGL))
self.assertTrue(np.allclose(camera_ros.data.quat_w_world[0], QUAT_WORLD))
def test_multi_camera_init(self) -> None:
"""Test camera initialization with offset."""
# define the same offset in all conventions
cam_cfg_1 = copy.copy(self.camera_cfg)
cam_cfg_1.prim_path = "/World/Camera_1"
cam_cfg_2 = copy.copy(self.camera_cfg)
cam_cfg_2.prim_path = "/World/Camera_2"
cam_1 = Camera(cam_cfg_1)
cam_2 = Camera(cam_cfg_2)
# play sim
self.sim.play()
# Simulate for a few steps
# note: This is a workaround to ensure that the textures are loaded.
# Check "Known Issues" section in the documentation for more details.
for _ in range(5):
self.sim.step()
# Simulate physics
for _ in range(10):
# perform rendering
self.sim.step()
# update camera
cam_1.update(self.dt)
cam_2.update(self.dt)
# check image data
for cam in [cam_1, cam_2]:
for im_data in cam.data.output.to_dict().values():
self.assertTrue(im_data.shape == (1, self.camera_cfg.height, self.camera_cfg.width))
def test_camera_set_world_poses(self) -> None:
"""Test camera function to set specific world pose."""
camera = Camera(self.camera_cfg)
# play sim
self.sim.play()
# set new pose
camera.set_world_poses([POSITION], [QUAT_WORLD], convention="world")
self.assertTrue(np.allclose(camera.data.pos_w, [POSITION]))
self.assertTrue(np.allclose(camera.data.quat_w_world, [QUAT_WORLD]))
def test_camera_set_world_poses_from_view(self) -> None:
"""Test camera function to set specific world pose from view."""
camera = Camera(self.camera_cfg)
# play sim
self.sim.play()
# set new pose
camera.set_world_poses_from_view([POSITION], [[0.0, 0.0, 0.0]])
self.assertTrue(np.allclose(camera.data.pos_w, [POSITION]))
self.assertTrue(np.allclose(camera.data.quat_w_ros, [QUAT_ROS]))
def test_intrinsic_matrix(self) -> None:
"""Checks that the camera's set and retrieve methods work for intrinsic matrix."""
camera_cfg = copy.copy(self.camera_cfg)
camera_cfg.height = 240
camera_cfg.width = 320
camera = Camera(camera_cfg)
# play sim
self.sim.play()
# Desired properties (obtained from realsense camera at 320x240 resolution)
rs_intrinsic_matrix = [229.31640625, 0.0, 164.810546875, 0.0, 229.826171875, 122.1650390625, 0.0, 0.0, 1.0]
rs_intrinsic_matrix = np.array(rs_intrinsic_matrix).reshape(3, 3)
# Set matrix into simulator
camera.set_intrinsic_matrices([rs_intrinsic_matrix])
# Simulate for a few steps
# note: This is a workaround to ensure that the textures are loaded.
# Check "Known Issues" section in the documentation for more details.
for _ in range(5):
self.sim.step()
# Simulate physics
for _ in range(10):
# perform rendering
self.sim.step()
# update camera
camera.update(self.dt)
# Check that matrix is correct
K = camera.data.intrinsic_matrices[0].numpy()
# TODO: This is not correctly setting all values in the matrix since the
# vertical aperture and aperture offsets are not being set correctly
# This is a bug in the simulator.
self.assertAlmostEqual(rs_intrinsic_matrix[0, 0], K[0, 0], 4)
# self.assertAlmostEqual(rs_intrinsic_matrix[1, 1], K[1, 1], 4)
def test_throughput(self):
"""Checks that the single camera gets created properly with a rig."""
# Create directory temp dir to dump the results
file_dir = os.path.dirname(os.path.realpath(__file__))
temp_dir = os.path.join(file_dir, "output", "camera", "throughput")
os.makedirs(temp_dir, exist_ok=True)
# Create replicator writer
rep_writer = rep.BasicWriter(output_dir=temp_dir, frame_padding=3)
# create camera
camera_cfg = copy.copy(self.camera_cfg)
camera_cfg.height = 480
camera_cfg.width = 640
camera = Camera(camera_cfg)
# Play simulator
self.sim.play()
# Set camera pose
camera.set_world_poses_from_view([2.5, 2.5, 2.5], [0.0, 0.0, 0.0])
self.sim.reset()
# Simulate for a few steps
# note: This is a workaround to ensure that the textures are loaded.
# Check "Known Issues" section in the documentation for more details.
for _ in range(5):
self.sim.step()
# Simulate physics
for _ in range(5):
# perform rendering
self.sim.step()
# update camera
with Timer(f"Time taken for updating camera with shape {camera.image_shape}"):
camera.update(self.dt)
# Save images
with Timer(f"Time taken for writing data with shape {camera.image_shape} "):
# Pack data back into replicator format to save them using its writer
rep_output = dict()
camera_data = convert_dict_to_backend(camera.data.output[0].to_dict(), backend="numpy")
for key, data, info in zip(camera_data.keys(), camera_data.values(), camera.data.info[0].values()):
if info is not None:
rep_output[key] = {"data": data, "info": info}
else:
rep_output[key] = data
# Save images
rep_output["trigger_outputs"] = {"on_time": camera.frame[0]}
rep_writer.write(rep_output)
print("----------------------------------------")
# Check image data
for im_data in camera.data.output.values():
self.assertTrue(im_data.shape == (1, camera_cfg.height, camera_cfg.width))
"""
Helper functions.
"""
def design_scene():
@staticmethod
def _populate_scene():
"""Add prims to the scene."""
# Ground-plane
kit_utils.create_ground_plane("/World/defaultGroundPlane")
create_ground_plane("/World/defaultGroundPlane")
# Lights-1
prim_utils.create_prim(
"/World/Light/GreySphere",
"SphereLight",
translation=(4.5, 3.5, 10.0),
attributes={"radius": 2.5, "intensity": 600.0, "color": (0.75, 0.75, 0.75)},
attributes={"radius": 1.0, "intensity": 300.0, "color": (0.75, 0.75, 0.75)},
)
# Lights-2
prim_utils.create_prim(
"/World/Light/WhiteSphere",
"SphereLight",
translation=(-4.5, 3.5, 10.0),
attributes={"radius": 2.5, "intensity": 600.0, "color": (1.0, 1.0, 1.0)},
attributes={"radius": 1.0, "intensity": 300.0, "color": (1.0, 1.0, 1.0)},
)
# Xform to hold objects
prim_utils.create_prim("/World/Objects", "Xform")
# Random objects
random.seed(0)
for i in range(8):
# sample random position
position = np.random.rand(3) - np.asarray([0.05, 0.05, -1.0])
......@@ -94,115 +399,13 @@ def design_scene():
geom_prim.GetDisplayColorAttr().Set([color])
"""
Main
"""
def main():
"""Runs a camera sensor from orbit."""
# Load kit helper
sim = SimulationContext(
physics_dt=0.005, rendering_dt=0.005, backend="torch", device="cuda" if args_cli.gpu else "cpu"
)
# sim = SimulationContext(physics_dt=0.005, rendering_dt=0.005, backend="numpy")
# Set main camera
set_camera_view([2.5, 2.5, 2.5], [0.0, 0.0, 0.0])
# Acquire draw interface
draw_interface = omni_debug_draw.acquire_debug_draw_interface()
# Populate scene
design_scene()
# Setup camera sensor
camera_cfg = PinholeCameraCfg(
update_period=0.0,
height=480,
width=640,
data_types=["rgb", "distance_to_image_plane", "normals", "motion_vectors", "semantic_segmentation"],
usd_params=PinholeCameraCfg.UsdCameraCfg(
focal_length=24.0, focus_distance=400.0, horizontal_aperture=20.955, clipping_range=(0.1, 1.0e5)
),
)
camera = Camera(cfg=camera_cfg)
# Spawn camera
camera.spawn("/World/CameraSensor/Cam_00")
camera.spawn("/World/CameraSensor/Cam_01")
# Create replicator writer
output_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "output", "camera")
rep_writer = rep.BasicWriter(output_dir=output_dir, frame_padding=3)
# Play simulator
sim.play()
# Initialize camera
camera.initialize("/World/CameraSensor/Cam_*")
# Set pose: There are two ways to set the pose of the camera.
# -- Option-1: Set pose using view
eyes = [[2.5, 2.5, 2.5], [-2.5, -2.5, 2.5]]
targets = [[0.0, 0.0, 0.0], [0.0, 0.0, 0.0]]
camera.set_world_poses_from_view(eyes, targets)
# -- Option-2: Set pose using ROS
# position = [[2.5, 2.5, 2.5]]
# orientation = [[-0.17591989, 0.33985114, 0.82047325, -0.42470819]]
# camera.set_world_pose_ros(position, orientation, indices=[0])
# Simulate for a few steps
# note: This is a workaround to ensure that the textures are loaded.
# Check "Known Issues" section in the documentation for more details.
for _ in range(3):
sim.step()
# Simulate physics
for _ in range(100):
# If simulation is stopped, then exit.
if sim.is_stopped():
break
# If simulation is paused, then skip.
if not sim.is_playing():
sim.step(render=app_launcher.RENDER)
continue
# Step simulation
sim.step(render=app_launcher.RENDER)
# Update camera data
camera.update(dt=0.0)
# Print camera info
print(camera)
print("Received shape of rgb image: ", camera.data.output["rgb"].shape)
print("Received shape of depth image: ", camera.data.output["distance_to_image_plane"].shape)
print("-------------------------------")
# Extract camera data
camera_index = 1
# note: BasicWriter only supports saving data in numpy format, so we need to convert the data to numpy.
if sim.backend == "torch":
# tensordict allows easy indexing of tensors in the dictionary
single_cam_data = convert_dict_to_backend(camera.data.output[camera_index], backend="numpy")
else:
# for numpy, we need to manually index the data
single_cam_data = dict()
for key, value in camera.data.output.items():
single_cam_data[key] = value[camera_index]
# Extract the other information
single_cam_info = camera.data.info[camera_index]
# Pack data back into replicator format to save them using its writer
rep_output = dict()
for key, data, info in zip(single_cam_data.keys(), single_cam_data.values(), single_cam_info.values()):
if info is not None:
rep_output[key] = {"data": data, "info": info}
else:
rep_output[key] = data
# Save images
rep_output["trigger_outputs"] = {"on_time": camera.timestamp[camera_index].item()}
rep_writer.write(rep_output)
if __name__ == "__main__":
# Runs the main function
main()
# Close the simulator
try:
unittest.main()
except Exception as err:
carb.log_error(err)
carb.log_error(traceback.format_exc())
raise
finally:
# close sim app
simulation_app.close()
......@@ -19,9 +19,9 @@ import argparse
from omni.isaac.kit import SimulationApp
# add argparse arguments
parser = argparse.ArgumentParser("Welcome to Orbit: Omniverse Robotics Environments!")
parser = argparse.ArgumentParser(description="Contact Sensor Test Script")
parser.add_argument("--headless", action="store_true", default=False, help="Force display off at all times.")
parser.add_argument("--num_robots", type=int, default=128, help="Number of robots to spawn.")
parser.add_argument("--num_robots", type=int, default=64, help="Number of robots to spawn.")
args_cli = parser.parse_args()
# launch omniverse app
......@@ -42,8 +42,8 @@ from omni.isaac.core.utils.carb import set_carb_setting
from omni.isaac.core.utils.viewports import set_camera_view
import omni.isaac.orbit.utils.kit as kit_utils
from omni.isaac.orbit.robots.config.anymal import ANYMAL_C_CFG
from omni.isaac.orbit.robots.legged_robot import LeggedRobot
from omni.isaac.orbit.assets import Articulation
from omni.isaac.orbit.assets.articulation.config.anymal import ANYMAL_C_CFG
from omni.isaac.orbit.sensors.contact_sensor import ContactSensor, ContactSensorCfg
"""
......@@ -97,27 +97,22 @@ def main():
cloner.define_base_env("/World/envs")
# Everything under the namespace "/World/envs/env_0" will be cloned
prim_utils.define_prim("/World/envs/env_0")
# Spawn things into the scene
robot = LeggedRobot(cfg=ANYMAL_C_CFG)
robot.spawn("/World/envs/env_0/Robot")
# Contact sensor
contact_sensor_cfg = ContactSensorCfg(
prim_path_expr="Robot/.*_SHANK", debug_vis=False if args_cli.headless else True
)
contact_sensor = ContactSensor(cfg=contact_sensor_cfg)
# design props
design_scene()
# Clone the scene
num_envs = args_cli.num_robots
cloner.define_base_env("/World/envs")
envs_prim_paths = cloner.generate_paths("/World/envs/env", num_paths=num_envs)
envs_positions = cloner.clone(
source_prim_path="/World/envs/env_0", prim_paths=envs_prim_paths, replicate_physics=True
_ = cloner.clone(source_prim_path="/World/envs/env_0", prim_paths=envs_prim_paths, replicate_physics=True)
# Design props
design_scene()
# Spawn things into the scene
robot_cfg = ANYMAL_C_CFG.replace(prim_path="/World/envs/env_.*/Robot")
robot_cfg.spawn.activate_contact_sensors = True
robot = Articulation(cfg=robot_cfg)
# Contact sensor
contact_sensor_cfg = ContactSensorCfg(
prim_path="/World/envs/env_.*/Robot/.*_SHANK", debug_vis=False if args_cli.headless else True
)
# convert environment positions to torch tensor
envs_positions = torch.tensor(envs_positions, dtype=torch.float, device=sim.device)
contact_sensor = ContactSensor(cfg=contact_sensor_cfg)
# filter collisions within each environment instance
physics_scene_path = sim.get_physics_context().prim_path
cloner.filter_collisions(
......@@ -126,18 +121,12 @@ def main():
# Play the simulator
sim.reset()
# Acquire handles
# Initialize handles
robot.initialize("/World/envs/env_.*/Robot")
contact_sensor.initialize("/World/envs/env_.*")
# print info
print(contact_sensor)
# Now we are ready!
print("[INFO]: Setup complete...")
# dummy actions
actions = robot.data.default_dof_pos
# Define simulation stepping
decimation = 4
sim_dt = decimation * sim.get_physics_dt()
......@@ -158,32 +147,30 @@ def main():
sim_time = 0.0
count = 0
# reset dof state
dof_pos = robot.data.default_dof_pos
dof_vel = robot.data.default_dof_vel
robot.set_dof_state(dof_pos, dof_vel)
robot.reset_buffers()
# reset command
actions = robot.data.default_dof_pos
joint_pos, joint_vel = robot.data.default_joint_pos, robot.data.default_joint_vel
robot.write_joint_state_to_sim(joint_pos, joint_vel)
robot.reset()
# perform 4 steps
for _ in range(decimation):
# apply actions
robot.set_dof_position_targets(actions)
robot.set_joint_position_targets(robot.data.default_joint_pos)
# write commands to sim
robot.write_commands_to_sim()
robot.write_data_to_sim()
# perform step
sim.step(render=not args_cli.headless)
# fetch data
robot.refresh_sim_data(refresh_dofs=True)
robot.fetch_data_from_sim()
# update sim-time
sim_time += sim_dt
count += 1
# update the buffers
if sim.is_playing():
robot.update_buffers(sim_dt)
robot.update(sim_dt)
contact_sensor.update(sim_dt, force_recompute=True)
# update marker visualization
if not args_cli.headless:
contact_sensor.debug_vis()
if count % 100 == 0:
print("Sim-time: ", sim_time)
print("Number of contacts: ", torch.count_nonzero(contact_sensor.data.current_air_time == 0.0).item())
print("-" * 80)
if __name__ == "__main__":
......
......@@ -19,14 +19,14 @@ import argparse
from omni.isaac.kit import SimulationApp
# add argparse arguments
parser = argparse.ArgumentParser("Welcome to Orbit: Omniverse Robotics Environments!")
parser = argparse.ArgumentParser(description="Ray Caster Test Script")
parser.add_argument("--headless", action="store_true", default=False, help="Force display off at all times.")
parser.add_argument("--num_envs", type=int, default=128, help="Number of environments to clone.")
parser.add_argument(
"--terrain_type",
type=str,
default="generated",
help="Type of terrain to import. Can be 'generated' or 'usd' or 'plane'.",
default="generator",
help="Type of terrain to import. Can be 'generator' or 'usd' or 'plane'.",
)
args_cli = parser.parse_args()
......@@ -49,8 +49,7 @@ from omni.isaac.core.simulation_context import SimulationContext
from omni.isaac.core.utils.viewports import set_camera_view
import omni.isaac.orbit.terrains as terrain_gen
import omni.isaac.orbit.utils.kit as kit_utils
from omni.isaac.orbit.sensors.ray_caster import GridPatternCfg, RayCaster, RayCasterCfg
from omni.isaac.orbit.sensors.ray_caster import RayCaster, RayCasterCfg, patterns
from omni.isaac.orbit.terrains.config.rough import ROUGH_TERRAINS_CFG
from omni.isaac.orbit.terrains.terrain_importer import TerrainImporter
from omni.isaac.orbit.utils.assets import ISAAC_NUCLEUS_DIR
......@@ -112,27 +111,21 @@ def main():
# Design the scene
design_scene(sim=sim, num_envs=num_envs)
# Handler for terrains importing
if args_cli.terrain_type == "generated":
terrain_importer_cfg = terrain_gen.TerrainImporterCfg(
prim_path="/World/ground",
terrain_type="generator",
terrain_type=args_cli.terrain_type,
terrain_generator=ROUGH_TERRAINS_CFG,
usd_path=f"{ISAAC_NUCLEUS_DIR}/Environments/Terrains/rough_plane.usd",
max_init_terrain_level=None,
num_envs=1,
)
terrain_importer = TerrainImporter(terrain_importer_cfg, num_envs=1, device=sim.device)
elif args_cli.terrain_type == "usd":
prim_utils.create_prim("/World/ground", usd_path=f"{ISAAC_NUCLEUS_DIR}/Environments/Terrains/rough_plane.usd")
elif args_cli.terrain_type == "plane":
kit_utils.create_ground_plane("/World/ground")
else:
raise NotImplementedError(f"Terrain type {args_cli.terrain_type} not supported!")
terrain_importer = TerrainImporter(terrain_importer_cfg)
# Create a ray-caster sensor
pattern_cfg = GridPatternCfg(resolution=0.1, size=(1.6, 1.0))
ray_caster_cfg = RayCasterCfg(
prim_path_expr="ball",
prim_path="/World/envs/env_.*/ball",
mesh_prim_paths=["/World/ground"],
pattern_cfg=pattern_cfg,
pattern_cfg=patterns.GridPatternCfg(resolution=0.1, size=(1.6, 1.0)),
attach_yaw_only=True,
debug_vis=False if args_cli.headless else True,
)
......@@ -146,8 +139,6 @@ def main():
# Initialize the views
# -- balls
ball_view.initialize()
# -- sensors
ray_caster.initialize("/World/envs/env_.*")
# Print the sensor information
print(ray_caster)
......@@ -176,18 +167,14 @@ def main():
)
ball_view.set_velocities(ball_initial_velocities[reset_indices], indices=reset_indices)
# reset the sensor
ray_caster.reset_buffers(reset_indices)
ray_caster.reset(reset_indices)
# reset the counter
step_count = 0
# Step simulation
sim.step()
# Update the ray-caster
with Timer(f"Ray-caster update with {ray_caster.count} x {ray_caster.num_rays} rays"):
with Timer(f"Ray-caster update with {num_envs} x {ray_caster.num_rays} rays"):
ray_caster.update(dt=sim.get_physics_dt(), force_recompute=True)
# Visualize the ray-caster
if not args_cli.headless:
with Timer(f"Ray-caster debug visualization\t\t"):
ray_caster.debug_vis()
# Update counter
step_count += 1
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment