Unverified Commit 7a76da7f authored by Mayank Mittal's avatar Mayank Mittal Committed by GitHub

Modifies sensors to support sensors views (#42)

# Description

<!--
Thank you for your interest in sending a pull request. Please make sure
to check the contribution guidelines.

Link: https://isaac-orbit.github.io/orbit/source/refs/contributing.html
-->

As a step towards supporting sensors as "Views", this PR upgrades the
`SensorBase` class to follow a similar definition as `RobotBase` and
updates the sensor implementations as well. Now, the following sensors
are supported:

* `Camera`: This spawns a USD camera object and instantiates it with
replicator OmniGraph for different synthetic data
* `RayCaster`: This is a virtual sensor that uses a warp kernel for
raycasting against static meshes.
* `ContactSensor`: This wraps around the `RigidContactView` from PhysX
to provide the contact forces on encapsulated prims.

Note: Currently camera rendering is not truly parallelized. They are
called sequentially inside the `Camera` to fetch their data.
Additionally, some camera data such as bounding boxes are not possible
to get in the torch backend. Due to this, they are not supported in the
new `Camera` class.

Old the old sensor implementations have been moved to the
`omni.isaac.orbit.compat.sensors` module for backwards compatibility. We
will remove them once we have moved to v1.0.

Fixes #3

<!-- As a practice, it is recommended to open an issue to have
discussions on the proposed pull request.
This makes it easier for the community to keep track of what is being
developed or added, and if a given feature
is demanded by more than one party. -->

## Type of change

<!-- As you go through the list, delete the ones that are not
applicable. -->

- New feature (non-breaking change which adds functionality)
- Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- This change requires a documentation update

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./orbit.sh --format`
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [x] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file

<!--
As you go through the checklist above, you can mark something as done by
putting an x character in it

For example,
- [x] I have done this task
- [ ] I have not done this task
-->
parent 1fb853cc
......@@ -40,6 +40,7 @@
"teleoperation",
"xform",
"numpy",
"tensordict",
"flatcache",
"physx",
"dpad",
......
......@@ -124,6 +124,7 @@ autodoc_mock_imports = [
"hid",
"prettytable",
"tqdm",
"tensordict",
"trimesh",
"toml",
]
......
MIT License
Copyright (c) Meta Platforms, Inc. and affiliates.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
......@@ -23,6 +23,7 @@ omni.isaac.orbit extension
orbit.utils.kit
orbit.utils.math
orbit.utils.mdp
orbit.compat
omni.isaac.orbit_envs extension
-------------------------------
......
omni.isaac.orbit.compat
=======================
.. warning::
This module is deprecated and will be removed in a future release. Please use
the following modules instead:
* :mod:`omni.isaac.orbit.markers`
* :mod:`omni.isaac.orbit.sensors`
Markers
-------
.. automodule:: omni.isaac.orbit.compat.markers
:members:
:undoc-members:
:show-inheritance:
Sensor Base
-----------
.. automodule:: omni.isaac.orbit.compat.sensors.sensor_base
:members:
:undoc-members:
:show-inheritance:
Camera
------
.. autoclass:: omni.isaac.orbit.compat.sensors.camera.camera.Camera
:members:
:undoc-members:
:show-inheritance:
:noindex:
Data
^^^^
.. autoclass:: omni.isaac.orbit.compat.sensors.camera.camera.CameraData
:members:
:undoc-members:
:show-inheritance:
:noindex:
Configuration
^^^^^^^^^^^^^
.. automodule:: omni.isaac.orbit.sensors.camera.camera_cfg
:members:
:undoc-members:
:show-inheritance:
:noindex:
Utilities
^^^^^^^^^
.. automodule:: omni.isaac.orbit.sensors.camera.utils
:members:
:undoc-members:
:show-inheritance:
:noindex:
Height Scanner
--------------
.. autoclass:: omni.isaac.orbit.compat.sensors.height_scanner.height_scanner.HeightScanner
:members:
:undoc-members:
:show-inheritance:
Data
^^^^
.. autoclass:: omni.isaac.orbit.compat.sensors.height_scanner.height_scanner.HeightScannerData
:members:
:undoc-members:
:show-inheritance:
Configuration
^^^^^^^^^^^^^
.. automodule:: omni.isaac.orbit.compat.sensors.height_scanner.height_scanner_cfg
:members:
:undoc-members:
:show-inheritance:
Utilities
^^^^^^^^^
.. automodule:: omni.isaac.orbit.compat.sensors.height_scanner.utils
:members:
:undoc-members:
:show-inheritance:
omni.isaac.orbit.sensors
========================
Sensor Base
-----------
......@@ -40,36 +41,3 @@ Utilities
:members:
:undoc-members:
:show-inheritance:
Height Scanner
--------------
.. autoclass:: omni.isaac.orbit.sensors.height_scanner.height_scanner.HeightScanner
:members:
:undoc-members:
:show-inheritance:
Data
^^^^
.. autoclass:: omni.isaac.orbit.sensors.height_scanner.height_scanner.HeightScannerData
:members:
:undoc-members:
:show-inheritance:
Configuration
^^^^^^^^^^^^^
.. automodule:: omni.isaac.orbit.sensors.height_scanner.height_scanner_cfg
:members:
:undoc-members:
:show-inheritance:
Utilities
^^^^^^^^^
.. automodule:: omni.isaac.orbit.sensors.height_scanner.utils
:members:
:undoc-members:
:show-inheritance:
......@@ -24,6 +24,7 @@ extra_standard_library = [
"h5py",
"open3d",
"torch",
"tensordict",
"bpy",
"matplotlib",
"gym",
......
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.4.4"
version = "0.5.0"
# Description
title = "ORBIT framework for Robot Learning"
......
Changelog
---------
0.4.4 (2023-07-05)
~~~~~~~~~~~~~~~~~~
0.5.0 (2023-07-04)
Added
^^^^^
* Added the :attr:`omni.isaac.orbit.terrains.TerrainGeneratorCfg.seed` to make generation of terrains reproducible.
The default value is ``None`` which means that the seed is not set.
* Added a generalized :class:`omni.isaac.orbit.sensors.SensorBase` class that leverages the ideas of views to
handle multiple sensors in a single class.
* Added the classes :class:`omni.isaac.orbit.sensors.RayCaster`, :class:`omni.isaac.orbit.sensors.ContactSensor`,
and :class:`omni.isaac.orbit.sensors.Camera` that output a batched tensor of sensor data.
Changed
^^^^^^^
* Renamed the parameter ``sensor_tick`` to ``update_freq`` to make it more intuitive.
* Moved the old sensors in :mod:`omni.isaac.orbit.sensors` to :mod:`omni.isaac.orbit.compat.sensors`.
* Modified the standalone scripts to use the :mod:`omni.isaac.orbit.compat.sensors` module.
0.4.4 (2023-07-05)
~~~~~~~~~~~~~~~~~~
Fixed
^^^^^
* Fixed the :meth:`omni.isaac.orbit.terrains.trimesh.utils.make_plane` method to handle the case when the
plane origin does not need to be centered.
* Added the :attr:`omni.isaac.orbit.terrains.TerrainGeneratorCfg.seed` to make generation of terrains reproducible.
The default value is ``None`` which means that the seed is not set.
Changed
^^^^^^^
......
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
This submodule contains code from previous release of Orbit that is kept for compatibility reasons.
.. note::
This module is not part of the public API and may be removed in future releases.
"""
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Camera wrapper around USD camera prim to provide an interface that follows the robotics convention.
"""
from .camera import Camera, CameraData
from .camera_cfg import FisheyeCameraCfg, PinholeCameraCfg
__all__ = ["Camera", "CameraData", "PinholeCameraCfg", "FisheyeCameraCfg"]
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Configuration for the camera sensor."""
from dataclasses import MISSING
from typing import List, Tuple
# omni-isaac-orbit
from omni.isaac.orbit.utils import configclass
__all__ = ["PinholeCameraCfg", "FisheyeCameraCfg"]
@configclass
class PinholeCameraCfg:
"""Configuration for a pinhole camera sensor."""
@configclass
class UsdCameraCfg:
"""USD related configuration of the sensor.
The parameter is kept default from USD if it is set to :obj:`None`. This includes the default
parameters (in case the sensor is created) or the ones set by the user (in case the sensor is
loaded from existing USD stage).
Reference:
* https://docs.omniverse.nvidia.com/prod_extensions/prod_extensions/ext_replicator/annotators_details.html
* https://graphics.pixar.com/usd/docs/api/class_usd_geom_camera.html
"""
clipping_range: Tuple[float, float] = None
"""Near and far clipping distances (in stage units)."""
focal_length: float = None
"""Perspective focal length (in mm). Longer lens lengths narrower FOV, shorter lens lengths wider FOV."""
focus_distance: float = None
"""Distance from the camera to the focus plane (in stage units).
The distance at which perfect sharpness is achieved.
"""
f_stop: float = None
"""Lens aperture. Defaults to 0.0, which turns off focusing.
Controls Distance Blurring. Lower Numbers decrease focus range, larger numbers increase it.
"""
horizontal_aperture: float = None
"""Horizontal aperture (in mm). Emulates sensor/film width on a camera."""
horizontal_aperture_offset: float = None
"""Offsets Resolution/Film gate horizontally."""
vertical_aperture_offset: float = None
"""Offsets Resolution/Film gate vertically."""
sensor_tick: float = 0.0
"""Simulation seconds between sensor buffers. Defaults to 0.0."""
data_types: List[str] = ["rgb"]
"""List of sensor names/types to enable for the camera. Defaults to ["rgb"]."""
width: int = MISSING
"""Width of the image in pixels."""
height: int = MISSING
"""Height of the image in pixels."""
semantic_types: List[str] = ["class"]
"""List of allowed semantic types the types. Defaults to ["class"].
For example, if semantic types is [“class”], only the bounding boxes for prims with semantics of
type “class” will be retrieved.
More information available at:
https://docs.omniverse.nvidia.com/app_code/prod_extensions/ext_replicator/semantic_schema_editor.html
"""
projection_type: str = "pinhole"
"""Type of projection to use for the camera. Defaults to "pinhole"."""
usd_params: UsdCameraCfg = UsdCameraCfg()
"""Parameters for setting USD camera settings."""
@configclass
class FisheyeCameraCfg(PinholeCameraCfg):
"""Configuration for a fisheye camera sensor."""
@configclass
class UsdCameraCfg(PinholeCameraCfg.UsdCameraCfg):
"""USD related configuration of the sensor for the fisheye model."""
fisheye_nominal_width: float = None
"""Nominal width of fisheye lens model."""
fisheye_nominal_height: float = None
"""Nominal height of fisheye lens model."""
fisheye_optical_centre_x: float = None
"""Horizontal optical centre position of fisheye lens model."""
fisheye_optical_centre_y: float = None
"""Vertical optical centre position of fisheye lens model."""
fisheye_max_fov: float = None
"""Maximum field of view of fisheye lens model."""
fisheye_polynomial_a: float = None
"""First component of fisheye polynomial."""
fisheye_polynomial_b: float = None
"""Second component of fisheye polynomial."""
fisheye_polynomial_c: float = None
"""Third component of fisheye polynomial."""
fisheye_polynomial_d: float = None
"""Fourth component of fisheye polynomial."""
fisheye_polynomial_e: float = None
"""Fifth component of fisheye polynomial."""
projection_type: str = "fisheye_polynomial"
"""Type of projection to use for the camera. Defaults to "fisheye_polynomial"."""
usd_params: UsdCameraCfg = UsdCameraCfg()
"""Parameters for setting USD camera settings."""
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Base class for sensors.
This class defines an interface for sensors similar to how the :class:`omni.isaac.orbit.robot.robot_base.RobotBase` class works.
Each sensor class should inherit from this class and implement the abstract methods.
"""
from abc import abstractmethod
from typing import Any
from warnings import warn
import carb
class SensorBase:
"""The base class for implementing a sensor.
Note:
These sensors are not vectorized yet.
"""
def __init__(self, sensor_tick: float = 0.0):
"""Initialize the sensor class.
The sensor tick is the time between two sensor buffers. If the sensor tick is zero, then the sensor
buffers are filled at every simulation step.
Args:
sensor_tick (float, optional): Simulation seconds between sensor buffers. Defaults to 0.0.
"""
# print warning to notify user that the sensor is not vectorized
carb.log_warn("This implementation of the sensor is not vectorized yet. Please use the vectorized version.")
# Copy arguments to class
self._sensor_tick: float = sensor_tick
# Current timestamp of animation (in seconds)
self._timestamp: float = 0.0
# Timestamp from last update
self._timestamp_last_update: float = 0.0
# Frame number when the measurement is taken
self._frame: int = 0
def __init_subclass__(cls, **kwargs):
"""This throws a deprecation warning on subclassing."""
warn(f"{cls.__name__} will be deprecated from v1.0.", DeprecationWarning, stacklevel=1)
super().__init_subclass__(**kwargs)
"""
Properties
"""
@property
def frame(self) -> int:
"""Frame number when the measurement took place."""
return self._frame
@property
def timestamp(self) -> float:
"""Simulation time of the measurement (in seconds)."""
return self._timestamp
@property
def sensor_tick(self) -> float:
"""Simulation seconds between sensor buffers (ticks)."""
return self._sensor_tick
@property
def data(self) -> Any:
"""The data from the simulated sensor."""
return None # noqa: R501
"""
Helpers
"""
def set_visibility(self, visible: bool):
"""Set visibility of the instance in the scene.
Note:
Sensors are mostly XForms which do not have any mesh associated to them. Thus,
overriding this method is optional.
Args:
visible (bool) -- Whether to make instance visible or invisible.
"""
pass
"""
Operations
"""
@abstractmethod
def spawn(self, parent_prim_path: str):
"""Spawns the sensor into the stage.
Args:
parent_prim_path (str): The path of the parent prim to attach sensor to.
"""
raise NotImplementedError
@abstractmethod
def initialize(self):
"""Initializes the sensor handles and internal buffers."""
raise NotImplementedError
def reset(self):
"""Resets the sensor internals."""
# Set current time
self._timestamp = 0.0
self._timestamp_last_update = 0.0
# Set zero captures
self._frame = 0
def update(self, dt: float, *args, **kwargs):
"""Updates the buffers at sensor frequency.
This function performs time-based checks and fills the data into the data container. It
calls the function :meth:`buffer()` to fill the data. The function :meth:`buffer()` should
not be called directly.
Args:
dt (float): The simulation time-step.
args (tuple): Other positional arguments passed to function :meth:`buffer()`.
kwargs (dict): Other keyword arguments passed to function :meth:`buffer()`.
"""
# Get current time
self._timestamp += dt
# Buffer the sensor data.
if (self._timestamp - self._timestamp_last_update) >= self._sensor_tick:
# Buffer the data
self.buffer(*args, **kwargs)
# Update the frame count
self._frame += 1
# Update capture time
self._timestamp_last_update = self._timestamp
@abstractmethod
def buffer(self, *args, **kwargs):
"""Fills the buffers of the sensor data.
This function does not perform any time-based checks and directly fills the data into the data container.
Warning:
Although this method is public, `update(dt)` should be the preferred way of filling buffers.
"""
raise NotImplementedError
......@@ -34,13 +34,31 @@ POSITION_GOAL_MARKER_CFG = VisualizationMarkersCfg(
)
"""Configuration for the end-effector tracking marker."""
HEIGHT_SCAN_MARKER_CFG = VisualizationMarkersCfg(
RAY_CASTER_MARKER_CFG = VisualizationMarkersCfg(
markers={
"hit": VisualizationMarkersCfg.MarkerCfg(
prim_type="Sphere",
color=(1.0, 0.0, 0.0),
attributes={"radius": 0.01},
attributes={"radius": 0.02},
),
},
)
"""Configuration for the ray-caster marker."""
CONTACT_SENSOR_MARKER_CFG = VisualizationMarkersCfg(
markers={
"contact": VisualizationMarkersCfg.MarkerCfg(
prim_type="Sphere",
color=(1.0, 0.0, 0.0),
attributes={"radius": 0.02},
),
"no_contact": VisualizationMarkersCfg.MarkerCfg(
visible=False,
prim_type="Sphere",
color=(0.0, 1.0, 0.0),
attributes={"radius": 0.02},
),
},
)
"""Configuration for the height scan marker."""
"""Configuration for the contact sensor marker."""
......@@ -2,3 +2,16 @@
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
This subpackage contains the sensor classes that are compatible with the Isaac Sim. We include both
USD-based and custom sensors. The USD-based sensors are the ones that are available in Omniverse and
require creating a USD prim for them. Custom sensors, on the other hand, are the ones that are
implemented in Python and do not require creating a USD prim for them.
"""
from .camera import * # noqa: F401, F403
from .contact_sensor import * # noqa: F401, F403
from .ray_caster import * # noqa: F401, F403
from .sensor_base import SensorBase # noqa: F401
......@@ -7,7 +7,8 @@
Camera wrapper around USD camera prim to provide an interface that follows the robotics convention.
"""
from .camera import Camera, CameraData
from .camera import Camera
from .camera_cfg import FisheyeCameraCfg, PinholeCameraCfg
from .camera_data import CameraData
__all__ = ["Camera", "CameraData", "PinholeCameraCfg", "FisheyeCameraCfg"]
......@@ -9,14 +9,15 @@
from dataclasses import MISSING
from typing import List, Tuple
# omni-isaac-orbit
from omni.isaac.orbit.utils import configclass
from ..sensor_base_cfg import SensorBaseCfg
__all__ = ["PinholeCameraCfg", "FisheyeCameraCfg"]
@configclass
class PinholeCameraCfg:
class PinholeCameraCfg(SensorBaseCfg):
"""Configuration for a pinhole camera sensor."""
@configclass
......@@ -53,8 +54,6 @@ class PinholeCameraCfg:
vertical_aperture_offset: float = None
"""Offsets Resolution/Film gate vertically."""
sensor_tick: float = 0.0
"""Simulation seconds between sensor buffers. Defaults to 0.0."""
data_types: List[str] = ["rgb"]
"""List of sensor names/types to enable for the camera. Defaults to ["rgb"]."""
width: int = MISSING
......@@ -70,6 +69,15 @@ class PinholeCameraCfg:
More information available at:
https://docs.omniverse.nvidia.com/app_code/prod_extensions/ext_replicator/semantic_schema_editor.html
"""
colorize: bool = False
"""whether to output colorized semantic information or non-colorized one. Defaults to False.
If True, the semantic images will be a 2D array of RGBA values, where each pixel is colored according to
the semantic type. Accordingly, the information output will contain mapping from color to semantic labels.
If False, the semantic images will be a 2D array of integers, where each pixel is an integer representing
the semantic ID. Accordingly, the information output will contain mapping from semantic ID to semantic labels.
"""
projection_type: str = "pinhole"
"""Type of projection to use for the camera. Defaults to "pinhole"."""
usd_params: UsdCameraCfg = UsdCameraCfg()
......
import numpy as np
from dataclasses import dataclass
from tensordict import TensorDict
from typing import Any, Dict, List, Tuple, Union
from omni.isaac.orbit.utils.array import TensorData
@dataclass
class CameraData:
"""Data container for the camera sensor."""
position: TensorData = None
"""Position of the sensor origin in world frame, following ROS convention.
Shape is (N, 3) where ``N`` is the number of sensors.
"""
orientation: TensorData = None
"""Quaternion orientation `(w, x, y, z)` of the sensor origin in world frame, following ROS convention.
Shape: (N, 4) where ``N`` is the number of sensors.
"""
intrinsic_matrices: TensorData = None
"""The intrinsic matrices for the camera.
Shape is (N, 3, 3) where ``N`` is the number of sensors.
"""
image_shape: Tuple[int, int] = None
"""A tuple containing (height, width) of the camera sensor."""
output: Union[Dict[str, np.ndarray], TensorDict] = None
"""The retrieved sensor data with sensor types as key.
The format of the data is available in the `Replicator Documentation`_. For semantic-based data,
this corresponds to the ``"data"`` key in the output of the sensor.
.. _Replicator Documentation: https://docs.omniverse.nvidia.com/prod_extensions/prod_extensions/ext_replicator/annotators_details.html#annotator-output
"""
info: List[Dict[str, Any]] = None
"""The retrieved sensor info with sensor types as key.
This contains extra information provided by the sensor such as semantic segmentation label mapping, prim paths.
For semantic-based data, this corresponds to the ``"info"`` key in the output of the sensor. For other sensor
types, the info is empty.
"""
......@@ -11,8 +11,8 @@ from typing import Optional, Sequence, Tuple, Union
import warp as wp
from omni.isaac.orbit.utils.array import convert_to_torch
from omni.isaac.orbit.utils.math import matrix_from_quat
import omni.isaac.orbit.utils.math as math_utils
from omni.isaac.orbit.utils.array import TensorData, convert_to_torch
__all__ = ["transform_points", "create_pointcloud_from_depth", "create_pointcloud_from_rgbd"]
......@@ -23,15 +23,15 @@ Depth <-> Pointcloud conversions.
def transform_points(
points: Union[np.ndarray, torch.Tensor, wp.array],
points: TensorData,
position: Optional[Sequence[float]] = None,
orientation: Optional[Sequence[float]] = None,
device: Union[torch.device, str, None] = None,
) -> Union[np.ndarray, torch.Tensor]:
r"""Transform input points in a given frame to a target frame.
This function uses torch operations to transform points from a source frame to a target frame. The
transformation is defined by the position ``t`` and orientation ``R`` of the target frame in the source frame.
This function transform points from a source frame to a target frame. The transformation is defined by the
position ``t`` and orientation ``R`` of the target frame in the source frame.
.. math::
p_{target} = R_{target} \times p_{source} + t_{target}
......@@ -39,7 +39,7 @@ def transform_points(
If either the inputs `position` and `orientation` are :obj:`None`, the corresponding transformation is not applied.
Args:
points (Union[np.ndarray, torch.Tensor, wp.array]): An array of shape (N, 3) comprising of 3D points in source frame.
points (Union[np.ndarray, torch.Tensor, wp.array]): A tensor of shape (P, 3) or (N, P, 3) comprising of 3D points in source frame.
position (Optional[Sequence[float]], optional): The position of source frame in target frame. Defaults to None.
orientation (Optional[Sequence[float]], optional): The orientation ``(w, x, y, z)`` of source frame in target frame.
Defaults to None.
......@@ -68,7 +68,7 @@ def transform_points(
if position is not None:
position = convert_to_torch(position, dtype=torch.float32, device=device)
# apply transformation
points = _transform_points_jit(points, position, orientation)
points = math_utils.transform_points(points, position, orientation)
# return everything according to input type
if is_numpy:
......@@ -147,7 +147,12 @@ def create_pointcloud_from_depth(
if orientation is not None:
orientation = convert_to_torch(orientation, dtype=torch.float32, device=device)
# compute pointcloud
depth_cloud = _create_pointcloud_from_depth_jit(intrinsic_matrix, depth, keep_invalid, position, orientation)
depth_cloud = math_utils.unproject_depth(depth, intrinsic_matrix)
# convert 3D points to world frame
depth_cloud = math_utils.transform_points(depth_cloud, position, orientation)
# keep only valid entries
pts_idx_to_keep = torch.all(torch.logical_and(~torch.isnan(depth_cloud), ~torch.isinf(depth_cloud)), dim=1)
depth_cloud = depth_cloud[pts_idx_to_keep, ...]
# return everything according to input type
if is_numpy:
......@@ -264,96 +269,3 @@ def create_pointcloud_from_rgbd(
return points_xyz.cpu().numpy(), points_rgb.cpu().numpy()
else:
return points_xyz, points_rgb
"""
Helper functions -- Internal
"""
@torch.jit.script
def _transform_points_jit(
points: torch.Tensor,
position: Optional[torch.Tensor] = None,
orientation: Optional[torch.Tensor] = None,
) -> torch.Tensor:
"""Transform input points in a given frame to a target frame.
Args:
points (torch.Tensor): An array of shape (N, 3) comprising of 3D points in source frame.
position (Optional[torch.Tensor], optional): The position of source frame in target frame. Defaults to None.
orientation (Optional[torch.Tensor], optional): The orientation ``(w, x, y, z)`` of source frame in target frame.
Defaults to None.
Returns:
torch.Tensor: A tensor of shape (N, 3) comprising of 3D points in target frame.
"""
# -- apply rotation
if orientation is not None:
points = torch.matmul(matrix_from_quat(orientation), points.T).T
# -- apply translation
if position is not None:
points += position
return points
@torch.jit.script
def _create_pointcloud_from_depth_jit(
intrinsic_matrix: torch.Tensor,
depth: torch.Tensor,
keep_invalid: bool = False,
position: Optional[torch.Tensor] = None,
orientation: Optional[torch.Tensor] = None,
) -> torch.Tensor:
"""Creates pointcloud from input depth image and camera intrinsic matrix.
Args:
intrinsic_matrix (torch.Tensor): A (3, 3) python tensor providing camera's calibration matrix.
depth (torch.tensor): An tensor of shape (H, W) with values encoding the depth measurement.
keep_invalid (bool, optional): Whether to keep invalid points in the cloud or not. Invalid points
correspond to pixels with depth values 0.0 or NaN. Defaults to False.
position (torch.Tensor, optional): The position of the camera in a target frame. Defaults to None.
orientation (torch.Tensor, optional): The orientation ``(w, x, y, z)`` of the camera in a target frame.
Defaults to None.
Raises:
ValueError: When intrinsic matrix is not of shape (3, 3).
ValueError: When depth image is not of shape (H, W) or (H, W, 1).
Returns:
torch.Tensor: A tensor of shape (N, 3) comprising of 3D coordinates of points.
"""
# squeeze out excess dimension
if len(depth.shape) == 3:
depth = depth.squeeze(dim=2)
# check shape of inputs
if intrinsic_matrix.shape != (3, 3):
raise ValueError(f"Input intrinsic matrix of invalid shape: {intrinsic_matrix.shape} != (3, 3).")
if len(depth.shape) != 2:
raise ValueError(f"Input depth image not two-dimensional. Received shape: {depth.shape}.")
# get image height and width
im_height, im_width = depth.shape
# convert image points into list of shape (3, H x W)
indices_u = torch.arange(im_width, device=depth.device, dtype=depth.dtype)
indices_v = torch.arange(im_height, device=depth.device, dtype=depth.dtype)
img_indices = torch.stack(torch.meshgrid([indices_u, indices_v], indexing="ij"), dim=0).reshape(2, -1)
pixels = torch.nn.functional.pad(img_indices, (0, 0, 0, 1), mode="constant", value=1.0)
# convert into 3D points
points = torch.matmul(torch.inverse(intrinsic_matrix), pixels)
points = points / points[-1, :]
points_xyz = points * depth.T.reshape(-1)
# convert it to (H x W , 3)
points_xyz = torch.transpose(points_xyz, dim0=0, dim1=1)
# convert 3D points to world frame
points_xyz = _transform_points_jit(points_xyz, position, orientation)
# remove points that have invalid depth
if not keep_invalid:
pts_idx_to_keep = torch.all(torch.logical_and(~torch.isnan(points_xyz), ~torch.isinf(points_xyz)), dim=1)
points_xyz = points_xyz[pts_idx_to_keep, ...]
return points_xyz # noqa: D504
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Rigid contact sensor based on :class:`omni.isaac.core.prims.RigidContactView`.
"""
from .contact_sensor import ContactSensor
from .contact_sensor_cfg import ContactSensorCfg
from .contact_sensor_data import ContactSensorData
__all__ = ["ContactSensor", "ContactSensorCfg", "ContactSensorData"]
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import torch
from typing import Sequence
import omni.isaac.core.utils.prims as prim_utils
import omni.isaac.core.utils.stage as stage_utils
from omni.isaac.core.prims import RigidPrimView
from pxr import PhysxSchema
from omni.isaac.orbit.markers import VisualizationMarkers
from omni.isaac.orbit.markers.config import CONTACT_SENSOR_MARKER_CFG
from ..sensor_base import SensorBase
from .contact_sensor_cfg import ContactSensorCfg
from .contact_sensor_data import ContactSensorData
class ContactSensor(SensorBase):
"""A contact reporting sensor."""
def __init__(self, cfg: ContactSensorCfg):
"""Initializes the contact sensor object.
Args:
cfg (ContactSensorCfg): The configuration parameters.
"""
# store config
self.cfg = cfg
# initialize base class
super().__init__(cfg)
# Create empty variables for storing output data
self._data = ContactSensorData()
def __str__(self) -> str:
"""Returns: A string containing information about the instance."""
return (
f"Contact sensor @ '{self._view._regex_prim_paths}': \n"
f"\tview type : {self._view.__class__}\n"
f"\tupdate period (s) : {self._update_period}\n"
f"\tnumber of sensors: {self.count}"
f"\tnumber of bodies : {len(self.num_bodies)}"
)
"""
Properties
"""
@property
def data(self) -> ContactSensorData:
"""Data related to contact sensor."""
return self._data
@property
def count(self) -> int:
"""Number of prims encapsulated."""
return self._count
@property
def num_bodies(self) -> int:
"""Number of prims encapsulated."""
return self._num_bodies
"""
Operations
"""
def spawn(self, prim_paths_expr: str):
"""Spawns the sensor in the scene.
In this case, it creates a contact sensor API on the rigid body prims that are
captured by the given prim path expression.
Example: If input is ``/World/Robot/Link*``, then the contact sensor API is
created on all rigid body prims that match the expression (e.g. ``/World/Robot/Link1``,
``/World/Robot/Link2``, etc.) and their children.
Raises:
RuntimeError: No rigid bodies found in the scene at the given prim path.
"""
prim_paths = prim_utils.find_matching_prim_paths(prim_paths_expr)
num_bodies = 0
# -- create contact sensor api
for prim_path in prim_paths:
# create contact sensor
prim = prim_utils.get_prim_at_path(prim_path)
# iterate over the prim and apply contact sensor API to all rigid bodies
for link_prim in prim.GetChildren() + [prim]:
if link_prim.HasAPI(PhysxSchema.PhysxRigidBodyAPI):
# add contact report API with threshold of zero
cr_api = PhysxSchema.PhysxContactReportAPI.Apply(link_prim)
cr_api.CreateThresholdAttr().Set(0)
# increment number of contact sensors made
num_bodies += 1
# -- store the number of bodies found to shape the data buffer
self._num_bodies = num_bodies
# check that we spawned at least one contact sensor
if num_bodies == 0:
raise RuntimeError(f"No rigid bodies found for the given prim path expression: {prim_paths_expr}")
def initialize(self, prim_paths_expr: str):
# create a rigid prim view for the sensor
self._view = RigidPrimView(
prim_paths_expr=prim_paths_expr,
reset_xform_properties=False,
track_contact_forces=True,
contact_filter_prim_paths_expr=self.cfg.filter_prim_paths_expr,
prepare_contact_sensors=False,
disable_stablization=True,
)
self._view.initialize()
# resolve the true count of bodies
self._count = self._view.count // self._num_bodies
# initialize the base class
super().initialize(prim_paths_expr)
# fill the data buffer
self._data.pos_w = torch.zeros(self.count, self._num_bodies, 3, device=self._device)
self._data.quat_w = torch.zeros(self.count, self._num_bodies, 4, device=self._device)
self._data.last_air_time = torch.zeros(self.count, self._num_bodies, device=self._device)
self._data.current_air_time = torch.zeros(self.count, self._num_bodies, device=self._device)
self._data.net_forces_w = torch.zeros(self.count, self._num_bodies, 3, device=self._device)
# force matrix: (num_sensors, num_bodies, num_shapes, num_filter_shapes, 3)
if len(self.cfg.filter_prim_paths_expr) != 0:
num_shapes = self._view._contact_view.num_shapes // self._num_bodies
num_filters = self._view._contact_view.num_filters
self._data.force_matrix_w = torch.zeros(
self.count, self._num_bodies, num_shapes, num_filters, 3, device=self._device
)
# visualization of the contact sensor
prim_path = stage_utils.get_next_free_path("/Visuals/ContactSensor")
self.contact_visualizer = VisualizationMarkers(prim_path, cfg=CONTACT_SENSOR_MARKER_CFG)
def reset_buffers(self, env_ids: Sequence[int] | None = None):
"""Resets the sensor internals.
Args:
env_ids (Sequence[int], optional): The sensor ids to reset. Defaults to None.
"""
# reset the timers and counters
super().reset_buffers(env_ids)
# resolve None
if env_ids is None:
env_ids = ...
# reset the data buffers
self._data.net_forces_w[env_ids] = 0.0
self._data.force_matrix_w[env_ids] = 0.0
self._data.current_air_time[env_ids] = 0.0
self._data.last_air_time[env_ids] = 0.0
def debug_vis(self):
# visualize the point hits
if self.cfg.debug_vis:
# marker indices
# 0: contact, 1: no contact
net_contact_force_w = torch.norm(self._data.net_forces_w, dim=-1)
marker_indices = torch.where(net_contact_force_w > 1.0, 0, 1)
# check if prim is visualized
self.contact_visualizer.visualize(self._data.pos_w.view(-1, 3), marker_indices=marker_indices.view(-1))
"""
Implementation.
"""
def _buffer(self, env_ids: Sequence[int] | None = None):
"""Fills the buffers of the sensor data."""
# default to all sensors
if env_ids is None:
env_ids = self._ALL_INDICES
# obtain the poses of the sensors
pos_w, quat_w = self._view.get_world_poses(clone=False)
self._data.pos_w[env_ids] = pos_w.view(-1, self._num_bodies, 3)[env_ids]
self._data.quat_w[env_ids] = quat_w.view(-1, self._num_bodies, 4)[env_ids]
# obtain the contact forces
# TODO: We are handling the indicing ourself because of the shape; (N, B) vs expected (N * B).
# This isn't the most efficient way to do this, but it's the easiest to implement.
net_forces_w = self._view.get_net_contact_forces(dt=self._sim_physics_dt, clone=False)
self._data.net_forces_w[env_ids] = net_forces_w.view(-1, self._num_bodies, 3)[env_ids]
# obtain the contact force matrix
if len(self.cfg.filter_prim_paths_expr) != 0:
num_shapes = self._view._contact_view.num_shapes // self._num_bodies
num_filters = self._view._contact_view.num_filters
force_matrix_w = self._view.get_contact_force_matrix(dt=self._sim_physics_dt, clone=False)
force_matrix_w = force_matrix_w.view(-1, self._num_bodies, num_shapes, num_filters, 3)
self._data.force_matrix_w[env_ids] = force_matrix_w[env_ids]
# contact state
# -- time elapsed since last update
# since this function is called every frame, we can use the difference to get the elapsed time
elapsed_time = self._timestamp[env_ids] - self._timestamp_last_update[env_ids]
# -- check contact state of bodies
is_bodies_contact = torch.norm(self._data.net_forces_w[env_ids], dim=-1) > 1.0
is_bodies_first_contact = (self._data.current_air_time[env_ids] > 0) * is_bodies_contact
# -- update ongoing timer for bodies air
self._data.current_air_time[env_ids] += elapsed_time.unsqueeze(-1)
# -- update time for the last time bodies were in contact
self._data.last_air_time[env_ids] = self._data.current_air_time[env_ids] * is_bodies_first_contact
# -- increment timers for bodies that are not in contact
self._data.current_air_time[env_ids] *= ~is_bodies_contact
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Configuration for the contact sensor."""
from __future__ import annotations
from omni.isaac.orbit.utils import configclass
from ..sensor_base_cfg import SensorBaseCfg
@configclass
class ContactSensorCfg(SensorBaseCfg):
"""Configuration for the contact sensor."""
filter_prim_paths_expr: list[str] = list()
"""The list of primitive paths to filter contacts with.
For example, if you want to filter contacts with the ground plane, you can set this to
``["/World/ground_plane"]``. In this case, the contact sensor will only report contacts
with the ground plane while using the :meth:`omni.isaac.core.prims.RigidContactView.get_contact_force_matrix`
method.
If an empty list is provided, then only net contact forces are reported.
"""
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import torch
from dataclasses import dataclass
@dataclass
class ContactSensorData:
"""Data container for the contact reporting sensor."""
pos_w: torch.Tensor = None
"""Position of the sensor origin in world frame.
Shape is (N, 3), where ``N`` is the number of sensors.
"""
quat_w: torch.Tensor = None
"""Orientation of the sensor origin in quaternion ``(w, x, y, z)`` in world frame.
Shape is (N, 4), where ``N`` is the number of sensors.
"""
net_forces_w: torch.Tensor = None
"""The net contact forces in world frame.
Shape is (N, B, 3), where ``N`` is the number of sensors and ``B`` is the number of bodies in each sensor.
"""
force_matrix_w: torch.Tensor = None
"""The contact forces filtered between the sensor bodies and filtered bodies in world frame.
Shape is (N, B, S, M, 3), where ``N`` is the number of sensors, ``B`` is number of bodies in each sensor,
``S`` is number of shapes per body and ``M`` is the number of filtered bodies.
If the :attr:`ContactSensorCfg.filter_prim_paths_expr` is empty, then this tensor will be empty.
"""
last_air_time: torch.Tensor = None
"""Time spent (in s) in the air before the last contact.
Shape is (N,), where ``N`` is the number of sensors.
"""
current_air_time: torch.Tensor = None
"""Time spent (in s) in the air since the last contact.
Shape is (N,), where ``N`` is the number of sensors.
"""
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Ray-caster based on warp.
"""
from .patterns_cfg import BpearlPatternCfg, GridPatternCfg, PatternBaseCfg, PinholeCameraPatternCfg
from .ray_caster import RayCaster
from .ray_caster_cfg import RayCasterCfg
from .ray_caster_data import RayCasterData
__all__ = [
"RayCaster",
"RayCasterData",
"RayCasterCfg",
# patterns
"PatternBaseCfg",
"GridPatternCfg",
"PinholeCameraPatternCfg",
"BpearlPatternCfg",
]
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import numpy as np
import torch
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .patterns_cfg import BpearlPatternCfg, GridPatternCfg, PinholeCameraPatternCfg
def grid_pattern(cfg: GridPatternCfg, device: str) -> tuple[torch.Tensor, torch.Tensor]:
"""A regular grid pattern for ray casting.
The grid pattern is made from rays that are parallel to each other. They span a 2D grid in the sensor's
local coordinates from ``(-length/2, -width/2)`` to ``(length/2, width/2)``, which is defined
by the ``size = (length, width)`` and ``resolution`` parameters in the config.
Args:
cfg (GridPatternCfg): The configuration instance for the pattern.
device (str): The device to create the pattern on.
Returns:
tuple[torch.Tensor, torch.Tensor]: The starting positions and directions of the rays.
"""
x = torch.arange(start=-cfg.size[0] / 2, end=cfg.size[0] / 2 + 1.0e-9, step=cfg.resolution, device=device)
y = torch.arange(start=-cfg.size[1] / 2, end=cfg.size[1] / 2 + 1.0e-9, step=cfg.resolution, device=device)
grid_x, grid_y = torch.meshgrid(x, y, indexing="xy")
num_rays = grid_x.numel()
ray_starts = torch.zeros(num_rays, 3, device=device)
ray_starts[:, 0] = grid_x.flatten()
ray_starts[:, 1] = grid_y.flatten()
ray_directions = torch.zeros_like(ray_starts)
ray_directions[..., :] = torch.tensor(list(cfg.direction), device=device)
return ray_starts, ray_directions
def pinhole_camera_pattern(cfg: PinholeCameraPatternCfg, device: str) -> tuple[torch.Tensor, torch.Tensor]:
"""The depth-image pattern for ray casting.
Args:
cfg (DepthImagePatternCfg): The configuration instance for the pattern.
device (str): The device to create the pattern on.
Returns:
tuple[torch.Tensor, torch.Tensor]: The starting positions and directions of the rays.
"""
x_grid = torch.full((cfg.height, cfg.width), cfg.far_plane, device=device)
y_range = np.tan(np.deg2rad(cfg.horizontal_fov) / 2.0) * cfg.far_plane
y = torch.linspace(y_range, -y_range, cfg.width, device=device)
z_range = y_range * cfg.height / cfg.width
z = torch.linspace(z_range, -z_range, cfg.height, device=device)
y_grid, z_grid = torch.meshgrid(y, z, indexing="xy")
ray_directions = torch.cat([x_grid.unsqueeze(2), y_grid.unsqueeze(2), z_grid.unsqueeze(2)], dim=2)
ray_directions = torch.nn.functional.normalize(ray_directions, p=2.0, dim=-1).view(-1, 3)
ray_starts = torch.zeros_like(ray_directions)
return ray_starts, ray_directions
def bpearl_pattern(cfg: BpearlPatternCfg, device: str) -> tuple[torch.Tensor, torch.Tensor]:
"""The RS-Bpearl pattern for ray casting.
The `Robosense RS-Bpearl`_ is a short-range LiDAR that has a 360 degrees x 90 degrees super wide
field of view. It is designed for near-field blind-spots detection.
.. _Robosense RS-Bpearl: https://www.roscomponents.com/en/lidar-laser-scanner/267-rs-bpearl.html
Args:
cfg (BpearlPatternCfg): The configuration instance for the pattern.
device (str): The device to create the pattern on.
Returns:
tuple[torch.Tensor, torch.Tensor]: The starting positions and directions of the rays.
"""
h = torch.arange(-cfg.horizontal_fov / 2, cfg.horizontal_fov / 2, cfg.horizontal_res, device=device)
v = torch.tensor(list(cfg.vertical_ray_angles), device=device)
pitch, yaw = torch.meshgrid(v, h, indexing="xy")
pitch, yaw = torch.deg2rad(pitch.reshape(-1)), torch.deg2rad(yaw.reshape(-1))
pitch += torch.pi / 2
x = torch.sin(pitch) * torch.cos(yaw)
y = torch.sin(pitch) * torch.sin(yaw)
z = torch.cos(pitch)
ray_directions = -torch.stack([x, y, z], dim=1)
ray_starts = torch.zeros_like(ray_directions)
return ray_starts, ray_directions
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Configuration for the ray-cast sensor."""
from __future__ import annotations
import torch
from dataclasses import MISSING
from typing import Callable, Sequence
from omni.isaac.orbit.utils import configclass
from . import patterns
@configclass
class PatternBaseCfg:
"""Base configuration for a pattern."""
func: Callable[[PatternBaseCfg, str], tuple[torch.Tensor, torch.Tensor]] = MISSING
"""Function to generate the pattern.
The function should take in the configuration and the device name as arguments. It should return
the pattern's starting positions and directions as a tuple of torch.Tensor.
"""
@configclass
class GridPatternCfg(PatternBaseCfg):
"""Configuration for the grid pattern for ray-casting.
Defines a 2D grid of rays in the coordinates of the sensor.
"""
func = patterns.grid_pattern
resolution: float = MISSING
"""Grid resolution (in meters)."""
size: tuple[float, float] = MISSING
"""Grid size (length, width) (in meters)."""
direction: tuple[float, float, float] = (0.0, 0.0, -1.0)
"""Ray direction. Defaults to (0.0, 0.0, -1.0)."""
@configclass
class PinholeCameraPatternCfg(PatternBaseCfg):
"""Configuration for a pinhole camera depth image pattern for ray-casting."""
func = patterns.pinhole_camera_pattern
horizontal_fov: float = MISSING
"""Horizontal field of view (in degrees)."""
width: int = MISSING
"""Width of the image (in pixels)."""
height: int = MISSING
"""Height of the image (in pixels)."""
far_plane: float = MISSING
"""Far plane of the image (in meters)."""
@configclass
class BpearlPatternCfg(PatternBaseCfg):
"""Configuration for the Bpearl pattern for ray-casting."""
func = patterns.bpearl_pattern
horizontal_fov: float = 360.0
"""Horizontal field of view (in degrees). Defaults to 360.0."""
horizontal_res: float = 10.0
"""Horizontal resolution (in degrees). Defaults to 10.0."""
# fmt: off
vertical_ray_angles: Sequence[float] = [
89.5, 86.6875, 83.875, 81.0625, 78.25, 75.4375, 72.625, 69.8125, 67.0, 64.1875, 61.375,
58.5625, 55.75, 52.9375, 50.125, 47.3125, 44.5, 41.6875, 38.875, 36.0625, 33.25, 30.4375,
27.625, 24.8125, 22, 19.1875, 16.375, 13.5625, 10.75, 7.9375, 5.125, 2.3125
]
# fmt: on
"""Vertical ray angles (in degrees). Defaults to a list of 32 angles.
Note:
We manually set the vertical ray angles to match the Bpearl sensor. The ray-angles
are not evenly spaced.
"""
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import numpy as np
import torch
from typing import Sequence
import carb
import omni.isaac.core.utils.prims as prim_utils
import omni.isaac.core.utils.stage as stage_utils
from omni.isaac.core.articulations import ArticulationView
from omni.isaac.core.prims import RigidPrimView, XFormPrimView
from pxr import UsdGeom, UsdPhysics
from omni.isaac.orbit.markers import VisualizationMarkers
from omni.isaac.orbit.markers.config import RAY_CASTER_MARKER_CFG
from omni.isaac.orbit.utils.math import quat_apply, quat_apply_yaw
from omni.isaac.orbit.utils.warp import convert_to_warp_mesh, raycast_mesh
from ..sensor_base import SensorBase
from .ray_caster_cfg import RayCasterCfg
from .ray_caster_data import RayCasterData
class RayCaster(SensorBase):
"""A ray-casting sensor.
The ray-caster uses a set of rays to detect collisions with meshes in the scene. The rays are
defined in the sensor's local coordinate frame. The sensor can be configured to ray-cast against
a set of meshes with a given ray pattern.
The meshes are parsed from the list of primitive paths provided in the configuration. These are then
converted to warp meshes and stored in the `warp_meshes` list. The ray-caster then ray-casts against
these warp meshes using the ray pattern provided in the configuration.
.. note::
Currently, only static meshes are supported. Extending the warp mesh to support dynamic meshes
is a work in progress.
"""
def __init__(self, cfg: RayCasterCfg):
"""Initializes the ray-caster object.
Args:
cfg (RayCasterCfg): The configuration parameters.
"""
# store config
self.cfg = cfg
# initialize base class
super().__init__(cfg)
# Create empty variables for storing output data
self._data = RayCasterData()
# List of meshes to ray-cast
self.warp_meshes = []
def __str__(self) -> str:
"""Returns: A string containing information about the instance."""
return (
f"Ray-caster @ '{self._view._regex_prim_paths}': \n"
f"\tview type : {self._view.__class__}\n"
f"\tupdate period (s) : {self._update_period}\n"
f"\tnumber of meshes : {len(self.warp_meshes)}\n"
f"\tnumber of sensors: {self.view.count}\n"
f"\tnumber of rays/sensor : {self.num_rays}\n"
f"\ttotal number of rays : {self.num_rays * self.view.count}"
)
"""
Properties
"""
@property
def data(self) -> RayCasterData:
"""Data related to ray-caster."""
return self._data
"""
Operations
"""
def spawn(self, prim_path: str, *args, **kwargs):
"""Spawns the sensor in the scene.
Note:
Ray-caster is a virtual sensor and does not need to be spawned. However,
this function is required by the base class.
"""
pass
def initialize(self, prim_paths_expr: str):
# check if the prim at path is a articulated or rigid prim
# we do this since for physics-based view classes we can access their data directly
# otherwise we need to use the xform view class which is slower
prim_view_class = None
for prim_path in prim_utils.find_matching_prim_paths(prim_paths_expr):
# get prim at path
prim = prim_utils.get_prim_at_path(prim_path)
# check if it is a rigid prim
if prim.HasAPI(UsdPhysics.ArticulationRootAPI):
prim_view_class = ArticulationView
elif prim.HasAPI(UsdPhysics.RigidBodyAPI):
prim_view_class = RigidPrimView
else:
prim_view_class = XFormPrimView
carb.log_warn(f"The prim at path {prim_path} is not a rigid prim! Using XFormPrimView.")
# break the loop
break
# check if prim view class is found
if prim_view_class is None:
raise RuntimeError(f"Failed to find a valid prim view class for the prim paths: {prim_paths_expr}")
# create a rigid prim view for the sensor
self._view = prim_view_class(prim_paths_expr, reset_xform_properties=False)
self._view.initialize()
# initialize the base class
super().initialize(prim_paths_expr)
# Check that backend is compatible
if self._backend != "torch":
raise RuntimeError(f"RayCaster only supports PyTorch backend. Received: {self._backend}.")
# check number of mesh prims provided
if len(self.cfg.mesh_prim_paths) != 1:
raise NotImplementedError(
f"RayCaster currently only supports one mesh prim. Received: {len(self.cfg.mesh_prim_paths)}"
)
# read prims to ray-cast
for mesh_prim_path in self.cfg.mesh_prim_paths:
# obtain the mesh prim
mesh_prim = prim_utils.get_first_matching_child_prim(
mesh_prim_path, lambda p: prim_utils.get_prim_type_name(p) == "Mesh"
)
# check if valid
if not prim_utils.is_prim_path_valid(mesh_prim_path):
raise RuntimeError(f"Invalid mesh prim path: {mesh_prim_path}")
# cast into UsdGeomMesh
mesh_prim = UsdGeom.Mesh(mesh_prim)
# read the vertices and faces
points = np.asarray(mesh_prim.GetPointsAttr().Get())
indices = np.asarray(mesh_prim.GetFaceVertexIndicesAttr().Get())
wp_mesh = convert_to_warp_mesh(points, indices, device=self.device)
# add the warp mesh to the list
self.warp_meshes.append(wp_mesh)
# throw an error if no meshes are found
if len(self.warp_meshes) == 0:
raise RuntimeError("No meshes found for ray-casting! Please check the mesh prim paths.")
# compute ray stars and directions
self.ray_starts, self.ray_directions = self.cfg.pattern_cfg.func(self.cfg.pattern_cfg, self._device)
self.num_rays = len(self.ray_directions)
# apply offset transformation to the rays
offset_pos = torch.tensor(list(self.cfg.pos_offset), device=self._device)
offset_quat = torch.tensor(list(self.cfg.quat_offset), device=self._device)
self.ray_directions = quat_apply(offset_quat.repeat(len(self.ray_directions), 1), self.ray_directions)
self.ray_starts += offset_pos
# repeat the rays for each sensor
self.ray_starts = self.ray_starts.repeat(self._view.count, 1, 1)
self.ray_directions = self.ray_directions.repeat(self._view.count, 1, 1)
# fill the data buffer
self._data.pos_w = torch.zeros(self._view.count, 3, device=self._device)
self._data.quat_w = torch.zeros(self._view.count, 4, device=self._device)
self._data.ray_hits_w = torch.zeros(self._view.count, self.num_rays, 3, device=self._device)
# visualization of the ray-caster
prim_path = stage_utils.get_next_free_path("/Visuals/RayCaster")
self.ray_visualizer = VisualizationMarkers(prim_path, cfg=RAY_CASTER_MARKER_CFG)
def reset_buffers(self, env_ids: Sequence[int] | None = None):
"""Resets the sensor internals.
Args:
env_ids (Sequence[int], optional): The sensor ids to reset. Defaults to None.
"""
# reset the timers and counters
super().reset_buffers(env_ids)
# force buffer the data -> needed for reset observations
self._buffer(env_ids)
def debug_vis(self):
# visualize the point hits
if self.cfg.debug_vis:
# check if prim is visualized
self.ray_visualizer.visualize(self._data.ray_hits_w.view(-1, 3))
"""
Implementation.
"""
def _buffer(self, env_ids: Sequence[int] | None = None):
"""Fills the buffers of the sensor data."""
# default to all sensors
if env_ids is None:
env_ids = ...
# obtain the poses of the sensors
pos_w, quat_w = self._view.get_world_poses(clone=False)
self._data.pos_w[env_ids] = pos_w
self._data.quat_w[env_ids] = quat_w
# ray cast based on the sensor poses
if self.cfg.attach_yaw_only:
# only yaw orientation is considered and directions are not rotated
ray_starts_w = quat_apply_yaw(quat_w.repeat(1, self.num_rays), self.ray_starts[env_ids])
ray_starts_w += pos_w.unsqueeze(1)
ray_directions_w = self.ray_directions[env_ids]
else:
# full orientation is considered
ray_starts_w = quat_apply(quat_w.repeat(1, self.num_rays), self.ray_starts[env_ids])
ray_starts_w += pos_w.unsqueeze(1)
ray_directions_w = quat_apply(quat_w.repeat(1, self.num_rays), self.ray_directions[env_ids])
# ray cast and store the hits
# TODO: Make this work for multiple meshes?
self._data.ray_hits_w[env_ids] = raycast_mesh(ray_starts_w, ray_directions_w, self.warp_meshes[0])
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Configuration for the ray-cast sensor."""
from __future__ import annotations
from dataclasses import MISSING
from omni.isaac.orbit.utils import configclass
from ..sensor_base_cfg import SensorBaseCfg
from .patterns_cfg import PatternBaseCfg
@configclass
class RayCasterCfg(SensorBaseCfg):
"""Configuration for the ray-cast sensor."""
mesh_prim_paths: list[str] = MISSING
"""The list of mesh primitive paths to ray cast against.
Note:
Currently, only a single static mesh is supported. We are working on supporting multiple
static meshes and dynamic meshes.
"""
pos_offset: tuple[float, float, float] = (0.0, 0.0, 0.0)
"""The position offset from the frame the sensor is attached to. Defaults to (0.0, 0.0, 0.0)."""
quat_offset: tuple[float, float, float, float] = (1.0, 0.0, 0.0, 0.0)
"""The quaternion offset (w, x, y, z) from the frame the sensor is attached to. Defaults to (1.0, 0.0, 0.0, 0.0)."""
attach_yaw_only: bool = MISSING
"""Whether the rays' starting positions and directions only track the yaw orientation.
This is useful for ray-casting height maps, where only yaw rotation is needed.
"""
pattern_cfg: PatternBaseCfg = MISSING
"""The pattern that defines the local ray starting positions and directions."""
max_distance: float = 100.0
"""Maximum distance (in meters) from the sensor to ray cast to. Defaults to 100.0."""
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import torch
from dataclasses import dataclass
@dataclass
class RayCasterData:
"""Data container for the ray-cast sensor."""
pos_w: torch.Tensor = None
"""Position of the sensor origin in world frame.
Shape is (N, 3), where ``N`` is the number of sensors.
"""
quat_w: torch.Tensor = None
"""Orientation of the sensor origin in quaternion ``(w, x, y, z)`` in world frame.
Shape is (N, 4), where ``N`` is the number of sensors.
"""
ray_hits_w: torch.Tensor = None
"""The ray hit positions in the world frame.
Shape is (N, B, 3), where ``N`` is the number of sensors, ``B`` is the number of rays
in the scan pattern per sensor.
"""
......@@ -10,101 +10,151 @@ Each sensor class should inherit from this class and implement the abstract meth
"""
from abc import abstractmethod
from typing import Any
from abc import ABC, abstractmethod
from typing import Any, List, Optional, Sequence
from omni.isaac.core.prims import XFormPrimView
from omni.isaac.core.simulation_context import SimulationContext
class SensorBase:
"""The base class for implementing a sensor.
from omni.isaac.orbit.utils.array import TensorData
Note:
These sensors are not vectorized yet.
"""
from .sensor_base_cfg import SensorBaseCfg
class SensorBase(ABC):
"""The base class for implementing a sensor."""
def __init__(self, sensor_tick: float = 0.0):
def __init__(self, cfg: SensorBaseCfg):
"""Initialize the sensor class.
The sensor tick is the time between two sensor buffers. If the sensor tick is zero, then the sensor
buffers are filled at every simulation step.
Args:
sensor_tick (float, optional): Simulation seconds between sensor buffers. Defaults to 0.0.
cfg (SensorBaseCfg): The configuration parameters for the sensor.
"""
# Copy arguments to class
self._sensor_tick: float = sensor_tick
# Store the inputs
self.cfg = cfg
# Resolve the sensor update period
self._update_period = 0.0 if cfg.update_freq == 0.0 else 1.0 / cfg.update_freq
# Sensor view -- This can be used to access the underlying XFormPrimView
# Note: This is not initialized here. It is initialized in the derived class.
self._view: XFormPrimView = None
# Current timestamp of animation (in seconds)
self._timestamp: float = 0.0
self._timestamp: TensorData
# Timestamp from last update
self._timestamp_last_update: float = 0.0
self._timestamp_last_update: TensorData
# Frame number when the measurement is taken
self._frame: int = 0
self._frame: TensorData
# Flag for whether the sensor is initialized
self._is_initialized: bool = False
"""
Properties
"""
@property
def frame(self) -> int:
def view(self) -> XFormPrimView:
"""The underlying view of the sensor."""
return self._view
@property
def prim_paths(self) -> List[str]:
"""The paths to the sensor prims."""
return self._view.prim_paths
@property
def count(self) -> int:
"""Number of prims encapsulated."""
return self._view.count
@property
def device(self) -> str:
"""Device where the sensor is running."""
return self._device
@property
def frame(self) -> TensorData:
"""Frame number when the measurement took place."""
return self._frame
@property
def timestamp(self) -> float:
def timestamp(self) -> TensorData:
"""Simulation time of the measurement (in seconds)."""
return self._timestamp
@property
def sensor_tick(self) -> float:
"""Simulation seconds between sensor buffers (ticks)."""
return self._sensor_tick
@property
@abstractmethod
def data(self) -> Any:
"""The data from the simulated sensor."""
return None # noqa: R501
"""
Helpers
"""
def set_visibility(self, visible: bool):
"""Set visibility of the instance in the scene.
Note:
Sensors are mostly XForms which do not have any mesh associated to them. Thus,
overriding this method is optional.
Args:
visible (bool) -- Whether to make instance visible or invisible.
"""
pass
raise NotImplementedError("The data property is not implemented!")
"""
Operations
"""
@abstractmethod
def spawn(self, parent_prim_path: str):
def spawn(self, prim_path: str, *args, **kwargs):
"""Spawns the sensor into the stage.
Args:
parent_prim_path (str): The path of the parent prim to attach sensor to.
prim_path (str): The path of the prim to attach the sensor to.
"""
raise NotImplementedError
@abstractmethod
def initialize(self):
"""Initializes the sensor handles and internal buffers."""
raise NotImplementedError
def initialize(self, prim_paths_expr: str = None):
"""Initializes the sensor handles and internal buffers.
Args:
prim_paths_expr (str, optional): The prim path expression for the sensors. Defaults to None.
Raises:
RuntimeError: If the simulation context is not initialized.
RuntimeError: If no prims are found for the given prim path expression.
"""
# Obtain Simulation Context
sim = SimulationContext.instance()
if sim is not None:
self._backend = sim.backend
self._device = sim.device
self._backend_utils = sim.backend_utils
self._sim_physics_dt = sim.get_physics_dt()
else:
raise RuntimeError("Simulation Context is not initialized!")
# Check that view is not None
if self._view is None:
self._view = XFormPrimView(prim_paths_expr, "sensor_view", reset_xform_properties=False)
# Check is prims are found under the given prim path expression
if self._view.count == 0:
raise RuntimeError(f"No prims found for the given prim path expression: {prim_paths_expr}")
# Create constant for the number of sensors
self._ALL_INDICES = self._backend_utils.resolve_indices(None, self.count, device=self.device)
# Current timestamp of animation (in seconds)
self._timestamp = self._backend_utils.create_zeros_tensor((self.count,), "float32", self.device)
# Timestamp from last update
self._timestamp_last_update = self._backend_utils.create_zeros_tensor((self.count,), "float32", self.device)
# Frame number when the measurement is taken
self._frame = self._backend_utils.create_zeros_tensor((self.count,), "int64", self.device)
# Set the initialized flag
self._is_initialized = True
def reset(self):
"""Resets the sensor internals."""
# Set current time
self._timestamp = 0.0
# Set zero captures
self._frame = 0
def reset_buffers(self, env_ids: Optional[Sequence[int]] = None):
"""Resets the sensor internals.
def update(self, dt: float, *args, **kwargs):
Args:
env_ids (Optional[Sequence[int]], optional): The sensor ids to reset. Defaults to None.
"""
# Resolve sensor ids
if env_ids is None:
env_ids = ...
# Reset the timestamp for the sensors
self._timestamp[env_ids] = 0.0
self._timestamp_last_update[env_ids] = 0.0
# Reset the frame count
self._frame[env_ids] = 0
def update_buffers(self, dt: float):
"""Updates the buffers at sensor frequency.
This function performs time-based checks and fills the data into the data container. It
......@@ -113,27 +163,53 @@ class SensorBase:
Args:
dt (float): The simulation time-step.
args (tuple): Other positional arguments passed to function :meth:`buffer()`.
kwargs (dict): Other keyword arguments passed to function :meth:`buffer()`.
"""
# Get current time
# Update the timestamp for the sensors
self._timestamp += dt
# Buffer the sensor data.
if (self._timestamp - self._timestamp_last_update) >= self._sensor_tick:
# Check if the sensor is ready to capture
env_ids = self._timestamp - self._timestamp_last_update >= self._update_period
# Get the indices of the sensors that are ready to capture
if self._backend == "torch":
env_ids = env_ids.nonzero(as_tuple=False).squeeze(-1)
elif self._backend == "numpy":
env_ids = env_ids.nonzero()[0]
else:
raise NotImplementedError(f"Backend '{self._backend}' is not supported.")
# Check if any sensor is ready to capture
if len(env_ids) > 0:
# Buffer the data
self.buffer(*args, **kwargs)
self._buffer(env_ids)
# Update the frame count
self._frame += 1
self._frame[env_ids] += 1
# Update capture time
self._timestamp_last_update = self._timestamp
self._timestamp_last_update[env_ids] = self._timestamp[env_ids]
def debug_vis(self):
"""Visualizes the sensor data.
This is an empty function that can be overridden by the derived class to visualize the sensor data.
Note:
Visualization of sensor data may add overhead to the simulation. It is recommended to disable
visualization when running the simulation in headless mode.
Args:
visualize (bool, optional): Whether to visualize the sensor data. Defaults to True.
"""
pass
"""
Implementation specific.
"""
@abstractmethod
def buffer(self, *args, **kwargs):
def _buffer(self, env_ids: Optional[Sequence[int]] = None):
"""Fills the buffers of the sensor data.
This function does not perform any time-based checks and directly fills the data into the data container.
Warning:
Although this method is public, `update(dt)` should be the preferred way of filling buffers.
Args:
env_ids (Optional[Sequence[int]]): The indices of the sensors that are ready to capture.
If None, then all sensors are ready to capture. Defaults to None.
"""
raise NotImplementedError
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from omni.isaac.orbit.utils import configclass
@configclass
class SensorBaseCfg:
"""Configuration parameters for a sensor."""
update_freq: float = 0.0
"""Update frequency of the sensor buffers (in Hz). Defaults to 0.0.
If the sensor frequency is zero, then the sensor buffers are filled at every simulation step.
"""
debug_vis: bool = False
"""Whether to visualize the sensor. Defaults to False."""
......@@ -27,4 +27,4 @@ from .terrain_cfg import SubTerrainBaseCfg, TerrainGeneratorCfg, TerrainImporter
from .terrain_generator import TerrainGenerator
from .terrain_importer import TerrainImporter
from .trimesh import * # noqa: F401, F403
from .utils import color_meshes_by_height, convert_to_warp_mesh, create_prim_from_mesh
from .utils import color_meshes_by_height, create_prim_from_mesh
......@@ -15,10 +15,11 @@ from pxr import UsdGeom
from omni.isaac.orbit.compat.markers import StaticMarker
from omni.isaac.orbit.utils.kit import create_ground_plane
from omni.isaac.orbit.utils.warp import convert_to_warp_mesh
from .terrain_cfg import TerrainImporterCfg
from .trimesh.utils import make_plane
from .utils import convert_to_warp_mesh, create_prim_from_mesh
from .utils import create_prim_from_mesh
class TerrainImporter:
......
......@@ -7,8 +7,6 @@ import numpy as np
import trimesh
from typing import List
import warp as wp
def color_meshes_by_height(meshes: List[trimesh.Trimesh], **kwargs) -> trimesh.Trimesh:
"""
......@@ -125,23 +123,3 @@ def create_prim_from_mesh(prim_path: str, vertices: np.ndarray, triangles: np.nd
physx_material_api.CreateRestitutionCombineModeAttr().Set(combine_mode)
# apply physics material to ground plane
GeometryPrim(f"{prim_path}/mesh", collision=True).apply_physics_material(material)
def convert_to_warp_mesh(vertices: np.ndarray, triangles: np.ndarray, device: str) -> wp.Mesh:
"""Create a warp mesh object with a mesh defined from vertices and triangles.
Args:
vertices (np.ndarray): The vertices of the mesh. Shape is :math:`(N, 3)`, where :math:`N`
is the number of vertices.
triangles (np.ndarray): The triangles of the mesh as references to vertices for each triangle.
Shape is :math:`(M, 3)`, where :math:`M` is the number of triangles / faces.
device (str): The device to use for the mesh.
Returns:
wp.Mesh: The warp mesh object.
"""
# create warp mesh
return wp.Mesh(
points=wp.array(vertices.astype(np.float32), dtype=wp.vec3, device=device),
indices=wp.array(triangles.astype(np.int32).flatten(), dtype=int, device=device),
)
......@@ -14,7 +14,7 @@ Sub-module containing utilities for the Orbit framework.
* `timer`: Provides a timer class (uses contextlib) for benchmarking.
"""
from .array import TENSOR_TYPE_CONVERSIONS, TENSOR_TYPES, convert_to_torch
from .array import TENSOR_TYPE_CONVERSIONS, TENSOR_TYPES, TensorData, convert_to_torch
from .configclass import configclass
from .dict import class_to_dict, convert_dict_to_backend, print_dict, update_class_from_dict, update_dict
from .string import to_camel_case, to_snake_case
......@@ -22,6 +22,7 @@ from .timer import Timer
__all__ = [
# arrays
"TensorData",
"TENSOR_TYPES",
"TENSOR_TYPE_CONVERSIONS",
"convert_to_torch",
......
......@@ -7,12 +7,18 @@
import numpy as np
import torch
from typing import Optional, Sequence, Union
from typing import Optional, Union
import warp as wp
__all__ = ["TENSOR_TYPES", "TENSOR_TYPE_CONVERSIONS", "convert_to_torch"]
TensorData = Union[np.ndarray, torch.Tensor, wp.array]
"""Type definition for a tensor data.
Union of numpy, torch, and warp arrays.
"""
TENSOR_TYPES = {
"numpy": np.ndarray,
"torch": torch.Tensor,
......@@ -37,7 +43,7 @@ inner dictionary are the source backend (``np.ndarray``, ``torch.Tensor``, ``wp.
def convert_to_torch(
array: Sequence[float],
array: TensorData,
dtype: torch.dtype = None,
device: Optional[Union[torch.device, str]] = None,
) -> torch.Tensor:
......@@ -51,7 +57,7 @@ def convert_to_torch(
this defaults to "cpu", for torch tensors it is "cpu" or "cuda", and for warp arrays it is "cuda".
Args:
array (Sequence[float]): The input array. It can be a numpy array, warp array, python list/tuple, or torch tensor.
array (TensorData): The input array. It can be a numpy array, warp array, python list/tuple, or torch tensor.
dtype (torch.dtype, optional): Target data-type for the tensor.
device (Optional[Union[torch.device, str]], optional): The target device for the tensor. Defaults to None.
......@@ -62,6 +68,11 @@ def convert_to_torch(
if isinstance(array, torch.Tensor):
tensor = array
elif isinstance(array, np.ndarray):
# if the datatype is not currently supported by torch we need to improvise
# supported types are: https://pytorch.org/docs/stable/tensors.html
if array.dtype == np.uint32:
array = array.astype(np.int64)
# need to deal with object arrays (np.void) separatelyWWW
tensor = torch.from_numpy(array)
elif isinstance(array, wp.array):
tensor = wp.to_torch(array)
......
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Operations based on warp."""
import numpy as np
import torch
import warp as wp
from .kernels import *
__all__ = ["raycast_mesh", "convert_to_warp_mesh"]
def raycast_mesh(ray_starts: torch.Tensor, ray_directions: torch.Tensor, mesh: wp.Mesh) -> torch.Tensor:
"""Performs ray-casting against a mesh.
Note that the `ray_starts` and `ray_directions`, and `ray_hits` should have compatible shapes
and data types to ensure proper execution. Additionally, they all must be in the same frame.
Args:
ray_starts (torch.Tensor): The starting position of the rays. Shape (N, 3).
ray_directions (torch.Tensor): The ray directions for each ray. Shape (N, 3).
mesh (wp.Mesh): The warp mesh to ray-cast against.
Returns:
torch.Tensor: The ray hit position. Shape (N, 3).
The returned tensor contains :obj:`float('inf')` for missed hits.
"""
# extract device and shape information
shape = ray_starts.shape
device = ray_starts.device
# device of the mesh
mesh_device = wp.device_to_torch(mesh.device)
# reshape the tensors
ray_starts = ray_starts.to(mesh_device).view(-1, 3)
ray_directions = ray_directions.to(mesh_device).view(-1, 3)
num_rays = ray_starts.shape[0]
# create output tensor for the ray hits
ray_hits = torch.full((num_rays, 3), float("inf"), device=mesh_device)
# map the memory to warp arrays
ray_starts_wp = wp.from_torch(ray_starts, dtype=wp.vec3, requires_grad=False)
ray_directions_wp = wp.from_torch(ray_directions, dtype=wp.vec3, requires_grad=False)
ray_hits_wp = wp.from_torch(ray_hits, dtype=wp.vec3, requires_grad=False)
# launch the warp kernel
wp.launch(
kernel=raycast_mesh_kernel,
dim=num_rays,
inputs=[mesh.id, ray_starts_wp, ray_directions_wp, ray_hits_wp],
device=mesh.device,
)
# NOTE: Synchronize is not needed anymore, but we keep it for now. Check with @dhoeller.
wp.synchronize()
return ray_hits.to(device).view(shape)
def convert_to_warp_mesh(points: np.ndarray, indices: np.ndarray, device: str) -> wp.Mesh:
"""Create a warp mesh object with a mesh defined from vertices and triangles.
Args:
points (np.ndarray): The vertices of the mesh. Shape is :math:`(N, 3)`, where :math:`N`
is the number of vertices.
indices (np.ndarray): The triangles of the mesh as references to vertices for each triangle.
Shape is :math:`(M, 3)`, where :math:`M` is the number of triangles / faces.
device (str): The device to use for the mesh.
Returns:
wp.Mesh: The warp mesh object.
"""
# create warp mesh
return wp.Mesh(
points=wp.array(points.astype(np.float32), dtype=wp.vec3, device=device),
indices=wp.array(indices.astype(np.int32).flatten(), dtype=wp.int32, device=device),
)
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Custom kernels for warp."""
import warp as wp
@wp.kernel
def raycast_mesh_kernel(
mesh: wp.uint64,
ray_starts: wp.array(dtype=wp.vec3),
ray_directions: wp.array(dtype=wp.vec3),
ray_hits: wp.array(dtype=wp.vec3),
):
"""Performs ray-casting against a mesh.
This function performs ray-casting against the given mesh using the provided ray start positions
and directions. The resulting ray hit positions are stored in the :obj:`ray_hits` array.
Note that the `ray_starts`, `ray_directions`, and `ray_hits` arrays should have compatible shapes
and data types to ensure proper execution. Additionally, they all must be in the same frame.
The function utilizes the `mesh_query_ray` method from the `wp` module to perform the actual ray-casting
operation. The maximum ray-cast distance is set to `1e6` units.
Args:
mesh (wp.uint64): The input mesh identifier.
ray_starts (wp.array): The input ray start positions. Shape (N, 3).
ray_directions (wp.array): The input ray directions. Shape (N, 3).
ray_hits (wp.array): The output ray hit positions. Shape (N, 3).
"""
# get the thread id
tid = wp.tid()
max_dist = float(1e6) # max ray-cast distance
t = float(0.0) # hit distance along ray
u = float(0.0) # hit face barycentric u
v = float(0.0) # hit face barycentric v
sign = float(0.0) # hit face sign
n = wp.vec3() # hit face normal
f = int(0) # hit face index
# ray cast against the mesh and store the hit position
if wp.mesh_query_ray(mesh, ray_starts[tid], ray_directions[tid], max_dist, t, u, v, sign, n, f):
ray_hits[tid] = ray_starts[tid] + t * ray_directions[tid]
......@@ -21,6 +21,7 @@ INSTALL_REQUIRES = [
"numpy",
"torch",
"prettytable==3.3.0",
"tensordict",
# devices
"hidapi",
# procedural-generation
......
......@@ -37,7 +37,7 @@ from omni.isaac.core.utils.viewports import set_camera_view
from pxr import Gf, UsdGeom
import omni.isaac.orbit.utils.kit as kit_utils
from omni.isaac.orbit.sensors.camera import Camera, PinholeCameraCfg
from omni.isaac.orbit.compat.sensors.camera import Camera, PinholeCameraCfg
from omni.isaac.orbit.utils.math import convert_quat
from omni.isaac.orbit.utils.timer import Timer
......
......@@ -34,8 +34,8 @@ from omni.isaac.core.simulation_context import SimulationContext
from omni.isaac.core.utils.viewports import set_camera_view
import omni.isaac.orbit.utils.kit as kit_utils
from omni.isaac.orbit.sensors.height_scanner import HeightScanner, HeightScannerCfg
from omni.isaac.orbit.sensors.height_scanner.utils import create_points_from_grid, plot_height_grid
from omni.isaac.orbit.compat.sensors.height_scanner import HeightScanner, HeightScannerCfg
from omni.isaac.orbit.compat.sensors.height_scanner.utils import create_points_from_grid, plot_height_grid
from omni.isaac.orbit.utils.math import convert_quat
from omni.isaac.orbit.utils.timer import Timer
......
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
This script shows how to use the camera sensor from the Orbit framework.
The camera sensor is created and interfaced through the Omniverse Replicator API. However, instead of using
the simulator or OpenGL convention for the camera, we use the robotics or ROS convention.
"""
"""Launch Isaac Sim Simulator first."""
import argparse
# omni-isaac-orbit
from omni.isaac.orbit.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser("Welcome to Orbit: Omniverse Robotics Environments!")
parser.add_argument("--gpu", action="store_true", default=False, help="Use GPU device for camera rendering output.")
args_cli = parser.parse_args()
# launch omniverse app
app_launcher = AppLauncher(headless=True)
simulation_app = app_launcher.app
"""Rest everything follows."""
import numpy as np
import os
import random
import torch
import omni.isaac.core.utils.prims as prim_utils
import omni.isaac.debug_draw._debug_draw as omni_debug_draw
import omni.replicator.core as rep
from omni.isaac.core.prims import RigidPrim
from omni.isaac.core.simulation_context import SimulationContext
from omni.isaac.core.utils.viewports import set_camera_view
from pxr import Gf, UsdGeom
import omni.isaac.orbit.utils.kit as kit_utils
from omni.isaac.orbit.sensors.camera import Camera, PinholeCameraCfg
from omni.isaac.orbit.utils import convert_dict_to_backend
"""
Helpers
"""
def design_scene():
"""Add prims to the scene."""
# Ground-plane
kit_utils.create_ground_plane("/World/defaultGroundPlane")
# Lights-1
prim_utils.create_prim(
"/World/Light/GreySphere",
"SphereLight",
translation=(4.5, 3.5, 10.0),
attributes={"radius": 2.5, "intensity": 600.0, "color": (0.75, 0.75, 0.75)},
)
# Lights-2
prim_utils.create_prim(
"/World/Light/WhiteSphere",
"SphereLight",
translation=(-4.5, 3.5, 10.0),
attributes={"radius": 2.5, "intensity": 600.0, "color": (1.0, 1.0, 1.0)},
)
# Xform to hold objects
prim_utils.create_prim("/World/Objects", "Xform")
# Random objects
for i in range(8):
# sample random position
position = np.random.rand(3) - np.asarray([0.05, 0.05, -1.0])
position *= np.asarray([1.5, 1.5, 0.5])
# create prim
prim_type = random.choice(["Cube", "Sphere", "Cylinder"])
_ = prim_utils.create_prim(
f"/World/Objects/Obj_{i:02d}",
prim_type,
translation=position,
scale=(0.25, 0.25, 0.25),
semantic_label=prim_type,
)
# add rigid properties
rigid_obj = RigidPrim(f"/World/Objects/Obj_{i:02d}", mass=5.0)
# cast to geom prim
geom_prim = getattr(UsdGeom, prim_type)(rigid_obj.prim)
# set random color
color = Gf.Vec3f(random.random(), random.random(), random.random())
geom_prim.CreateDisplayColorAttr()
geom_prim.GetDisplayColorAttr().Set([color])
"""
Main
"""
def main():
"""Runs a camera sensor from orbit."""
# Load kit helper
sim = SimulationContext(
physics_dt=0.005, rendering_dt=0.005, backend="torch", device="cuda" if args_cli.gpu else "cpu"
)
# sim = SimulationContext(physics_dt=0.005, rendering_dt=0.005, backend="numpy")
# Set main camera
set_camera_view([2.5, 2.5, 2.5], [0.0, 0.0, 0.0])
# Acquire draw interface
draw_interface = omni_debug_draw.acquire_debug_draw_interface()
# Populate scene
design_scene()
# Setup camera sensor
camera_cfg = PinholeCameraCfg(
update_freq=0,
height=480,
width=640,
data_types=["rgb", "distance_to_image_plane", "normals", "motion_vectors", "semantic_segmentation"],
usd_params=PinholeCameraCfg.UsdCameraCfg(
focal_length=24.0, focus_distance=400.0, horizontal_aperture=20.955, clipping_range=(0.1, 1.0e5)
),
)
camera = Camera(cfg=camera_cfg)
# Spawn camera
camera.spawn("/World/CameraSensor/Cam_00")
camera.spawn("/World/CameraSensor/Cam_01")
# Create replicator writer
output_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "output", "camera")
rep_writer = rep.BasicWriter(output_dir=output_dir, frame_padding=3)
# Play simulator
sim.play()
# Initialize camera
camera.initialize("/World/CameraSensor/Cam_*")
# Set pose: There are two ways to set the pose of the camera.
# -- Option-1: Set pose using view
eyes = [[2.5, 2.5, 2.5], [-2.5, -2.5, 2.5]]
targets = [[0.0, 0.0, 0.0], [0.0, 0.0, 0.0]]
camera.set_world_poses_from_view(eyes, targets)
# -- Option-2: Set pose using ROS
# position = [[2.5, 2.5, 2.5]]
# orientation = [[-0.17591989, 0.33985114, 0.82047325, -0.42470819]]
# camera.set_world_pose_ros(position, orientation, indices=[0])
# Simulate for a few steps
# note: This is a workaround to ensure that the textures are loaded.
# Check "Known Issues" section in the documentation for more details.
for _ in range(3):
sim.step()
# Simulate physics
for _ in range(100):
# If simulation is stopped, then exit.
if sim.is_stopped():
break
# If simulation is paused, then skip.
if not sim.is_playing():
sim.step(render=app_launcher.RENDER)
continue
# Step simulation
sim.step(render=app_launcher.RENDER)
# Update camera data
camera.update_buffers(dt=0.0)
# Print camera info
print(camera)
print("Received shape of rgb image: ", camera.data.output["rgb"].shape)
print("Received shape of depth image: ", camera.data.output["distance_to_image_plane"].shape)
print("-------------------------------")
# Extract camera data
camera_index = 1
# note: BasicWriter only supports saving data in numpy format, so we need to convert the data to numpy.
if sim.backend == "torch":
# tensordict allows easy indexing of tensors in the dictionary
single_cam_data = convert_dict_to_backend(camera.data.output[camera_index], backend="numpy")
else:
# for numpy, we need to manually index the data
single_cam_data = dict()
for key, value in camera.data.output.items():
single_cam_data[key] = value[camera_index]
# Extract the other information
single_cam_info = camera.data.info[camera_index]
# Pack data back into replicator format to save them using its writer
rep_output = dict()
for key, data, info in zip(single_cam_data.keys(), single_cam_data.values(), single_cam_info.values()):
if info is not None:
rep_output[key] = {"data": data, "info": info}
else:
rep_output[key] = data
# Save images
rep_output["trigger_outputs"] = {"on_time": camera.frame[camera_index]}
rep_writer.write(rep_output)
if __name__ == "__main__":
# Runs the main function
main()
# Close the simulator
simulation_app.close()
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
This script demonstrates how to use the contact sensor sensor in Orbit.
.. code-block:: bash
./orbit.sh -p source/extensions/omni.isaac.orbit/test/sensors/test_contact_sensor.py --num_robots 2
"""
"""Launch Isaac Sim Simulator first."""
import argparse
from omni.isaac.kit import SimulationApp
# add argparse arguments
parser = argparse.ArgumentParser("Welcome to Orbit: Omniverse Robotics Environments!")
parser.add_argument("--headless", action="store_true", default=False, help="Force display off at all times.")
parser.add_argument("--num_robots", type=int, default=128, help="Number of robots to spawn.")
args_cli = parser.parse_args()
# launch omniverse app
config = {"headless": args_cli.headless}
simulation_app = SimulationApp(config)
"""Rest everything follows."""
import torch
import carb
import omni.isaac.core.utils.prims as prim_utils
from omni.isaac.cloner import GridCloner
from omni.isaac.core.simulation_context import SimulationContext
from omni.isaac.core.utils.carb import set_carb_setting
from omni.isaac.core.utils.viewports import set_camera_view
import omni.isaac.orbit.utils.kit as kit_utils
from omni.isaac.orbit.robots.config.anymal import ANYMAL_C_CFG
from omni.isaac.orbit.robots.legged_robot import LeggedRobot
from omni.isaac.orbit.sensors.contact_sensor import ContactSensor, ContactSensorCfg
"""
Helpers
"""
def design_scene():
"""Add prims to the scene."""
# Ground-plane
kit_utils.create_ground_plane("/World/defaultGroundPlane")
# Lights-1
prim_utils.create_prim(
"/World/Light/GreySphere",
"SphereLight",
translation=(4.5, 3.5, 10.0),
attributes={"radius": 2.5, "intensity": 600.0, "color": (0.75, 0.75, 0.75)},
)
# Lights-2
prim_utils.create_prim(
"/World/Light/WhiteSphere",
"SphereLight",
translation=(-4.5, 3.5, 10.0),
attributes={"radius": 2.5, "intensity": 600.0, "color": (1.0, 1.0, 1.0)},
)
"""
Main
"""
def main():
"""Spawns the ANYmal robot and clones it using Isaac Sim Cloner API."""
# Load kit helper
sim = SimulationContext(physics_dt=0.005, rendering_dt=0.005, backend="torch", device="cuda:0")
# Set main camera
set_camera_view([2.5, 2.5, 2.5], [0.0, 0.0, 0.0])
# Enable flatcache which avoids passing data over to USD structure
# this speeds up the read-write operation of GPU buffers
if sim.get_physics_context().use_gpu_pipeline:
sim.get_physics_context().enable_flatcache(True)
# Enable hydra scene-graph instancing
# this is needed to visualize the scene when flatcache is enabled
set_carb_setting(sim._settings, "/persistent/omnihydra/useSceneGraphInstancing", True)
# Create interface to clone the scene
cloner = GridCloner(spacing=2.0)
cloner.define_base_env("/World/envs")
# Everything under the namespace "/World/envs/env_0" will be cloned
prim_utils.define_prim("/World/envs/env_0")
# Spawn things into the scene
robot = LeggedRobot(cfg=ANYMAL_C_CFG)
robot.spawn("/World/envs/env_0/Robot")
# Contact sensor
contact_sensor_cfg = ContactSensorCfg(debug_vis=True)
contact_sensor = ContactSensor(cfg=contact_sensor_cfg)
contact_sensor.spawn("/World/envs/env_0/Robot/.*_SHANK")
# design props
design_scene()
# Clone the scene
num_envs = args_cli.num_robots
cloner.define_base_env("/World/envs")
envs_prim_paths = cloner.generate_paths("/World/envs/env", num_paths=num_envs)
envs_positions = cloner.clone(
source_prim_path="/World/envs/env_0", prim_paths=envs_prim_paths, replicate_physics=True
)
# convert environment positions to torch tensor
envs_positions = torch.tensor(envs_positions, dtype=torch.float, device=sim.device)
# filter collisions within each environment instance
physics_scene_path = sim.get_physics_context().prim_path
cloner.filter_collisions(
physics_scene_path, "/World/collisions", envs_prim_paths, global_paths=["/World/defaultGroundPlane"]
)
# Play the simulator
sim.reset()
# Acquire handles
# Initialize handles
robot.initialize("/World/envs/env_.*/Robot")
contact_sensor.initialize("/World/envs/env_.*/Robot/.*_SHANK")
# Now we are ready!
print("[INFO]: Setup complete...")
# dummy actions
actions = torch.zeros(robot.count, robot.num_actions, device=robot.device)
# Define simulation stepping
decimation = 4
sim_dt = decimation * sim.get_physics_dt()
sim_time = 0.0
count = 0
# Simulate physics
while simulation_app.is_running():
# If simulation is stopped, then exit.
if sim.is_stopped():
break
# If simulation is paused, then skip.
if not sim.is_playing():
sim.step(render=not args_cli.headless)
continue
# reset
if count % 1000 == 0:
# reset counters
sim_time = 0.0
count = 0
# reset dof state
dof_pos, dof_vel = robot.get_default_dof_state()
robot.set_dof_state(dof_pos, dof_vel)
robot.reset_buffers()
# reset command
actions = torch.zeros(robot.count, robot.num_actions, device=robot.device)
# perform 4 steps
for _ in range(decimation):
# apply actions
robot.apply_action(actions)
# perform step
sim.step(render=not args_cli.headless)
# update sim-time
sim_time += sim_dt
count += 1
# update the buffers
if sim.is_playing():
robot.update_buffers(sim_dt)
contact_sensor.update_buffers(sim_dt)
# update marker visualization
if not args_cli.headless:
contact_sensor.debug_vis()
if __name__ == "__main__":
# Run the main function
main()
# Close the simulator
simulation_app.close()
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
This script shows how to use the ray caster from the Orbit framework.
.. code-block:: bash
# Usage
./orbit.sh -p source/extensions/omni.isaac.orbit/test/sensors/test_ray_caster.py --headless
"""
"""Launch Isaac Sim Simulator first."""
import argparse
# omni-isaac-orbit
from omni.isaac.kit import SimulationApp
# add argparse arguments
parser = argparse.ArgumentParser("Welcome to Orbit: Omniverse Robotics Environments!")
parser.add_argument("--headless", action="store_true", default=False, help="Force display off at all times.")
parser.add_argument("--num_envs", type=int, default=128, help="Number of environments to clone.")
parser.add_argument(
"--terrain_type",
type=str,
default="generated",
help="Type of terrain to import. Can be 'generated' or 'usd' or 'plane'.",
)
args_cli = parser.parse_args()
# launch omniverse app
config = {"headless": args_cli.headless}
simulation_app = SimulationApp(config)
"""Rest everything follows."""
import numpy as np
import omni.isaac.core.utils.prims as prim_utils
from omni.isaac.cloner import GridCloner
from omni.isaac.core.objects import DynamicSphere
from omni.isaac.core.prims import RigidPrimView
from omni.isaac.core.simulation_context import SimulationContext
from omni.isaac.core.utils.viewports import set_camera_view
import omni.isaac.orbit.terrains as terrain_gen
import omni.isaac.orbit.utils.kit as kit_utils
from omni.isaac.orbit.sensors.ray_caster import GridPatternCfg, RayCaster, RayCasterCfg
from omni.isaac.orbit.terrains.config.rough import ASSORTED_TERRAINS_CFG
from omni.isaac.orbit.terrains.terrain_importer import TerrainImporter
from omni.isaac.orbit.utils.assets import ISAAC_NUCLEUS_DIR
from omni.isaac.orbit.utils.timer import Timer
def design_scene(sim: SimulationContext, num_envs: int = 2048):
"""Design the scene."""
# Create interface to clone the scene
cloner = GridCloner(spacing=2.0)
cloner.define_base_env("/World/envs")
# Everything under the namespace "/World/envs/env_0" will be cloned
prim_utils.define_prim("/World/envs/env_0")
# Define the scene
# -- Light
prim_utils.create_prim(
"/World/sphereLight",
"SphereLight",
translation=(0.0, 0.0, 500.0),
attributes={"radius": 100.0, "intensity": 50000.0, "color": (0.75, 0.75, 0.75)},
)
# -- Balls
# -- Ball physics
DynamicSphere(
prim_path="/World/envs/env_0/ball",
translation=np.array([0.0, 0.0, 5.0]),
mass=0.5,
radius=0.25,
color=np.asarray((0.0, 0.0, 1.0)),
)
# Clone the scene
cloner.define_base_env("/World/envs")
envs_prim_paths = cloner.generate_paths("/World/envs/env", num_paths=num_envs)
cloner.clone(source_prim_path="/World/envs/env_0", prim_paths=envs_prim_paths, replicate_physics=True)
physics_scene_path = sim.get_physics_context().prim_path
cloner.filter_collisions(
physics_scene_path, "/World/collisions", prim_paths=envs_prim_paths, global_paths=["/World/ground"]
)
def main():
"""Main function."""
# Load kit helper
sim_params = {
"use_gpu": True,
"use_gpu_pipeline": True,
"use_flatcache": True,
"enable_scene_query_support": True,
}
sim = SimulationContext(
physics_dt=1.0 / 60.0, rendering_dt=1.0 / 60.0, sim_params=sim_params, backend="torch", device="cuda:0"
)
# Set main camera
set_camera_view([0.0, 30.0, 25.0], [0.0, 0.0, -2.5])
# Parameters
num_envs = args_cli.num_envs
# Design the scene
design_scene(sim=sim, num_envs=num_envs)
# Handler for terrains importing
if args_cli.terrain_type == "generated":
terrain_importer_cfg = terrain_gen.TerrainImporterCfg(prim_path="/World/ground", max_init_terrain_level=None)
terrain_importer = TerrainImporter(terrain_importer_cfg, device=sim.device)
terrain_generator = terrain_gen.TerrainGenerator(cfg=ASSORTED_TERRAINS_CFG, curriculum=True)
terrain_importer.import_mesh(terrain_generator.terrain_mesh, key="rough")
elif args_cli.terrain_type == "usd":
prim_utils.create_prim("/World/ground", usd_path=f"{ISAAC_NUCLEUS_DIR}/Environments/Terrains/rough_plane.usd")
elif args_cli.terrain_type == "plane":
kit_utils.create_ground_plane("/World/ground")
else:
raise NotImplementedError(f"Terrain type {args_cli.terrain_type} not supported!")
# Create a ray-caster sensor
pattern_cfg = GridPatternCfg(resolution=0.1, size=(1.6, 1.0))
ray_caster_cfg = RayCasterCfg(
mesh_prim_paths=["/World/ground"], pattern_cfg=pattern_cfg, attach_yaw_only=True, debug_vis=True
)
ray_caster = RayCaster(cfg=ray_caster_cfg)
# Create a view over all the balls
ball_view = RigidPrimView("/World/envs/env_.*/ball", reset_xform_properties=False)
# Play simulator
sim.reset()
# Initialize the views
# -- balls
ball_view.initialize()
# -- sensors
ray_caster.initialize("/World/envs/env_.*/ball")
# Print the sensor information
print(ray_caster)
# Get the initial positions of the balls
ball_initial_poses = ball_view.get_world_poses()
ball_initial_velocities = ball_view.get_velocities()
# Create a counter for resetting the scene
step_count = 0
# Simulate physics
while simulation_app.is_running():
# If simulation is stopped, then exit.
if sim.is_stopped():
break
# If simulation is paused, then skip.
if not sim.is_playing():
sim.step(render=not args_cli.headless)
continue
# Reset the scene
if step_count % 500 == 0:
# reset the balls
ball_view.set_world_poses(*ball_initial_poses)
ball_view.set_velocities(ball_initial_velocities)
# reset the sensor
ray_caster.reset_buffers()
# reset the counter
step_count = 0
# Step simulation
sim.step()
# Update the ray-caster
with Timer(f"Ray-caster update with {ray_caster.count} x {ray_caster.num_rays} rays"):
ray_caster.update_buffers(dt=sim.get_physics_dt())
# Visualize the ray-caster
if not args_cli.headless:
with Timer(f"Ray-caster debug visualization\t\t"):
ray_caster.debug_vis()
# Update counter
step_count += 1
if __name__ == "__main__":
# Runs the main function
main()
# Close the simulator
simulation_app.close()
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
This script shows how to use the camera sensor from the Orbit framework.
The camera sensor is created and interfaced through the Omniverse Replicator API. However, instead of using
the simulator or OpenGL convention for the camera, we use the robotics or ROS convention.
"""
"""Launch Isaac Sim Simulator first."""
import argparse
# omni-isaac-orbit
from omni.isaac.kit import SimulationApp
# add argparse arguments
parser = argparse.ArgumentParser("Welcome to Orbit: Omniverse Robotics Environments!")
parser.add_argument("--headless", action="store_true", default=False, help="Force display off at all times.")
parser.add_argument("--gpu", action="store_true", default=False, help="Use GPU device for camera rendering output.")
parser.add_argument("--draw", action="store_true", default=False, help="Draw the obtained pointcloud on viewport.")
args_cli = parser.parse_args()
# launch omniverse app
config = {"headless": args_cli.headless}
simulation_app = SimulationApp(config)
"""Rest everything follows."""
import numpy as np
import os
import random
import omni.isaac.core.utils.prims as prim_utils
import omni.isaac.debug_draw._debug_draw as omni_debug_draw
import omni.replicator.core as rep
from omni.isaac.core.prims import RigidPrim
from omni.isaac.core.simulation_context import SimulationContext
from omni.isaac.core.utils.viewports import set_camera_view
from pxr import Gf, UsdGeom
import omni.isaac.orbit.utils.kit as kit_utils
from omni.isaac.orbit.compat.sensors.camera import Camera, PinholeCameraCfg
from omni.isaac.orbit.sensors.camera.utils import create_pointcloud_from_rgbd
from omni.isaac.orbit.utils import convert_dict_to_backend
"""
Helpers
"""
def design_scene():
"""Add prims to the scene."""
# Ground-plane
kit_utils.create_ground_plane("/World/defaultGroundPlane")
# Lights-1
prim_utils.create_prim(
"/World/Light/GreySphere",
"SphereLight",
translation=(4.5, 3.5, 10.0),
attributes={"radius": 2.5, "intensity": 600.0, "color": (0.75, 0.75, 0.75)},
)
# Lights-2
prim_utils.create_prim(
"/World/Light/WhiteSphere",
"SphereLight",
translation=(-4.5, 3.5, 10.0),
attributes={"radius": 2.5, "intensity": 600.0, "color": (1.0, 1.0, 1.0)},
)
# Xform to hold objects
prim_utils.create_prim("/World/Objects", "Xform")
# Random objects
for i in range(8):
# sample random position
position = np.random.rand(3) - np.asarray([0.05, 0.05, -1.0])
position *= np.asarray([1.5, 1.5, 0.5])
# create prim
prim_type = random.choice(["Cube", "Sphere", "Cylinder"])
_ = prim_utils.create_prim(
f"/World/Objects/Obj_{i:02d}",
prim_type,
translation=position,
scale=(0.25, 0.25, 0.25),
semantic_label=prim_type,
)
# add rigid properties
rigid_obj = RigidPrim(f"/World/Objects/Obj_{i:02d}", mass=5.0)
# cast to geom prim
geom_prim = getattr(UsdGeom, prim_type)(rigid_obj.prim)
# set random color
color = Gf.Vec3f(random.random(), random.random(), random.random())
geom_prim.CreateDisplayColorAttr()
geom_prim.GetDisplayColorAttr().Set([color])
"""
Main
"""
def main():
"""Runs a camera sensor from orbit."""
# Load kit helper
sim = SimulationContext(physics_dt=0.005, rendering_dt=0.005, backend="torch")
# Set main camera
set_camera_view([2.5, 2.5, 2.5], [0.0, 0.0, 0.0])
# Acquire draw interface
draw_interface = omni_debug_draw.acquire_debug_draw_interface()
# Populate scene
design_scene()
# Setup camera sensor
camera_cfg = PinholeCameraCfg(
sensor_tick=0,
height=480,
width=640,
data_types=["rgb", "distance_to_image_plane", "normals", "motion_vectors"],
usd_params=PinholeCameraCfg.UsdCameraCfg(
focal_length=24.0, focus_distance=400.0, horizontal_aperture=20.955, clipping_range=(0.1, 1.0e5)
),
)
camera = Camera(cfg=camera_cfg, device="cuda" if args_cli.gpu else "cpu")
# Spawn camera
camera.spawn("/World/CameraSensor")
# Create replicator writer
output_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "output", "camera")
rep_writer = rep.BasicWriter(output_dir=output_dir, frame_padding=3)
# Play simulator
sim.play()
# Initialize camera
camera.initialize()
# Set pose: There are two ways to set the pose of the camera.
# -- Option-1: Set pose using view
# camera.set_world_pose_from_view(eye=[2.5, 2.5, 2.5], target=[0.0, 0.0, 0.0])
# -- Option-2: Set pose using ROS
position = [2.5, 2.5, 2.5]
orientation = [-0.17591989, 0.33985114, 0.82047325, -0.42470819]
camera.set_world_pose_ros(position, orientation)
# Simulate for a few steps
# note: This is a workaround to ensure that the textures are loaded.
# Check "Known Issues" section in the documentation for more details.
for _ in range(3):
sim.step()
# Simulate physics
while simulation_app.is_running():
# If simulation is stopped, then exit.
if sim.is_stopped():
break
# If simulation is paused, then skip.
if not sim.is_playing():
sim.step(render=not args_cli.headless)
continue
# Step simulation
sim.step()
# Update camera data
camera.update(dt=0.0)
# Print camera info
print(camera)
print("Received shape of rgb image: ", camera.data.output["rgb"].shape)
print("Received shape of depth image: ", camera.data.output["distance_to_image_plane"].shape)
print("-------------------------------")
# Save images
# note: BasicWriter only supports saving data in numpy format
rep_writer.write(convert_dict_to_backend(camera.data.output, backend="numpy"))
# Pointcloud in world frame
pointcloud_w, pointcloud_rgb = create_pointcloud_from_rgbd(
camera.data.intrinsic_matrix,
depth=camera.data.output["distance_to_image_plane"],
rgb=camera.data.output["rgb"],
position=camera.data.position,
orientation=camera.data.orientation,
normalize_rgb=True,
num_channels=4,
)
# Draw pointcloud
if not args_cli.headless and args_cli.draw:
# Convert to numpy for visualization
if not isinstance(pointcloud_w, np.ndarray):
pointcloud_w = pointcloud_w.cpu().numpy()
if not isinstance(pointcloud_rgb, np.ndarray):
pointcloud_rgb = pointcloud_rgb.cpu().numpy()
# Visualize the points
num_points = pointcloud_w.shape[0]
points_size = [1.25] * num_points
points_color = pointcloud_rgb
draw_interface.clear_points()
draw_interface.draw_points(pointcloud_w.tolist(), points_color, points_size)
if __name__ == "__main__":
# Runs the main function
main()
# Close the simulator
simulation_app.close()
......@@ -33,6 +33,7 @@ simulation_app = app_launcher.app
import numpy as np
import os
import random
import torch
import omni.isaac.core.utils.prims as prim_utils
import omni.isaac.debug_draw._debug_draw as omni_debug_draw
......@@ -44,8 +45,8 @@ from pxr import Gf, UsdGeom
import omni.isaac.orbit.utils.kit as kit_utils
from omni.isaac.orbit.sensors.camera import Camera, PinholeCameraCfg
from omni.isaac.orbit.sensors.camera.utils import create_pointcloud_from_rgbd
from omni.isaac.orbit.utils import convert_dict_to_backend
from omni.isaac.orbit.utils.math import project_points, transform_points, unproject_depth
"""
Helpers
......@@ -105,7 +106,10 @@ def main():
"""Runs a camera sensor from orbit."""
# Load kit helper
sim = SimulationContext(physics_dt=0.005, rendering_dt=0.005, backend="torch")
sim = SimulationContext(
physics_dt=0.005, rendering_dt=0.005, backend="torch", device="cuda" if args_cli.gpu else "cpu"
)
# sim = SimulationContext(physics_dt=0.005, rendering_dt=0.005, backend="numpy")
# Set main camera
set_camera_view([2.5, 2.5, 2.5], [0.0, 0.0, 0.0])
# Acquire draw interface
......@@ -115,18 +119,19 @@ def main():
design_scene()
# Setup camera sensor
camera_cfg = PinholeCameraCfg(
sensor_tick=0,
update_freq=0,
height=480,
width=640,
data_types=["rgb", "distance_to_image_plane", "normals", "motion_vectors"],
data_types=["rgb", "distance_to_image_plane", "normals", "motion_vectors", "semantic_segmentation"],
usd_params=PinholeCameraCfg.UsdCameraCfg(
focal_length=24.0, focus_distance=400.0, horizontal_aperture=20.955, clipping_range=(0.1, 1.0e5)
),
)
camera = Camera(cfg=camera_cfg, device="cuda" if args_cli.gpu else "cpu")
camera = Camera(cfg=camera_cfg)
# Spawn camera
camera.spawn("/World/CameraSensor")
camera.spawn("/World/CameraSensor/Cam_00")
camera.spawn("/World/CameraSensor/Cam_01")
# Create replicator writer
output_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "output", "camera")
......@@ -135,21 +140,23 @@ def main():
# Play simulator
sim.play()
# Initialize camera
camera.initialize()
camera.initialize("/World/CameraSensor/Cam_*")
# Set pose: There are two ways to set the pose of the camera.
# -- Option-1: Set pose using view
# camera.set_world_pose_from_view(eye=[2.5, 2.5, 2.5], target=[0.0, 0.0, 0.0])
eyes = [[2.5, 2.5, 2.5], [-2.5, -2.5, 2.5]]
targets = [[0.0, 0.0, 0.0], [0.0, 0.0, 0.0]]
camera.set_world_poses_from_view(eyes, targets)
# -- Option-2: Set pose using ROS
position = [2.5, 2.5, 2.5]
orientation = [-0.17591989, 0.33985114, 0.82047325, -0.42470819]
camera.set_world_pose_ros(position, orientation)
# position = [[2.5, 2.5, 2.5]]
# orientation = [[-0.17591989, 0.33985114, 0.82047325, -0.42470819]]
# camera.set_world_pose_ros(position, orientation, indices=[0])
# Simulate for a few steps
# note: This is a workaround to ensure that the textures are loaded.
# Check "Known Issues" section in the documentation for more details.
for _ in range(14):
sim.render()
for _ in range(3):
sim.step()
# Simulate physics
while simulation_app.is_running():
......@@ -163,7 +170,7 @@ def main():
# Step simulation
sim.step(render=app_launcher.RENDER)
# Update camera data
camera.update(dt=0.0)
camera.update_buffers(dt=0.0)
# Print camera info
print(camera)
......@@ -171,34 +178,64 @@ def main():
print("Received shape of depth image: ", camera.data.output["distance_to_image_plane"].shape)
print("-------------------------------")
# Extract camera data
camera_index = 1
# note: BasicWriter only supports saving data in numpy format, so we need to convert the data to numpy.
if sim.backend == "torch":
# tensordict allows easy indexing of tensors in the dictionary
single_cam_data = convert_dict_to_backend(camera.data.output[camera_index], backend="numpy")
else:
# for numpy, we need to manually index the data
single_cam_data = dict()
for key, value in camera.data.output.items():
single_cam_data[key] = value[camera_index]
# Extract the other information
single_cam_info = camera.data.info[camera_index]
# Pack data back into replicator format to save them using its writer
rep_output = dict()
for key, data, info in zip(single_cam_data.keys(), single_cam_data.values(), single_cam_info.values()):
if info is not None:
rep_output[key] = {"data": data, "info": info}
else:
rep_output[key] = data
# Save images
# note: BasicWriter only supports saving data in numpy format
rep_writer.write(convert_dict_to_backend(camera.data.output, backend="numpy"))
rep_output["trigger_outputs"] = {"on_time": camera.frame[camera_index]}
rep_writer.write(rep_output)
# Pointcloud in world frame
pointcloud_w, pointcloud_rgb = create_pointcloud_from_rgbd(
camera.data.intrinsic_matrix,
depth=camera.data.output["distance_to_image_plane"],
rgb=camera.data.output["rgb"],
position=camera.data.position,
orientation=camera.data.orientation,
normalize_rgb=True,
num_channels=4,
)
points_3d_cam = unproject_depth(camera.data.output["distance_to_image_plane"], camera.data.intrinsic_matrices)
points_3d_world = transform_points(points_3d_cam, camera.data.position, camera.data.orientation)
# Check methods are valid
im_height, im_width = camera.image_shape
# -- project points to (u, v, d)
reproj_points = project_points(points_3d_cam, camera.data.intrinsic_matrices)
reproj_depths = reproj_points[..., -1].view(-1, im_width, im_height).transpose_(1, 2)
sim_depths = camera.data.output["distance_to_image_plane"].squeeze(-1)
torch.testing.assert_allclose(reproj_depths, sim_depths)
# Draw pointcloud
if not args_cli.headless and args_cli.draw:
# Convert to numpy for visualization
if not isinstance(pointcloud_w, np.ndarray):
pointcloud_w = pointcloud_w.cpu().numpy()
if not isinstance(pointcloud_rgb, np.ndarray):
pointcloud_rgb = pointcloud_rgb.cpu().numpy()
# Visualize the points
num_points = pointcloud_w.shape[0]
points_size = [1.25] * num_points
points_color = pointcloud_rgb
if not isinstance(points_3d_world, np.ndarray):
points_3d_world = points_3d_world.cpu().numpy()
# Clear any existing points
draw_interface.clear_points()
draw_interface.draw_points(pointcloud_w.tolist(), points_color, points_size)
# Obtain drawing settings
num_batch = points_3d_world.shape[0]
num_points = points_3d_world.shape[1]
points_size = [1.25] * num_points
# Fix random seed
random.seed(0)
# Visualize the points
for index in range(num_batch):
# generate random color
color = [random.random() for _ in range(3)]
color += [1.0]
# plain color for points
points_color = [color] * num_points
draw_interface.draw_points(points_3d_world[index].tolist(), points_color, points_size)
if __name__ == "__main__":
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment