Unverified Commit 199a1444 authored by renezurbruegg's avatar renezurbruegg Committed by GitHub

Supports warp backend for camera unprojection operations (#2)

* implements unprojection operations in torch
* adds GPU backend support in play_camera.py
* adds play-camera to docs
* adds utilities methods to convert any array type to different backends
* adds utilities to convert arrays/tensors in a nested dictionary to a specified backend
* updates changelog and extension.toml
Co-authored-by: 's avatarMayank Mittal <mittalma@leggedrobotics.com>
parent 4f680194
......@@ -96,6 +96,7 @@ autodoc_mock_imports = [
"numpy",
"scipy",
"carb",
"warp",
"pxr",
"omni.kit",
"omni.usd",
......
......@@ -31,6 +31,15 @@ A few quick demo scripts to run and checkout:
./orbit.sh -p source/standalone/demo/play_ik_control.py --robot franka_panda --num_envs 128
- Spawn a camera and visualize the obtained pointcloud:
.. code:: bash
# CPU
./orbit.sh -p source/standalone/demo/play_camera.py
# GPU
./orbit.sh -p source/standalone/demo/play_camera.py --gpu
Environments
------------
......
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.1.1"
version = "0.2.0"
# Description
title = "ORBIT framework for Robot Learning"
......
Changelog
---------
0.2.0 (2023-01-25)
~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added support for warp backend in camera utilities.
* Extended the ``play_camera.py`` with ``--gpu`` flag to use GPU replicator backend.
0.1.1 (2023-01-24)
~~~~~~~~~~~~~~~~~~
......
......@@ -2,172 +2,207 @@
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Helper functions to project between pointcloud and depth images."""
import numpy as np
import scipy.spatial.transform as tf
import torch
from typing import Sequence, Tuple, Union
from typing import Optional, Sequence, Tuple, Union
import warp as wp
from omni.isaac.orbit.utils.array import convert_to_torch
from omni.isaac.orbit.utils.math import matrix_from_quat
__all__ = ["transform_points", "create_pointcloud_from_depth", "create_pointcloud_from_rgbd"]
from omni.isaac.orbit.utils.math import convert_quat
"""
Depth <-> Pointcloud conversions.
"""
def transform_pointcloud(
points: Union[np.ndarray, torch.Tensor], position: Sequence[float] = None, orientation: Sequence[float] = None
def transform_points(
points: Union[np.ndarray, torch.Tensor, wp.array],
position: Optional[Sequence[float]] = None,
orientation: Optional[Sequence[float]] = None,
device: Union[torch.device, str, None] = None,
) -> Union[np.ndarray, torch.Tensor]:
"""Transform input points in a given frame to a target frame.
This function uses torch operations to transform points from a source frame to a target frame. The
transformation is defined by the position ``t`` and orientation ``R`` of the target frame in the source frame.
.. math::
p_{target} = R_{target} \\times p_{source} + t_{target}
If either the inputs `position` and `orientation` are :obj:`None`, the corresponding transformation is not applied.
Args:
points (Union[np.ndarray, torch.Tensor]): An array of shape (N, 3) comprising of 3D points in source frame.
position (Sequence[float], optional): The position of source frame in target frame. Defaults to None.
orientation (Sequence[float], optional): The orientation `(w, x, y, z)` of source frame in target frame.
points (Union[np.ndarray, torch.Tensor, wp.array]): An array of shape (N, 3) comprising of 3D points in source frame.
position (Optional[Sequence[float]], optional): The position of source frame in target frame. Defaults to None.
orientation (Optional[Sequence[float]], optional): The orientation ``(w, x, y, z)`` of source frame in target frame.
Defaults to None.
device (Optional[Union[torch.device, str]], optional): The device for torch where the computation
should be executed. Defaults to None, i.e. takes the device that matches the depth image.
Returns:
Union[np.ndarray, torch.Tensor]: An array of shape (N, 3) comprising of 3D points in target frame.
Union[np.ndarray, torch.Tensor]:
A tensor of shape (N, 3) comprising of 3D points in target frame.
If the input is a numpy array, the output is a numpy array. Otherwise, it is a torch tensor.
"""
# device for carrying out conversion
device = "cpu"
is_torch = True
# check if numpy
is_numpy = isinstance(points, np.ndarray)
# decide device
if device is None and is_numpy:
device = torch.device("cpu")
# convert to torch
if not isinstance(points, torch.Tensor):
is_torch = False
points = torch.from_numpy(points).to(device)
# rotate points
points = convert_to_torch(points, dtype=torch.float32, device=device)
# update the device with the device of the depth image
# note: this is needed since warp does not provide the device directly
device = points.device
# apply rotation
if orientation is not None:
# convert to numpy (sanity)
orientation = np.asarray(orientation)
# convert using scipy to simplify life
rot = tf.Rotation.from_quat(convert_quat(orientation, "xyzw"))
rot_matrix = torch.from_numpy(rot.as_matrix()).to(device)
# apply rotation
points = torch.matmul(rot_matrix, points.T).T
# translate points
orientation = convert_to_torch(orientation, dtype=torch.float32, device=device)
# apply translation
if position is not None:
# convert to torch to simplify life
position = torch.from_numpy(position).to(device)
# apply translation
points += position
# return results
if is_torch:
return points
else:
position = convert_to_torch(position, dtype=torch.float32, device=device)
# apply transformation
points = _transform_points_jit(points, position, orientation)
# return everything according to input type
if is_numpy:
return points.detach().cpu().numpy()
else:
return points
def create_pointcloud_from_depth(
intrinsic_matrix: np.ndarray,
depth: np.ndarray,
intrinsic_matrix: Union[np.ndarray, torch.Tensor, wp.array],
depth: Union[np.ndarray, torch.Tensor, wp.array],
keep_invalid: bool = False,
position: Sequence[float] = None,
orientation: Sequence[float] = None,
) -> np.ndarray:
position: Optional[Sequence[float]] = None,
orientation: Optional[Sequence[float]] = None,
device: Optional[Union[torch.device, str]] = None,
) -> Union[np.ndarray, torch.Tensor]:
"""Creates pointcloud from input depth image and camera intrinsic matrix.
If the inputs `camera_position` and `camera_orientation` are provided, the pointcloud is transformed
from the camera frame to the target frame.
This function creates a pointcloud from a depth image and camera intrinsic matrix. The pointcloud is
computed using the following equation:
.. math::
p_{camera} = K^{-1} \\times [u, v, 1]^T \\times d
where :math:`K` is the camera intrinsic matrix, :math:`u` and :math:`v` are the pixel coordinates and
:math:`d` is the depth value at the pixel.
We use PyTorch here for matrix multiplication since it is compiled with Intel MKL while numpy
by default uses OpenBLAS. With PyTorch (CPU), we could process a depth image of size (480, 640)
in 0.0106 secs, while with numpy it took 0.0292 secs.
Additionally, the pointcloud can be transformed from the camera frame to a target frame by providing
the position ``t`` and orientation ``R`` of the camera in the target frame:
.. math::
p_{target} = R_{target} \\times p_{camera} + t_{target}
Args:
intrinsic_matrix (np.ndarray): A (3, 3) numpy array providing camera's calibration matrix.
depth (np.ndarray): An array of shape (H, W) with values encoding the depth measurement.
intrinsic_matrix (Union[np.ndarray, torch.Tensor, wp.array]): A (3, 3) array providing camera's calibration
matrix.
depth (Union[np.ndarray, torch.Tensor, wp.array]): An array of shape (H, W) with values encoding the depth
measurement.
keep_invalid (bool, optional): Whether to keep invalid points in the cloud or not. Invalid points
correspond to pixels with depth values 0.0 or NaN. Defaults to False.
position (Sequence[float], optional): The position of the camera in a target frame. Defaults to None.
orientation (Sequence[float], optional): The orientation `(w, x, y, z)` of the camera in a target frame.
position (Optional[Sequence[float]], optional): The position of the camera in a target frame.
Defaults to None.
orientation (Optional[Sequence[float]], optional): The orientation ``(w, x, y, z)`` of the
camera in a target frame. Defaults to None.
device (Optional[Union[torch.device, str]], optional): The device for torch where the computation
should be executed. Defaults to None, i.e. takes the device that matches the depth image.
Raises:
ValueError: When intrinsic matrix is not of shape (3, 3).
ValueError: When depth image is not of shape (H, W) or (H, W, 1).
Returns:
np.ndarray: An array of shape (N, 3) comprising of 3D coordinates of points.
Union[np.ndarray, torch.Tensor]:
An array/tensor of shape (N, 3) comprising of 3D coordinates of points.
The returned datatype is torch if input depth is of type torch.tensor or wp.array. Otherwise, a np.ndarray
is returned.
"""
# device for carrying out conversion
device = "cpu"
# We use PyTorch here for matrix multiplication since it is compiled with Intel MKL while numpy
# by default uses OpenBLAS. With PyTorch (CPU), we could process a depth image of size (480, 640)
# in 0.0051 secs, while with numpy it took 0.0292 secs.
# convert to numpy matrix
intrinsic_matrix = np.asarray(intrinsic_matrix)
depth = np.asarray(depth).copy()
# squeeze out excess dimension
if len(depth.shape) == 3:
depth = depth.squeeze(axis=2)
# check shape of inputs
if intrinsic_matrix.shape != (3, 3):
raise ValueError(f"Input intrinsic matrix of invalid shape: {intrinsic_matrix.shape} != (3, 3).")
if len(depth.shape) != 2:
raise ValueError(f"Input depth image not two-dimensional. Received shape: {depth.shape}.")
# convert inputs to numpy arrays
intrinsic_matrix = torch.from_numpy(intrinsic_matrix).to(device)
depth = torch.from_numpy(depth).to(device)
# get image height and width
im_height, im_width = depth.shape
# convert image points into list of shape (3, H x W)
img_indices = np.indices((im_width, im_height)).reshape(2, -1)
pixels = np.pad(img_indices, [(0, 1), (0, 0)], mode="constant", constant_values=1.0)
pixels = torch.tensor(pixels, dtype=torch.double, device=device)
# convert into 3D points
points = torch.matmul(torch.inverse(intrinsic_matrix), pixels)
points = points / points[-1, :]
points_xyz = points * depth.T.reshape(-1)
# convert it to (H x W , 3)
points_xyz = torch.transpose(points_xyz, dim0=0, dim1=1)
# convert 3D points to world frame
if position is not None or orientation is not None:
points_xyz = transform_pointcloud(points_xyz, position, orientation)
# convert to numpy
points_xyz = points_xyz.detach().cpu().numpy()
# remove points that have invalid depth
if not keep_invalid:
invalid_points_idx = np.where(np.logical_or(np.isnan(points_xyz), np.isinf(points_xyz)))
points_xyz = np.delete(points_xyz, invalid_points_idx, axis=0)
is_numpy = isinstance(depth, np.ndarray)
# decide device
if device is None and is_numpy:
device = torch.device("cpu")
# convert depth to torch tensor
depth = convert_to_torch(depth, dtype=torch.float32, device=device)
# update the device with the device of the depth image
# note: this is needed since warp does not provide the device directly
device = depth.device
# convert inputs to torch tensors
intrinsic_matrix = convert_to_torch(intrinsic_matrix, dtype=torch.float32, device=device)
if position is not None:
position = convert_to_torch(position, dtype=torch.float32, device=device)
if orientation is not None:
orientation = convert_to_torch(orientation, dtype=torch.float32, device=device)
# compute pointcloud
depth_cloud = _create_pointcloud_from_depth_jit(intrinsic_matrix, depth, keep_invalid, position, orientation)
return points_xyz
# return everything according to input type
if is_numpy:
return depth_cloud.detach().cpu().numpy()
else:
return depth_cloud
def create_pointcloud_from_rgbd(
intrinsic_matrix: np.ndarray,
depth: np.ndarray,
rgb: Union[np.ndarray, Tuple[float, float, float]] = None,
intrinsic_matrix: Union[torch.Tensor, np.ndarray, wp.array],
depth: Union[torch.Tensor, np.ndarray, wp.array],
rgb: Union[torch.Tensor, wp.array, np.ndarray, Tuple[float, float, float]] = None,
normalize_rgb: bool = False,
position: Sequence[float] = None,
orientation: Sequence[float] = None,
position: Optional[Sequence[float]] = None,
orientation: Optional[Sequence[float]] = None,
device: Optional[Union[torch.device, str]] = None,
num_channels: int = 3,
) -> Tuple[np.ndarray, np.ndarray]:
) -> Union[Tuple[torch.Tensor, torch.Tensor], Tuple[np.ndarray, np.ndarray]]:
"""Creates pointcloud from input depth image and camera transformation matrix.
The `rgb` attribute is used to resolve the corresponding point's color:
This function provides the same functionality as :meth:`create_pointcloud_from_depth` but also allows
to provide the RGB values for each point.
- If a numpy array of shape (H, W, 3) then the corresponding channels encode RGB values.
- If a tuple then the point cloud has a single color specified by the values (r, g, b).
- If None, then default color is white, i.e. (0, 0, 0).
The ``rgb`` attribute is used to resolve the corresponding point's color:
If the inputs `camera_position` and `camera_orientation` are provided, the pointcloud is transformed
from the camera frame to the target frame.
- If a ``np.array``/``wp.arrray``/``torch.tensor`` of shape (H, W, 3), then the corresponding channels encode RGB values.
- If a tuple, then the point cloud has a single color specified by the values (r, g, b).
- If :obj:`None`, then default color is white, i.e. (0, 0, 0).
If the input ``normalize_rgb`` is set to :obj:`True`, then the RGB values are normalized to be in the range [0, 1].
Args:
intrinsic_matrix (np.ndarray): A (3, 3) numpy array providing camera's calibration matrix.
depth (np.ndarray): An array of shape (H, W) with values encoding the depth measurement.
intrinsic_matrix (Union[torch.Tensor, np.ndarray, wp.array]): A (3, 3) array/tensor providing camera's
calibration matrix.
depth (Union[torch.Tensor, np.ndarray, wp.array]): An array/tensor of shape (H, W) with values encoding
the depth measurement.
rgb (Union[np.ndarray, Tuple[float, float, float]], optional): Color for generated point cloud.
Defaults to None.
normalize_rgb (bool, optional): Whether to normalize input rgb. Defaults to False.
position (Sequence[float], optional): The position of the camera in a target frame. Defaults to None.
orientation (Sequence[float], optional): The orientation `(w, x, y, z)` of the camera in a target frame.
position (Optional[Sequence[float]], optional): The position of the camera in a target frame.
Defaults to None.
orientation (Optional[Sequence[float]], optional): The orientation `(w, x, y, z)` of the
camera in a target frame. Defaults to None.
device (Optional[Union[torch.device, str]], optional): The device for torch where the computation
should be executed. Defaults to None, i.e. takes the device that matches the depth image.
num_channels (int, optional): Number of channels in RGB pointcloud. Defaults to 3.
Raises:
ValueError: When rgb image is a numpy array but not of shape (H, W, 3) or (H, W, 4).
Returns:
Tuple[np.ndarray, np.ndarray]: A tuple of (N, 3) numpy arrays containing 3D coordinates of
points and their RGB color respectively.
Union[Tuple[torch.Tensor, torch.Tensor], Tuple[np.ndarray, np.ndarray]]:
A tuple of (N, 3) arrays or tensors containing the 3D coordinates of points and their RGB color respectively.
The returned datatype is torch if input depth is of type torch.tensor or wp.array. Otherwise, a np.ndarray
is returned.
"""
# check valid inputs
if rgb is not None and not isinstance(rgb, tuple):
......@@ -178,10 +213,17 @@ def create_pointcloud_from_rgbd(
raise ValueError(f"Input rgb image not three-dimensional. Received shape: {rgb.shape}.")
if num_channels not in [3, 4]:
raise ValueError(f"Invalid number of channels: {num_channels} != 3 or 4.")
# check if input depth is numpy array
is_numpy = isinstance(depth, np.ndarray)
# decide device
if device is None and is_numpy:
device = torch.device("cpu")
# convert depth to torch tensor
if is_numpy:
depth = torch.from_numpy(depth).to(device=device)
# retrieve XYZ pointcloud
points_xyz = create_pointcloud_from_depth(
intrinsic_matrix, depth, position=position, orientation=orientation, keep_invalid=True
)
points_xyz = create_pointcloud_from_depth(intrinsic_matrix, depth, True, position, orientation, device=device)
# get image height and width
im_height, im_width = depth.shape[:2]
......@@ -189,29 +231,129 @@ def create_pointcloud_from_rgbd(
num_points = im_height * im_width
# extract color value
if rgb is not None:
if isinstance(rgb, np.ndarray):
if isinstance(rgb, (np.ndarray, torch.Tensor, wp.array)):
# copy numpy array to preserve
rgb = np.asarray(rgb, dtype="float").copy()
rgb = convert_to_torch(rgb, device=device, dtype=torch.float32)
rgb = rgb[:, :, :3]
# convert the matrix to (W, H, 3) since depth processing
# is done in the order (u, v) where u: 0 - W-1 and v: 0 - H-1
points_rgb = np.reshape(rgb.transpose(1, 0, 2), (-1, 3))
# convert the matrix to (W, H, 3) from (H, W, 3) since depth processing
# is done in the order (u, v) where u: (0, W-1) and v: (0 - H-1)
points_rgb = rgb.permute(1, 0, 2).reshape(-1, 3)
elif isinstance(rgb, Tuple):
points_rgb = np.asarray((rgb,) * num_points, dtype=np.uint8)
# same color for all points
points_rgb = torch.Tensor((rgb,) * num_points, device=device, dtype=torch.uint8)
else:
points_rgb = np.asarray(((0, 0, 0),) * num_points, dtype=np.uint8)
# default color is white
points_rgb = torch.Tensor(((0, 0, 0),) * num_points, device=device, dtype=torch.uint8)
else:
points_rgb = np.asarray(((0, 0, 0),) * num_points, dtype=np.uint8)
points_rgb = torch.Tensor(((0, 0, 0),) * num_points, device=device, dtype=torch.uint8)
# normalize color values
if normalize_rgb:
points_rgb = np.asarray(points_rgb, dtype="float") / 255
points_rgb = points_rgb.float() / 255
# remove invalid points
invalid_points_idx = np.where(np.logical_or(np.isnan(points_xyz), np.isinf(points_xyz)))
points_xyz = np.delete(points_xyz, invalid_points_idx, axis=0)
points_rgb = np.delete(points_rgb, invalid_points_idx, axis=0)
pts_idx_to_keep = torch.all(torch.logical_and(~torch.isnan(points_xyz), ~torch.isinf(points_xyz)), dim=1)
points_rgb = points_rgb[pts_idx_to_keep, ...]
points_xyz = points_xyz[pts_idx_to_keep, ...]
# add additional channels if required
if num_channels == 4:
points_rgb = np.pad(points_rgb, [(0, 0), (0, 1)], mode="constant", constant_values=1.0)
points_rgb = torch.nn.functional.pad(points_rgb, (0, 1), mode="constant", value=1.0)
return points_xyz, points_rgb
# return everything according to input type
if is_numpy:
return points_xyz.cpu().numpy(), points_rgb.cpu().numpy()
else:
return points_xyz, points_rgb
"""
Helper functions -- Internal
"""
@torch.jit.script
def _transform_points_jit(
points: torch.Tensor,
position: Optional[torch.Tensor] = None,
orientation: Optional[torch.Tensor] = None,
) -> torch.Tensor:
"""Transform input points in a given frame to a target frame.
Args:
points (torch.Tensor): An array of shape (N, 3) comprising of 3D points in source frame.
position (Optional[torch.Tensor], optional): The position of source frame in target frame. Defaults to None.
orientation (Optional[torch.Tensor], optional): The orientation ``(w, x, y, z)`` of source frame in target frame.
Defaults to None.
Returns:
torch.Tensor: A tensor of shape (N, 3) comprising of 3D points in target frame.
"""
# -- apply rotation
if orientation is not None:
points = torch.matmul(matrix_from_quat(orientation), points.T).T
# -- apply translation
if position is not None:
points += position
return points
@torch.jit.script
def _create_pointcloud_from_depth_jit(
intrinsic_matrix: torch.Tensor,
depth: torch.Tensor,
keep_invalid: bool = False,
position: Optional[torch.Tensor] = None,
orientation: Optional[torch.Tensor] = None,
) -> torch.Tensor:
"""Creates pointcloud from input depth image and camera intrinsic matrix.
Args:
intrinsic_matrix (torch.Tensor): A (3, 3) python tensor providing camera's calibration matrix.
depth (torch.tensor): An tensor of shape (H, W) with values encoding the depth measurement.
keep_invalid (bool, optional): Whether to keep invalid points in the cloud or not. Invalid points
correspond to pixels with depth values 0.0 or NaN. Defaults to False.
position (torch.Tensor, optional): The position of the camera in a target frame. Defaults to None.
orientation (torch.Tensor, optional): The orientation ``(w, x, y, z)`` of the camera in a target frame.
Defaults to None.
Raises:
ValueError: When intrinsic matrix is not of shape (3, 3).
ValueError: When depth image is not of shape (H, W) or (H, W, 1).
Returns:
torch.Tensor: A tensor of shape (N, 3) comprising of 3D coordinates of points.
"""
# squeeze out excess dimension
if len(depth.shape) == 3:
depth = depth.squeeze(dim=2)
# check shape of inputs
if intrinsic_matrix.shape != (3, 3):
raise ValueError(f"Input intrinsic matrix of invalid shape: {intrinsic_matrix.shape} != (3, 3).")
if len(depth.shape) != 2:
raise ValueError(f"Input depth image not two-dimensional. Received shape: {depth.shape}.")
# get image height and width
im_height, im_width = depth.shape
# convert image points into list of shape (3, H x W)
indices_u = torch.arange(im_width, device=depth.device, dtype=depth.dtype)
indices_v = torch.arange(im_height, device=depth.device, dtype=depth.dtype)
img_indices = torch.stack(torch.meshgrid([indices_u, indices_v], indexing="ij"), dim=0).reshape(2, -1)
pixels = torch.nn.functional.pad(img_indices, (0, 0, 0, 1), mode="constant", value=1.0)
# convert into 3D points
points = torch.matmul(torch.inverse(intrinsic_matrix), pixels)
points = points / points[-1, :]
points_xyz = points * depth.T.reshape(-1)
# convert it to (H x W , 3)
points_xyz = torch.transpose(points_xyz, dim0=0, dim1=1)
# convert 3D points to world frame
points_xyz = _transform_points_jit(points_xyz, position, orientation)
# remove points that have invalid depth
if not keep_invalid:
pts_idx_to_keep = torch.all(torch.logical_and(~torch.isnan(points_xyz), ~torch.isinf(points_xyz)), dim=1)
points_xyz = points_xyz[pts_idx_to_keep, ...]
return points_xyz
......@@ -12,6 +12,8 @@ from matplotlib.axes import Axes
from matplotlib.image import AxesImage
from typing import Tuple
__all__ = ["create_points_from_grid", "plot_height_grid"]
def create_points_from_grid(size: Tuple[float, float], resolution: float) -> np.ndarray:
"""Creates a list of points from 2D meshgrid.
......
......@@ -14,18 +14,24 @@ Sub-module containing utilities for the Orbit framework.
* `timer`: Provides a timer class (uses contextlib) for benchmarking.
"""
from .array import TENSOR_TYPE_CONVERSIONS, TENSOR_TYPES, convert_to_torch
from .configclass import configclass
from .dict import class_to_dict, print_dict, update_class_from_dict, update_dict
from .dict import class_to_dict, convert_dict_to_backend, print_dict, update_class_from_dict, update_dict
from .string import to_camel_case, to_snake_case
from .timer import Timer
__all__ = [
# arrays
"TENSOR_TYPES",
"TENSOR_TYPE_CONVERSIONS",
"convert_to_torch",
# config wrapper
"configclass",
# dictionary utilities
"class_to_dict",
"update_class_from_dict",
"convert_dict_to_backend",
"print_dict",
"update_class_from_dict",
"update_dict",
# string utilities
"to_camel_case",
......
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES, ETH Zurich, and University of Toronto
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Utilities for working with different array backends."""
import numpy as np
import torch
from typing import Optional, Sequence, Union
import warp as wp
__all__ = ["TENSOR_TYPES", "TENSOR_TYPE_CONVERSIONS", "convert_to_torch"]
TENSOR_TYPES = {
"numpy": np.ndarray,
"torch": torch.Tensor,
"warp": wp.array,
}
"""A dictionary containing the types for each backend.
The keys are the name of the backend ("numpy", "torch", "warp") and the values are the corresponding type
(``np.ndarray``, ``torch.Tensor``, ``wp.array``).
"""
TENSOR_TYPE_CONVERSIONS = {
"numpy": {wp.array: lambda x: x.numpy(), torch.Tensor: lambda x: x.detach().cpu().numpy()},
"torch": {wp.array: lambda x: wp.torch.to_torch(x), np.ndarray: lambda x: torch.from_numpy(x)},
"warp": {np.array: lambda x: wp.array(x), torch.Tensor: lambda x: wp.torch.from_torch(x)},
}
"""A nested dictionary containing the conversion functions for each backend.
The keys of the outer dictionary are the name of target backend ("numpy", "torch", "warp"). The keys of the
inner dictionary are the source backend (``np.ndarray``, ``torch.Tensor``, ``wp.array``).
"""
def convert_to_torch(
array: Sequence[float],
dtype: torch.dtype = None,
device: Optional[Union[torch.device, str]] = None,
) -> torch.Tensor:
"""Converts a given array into a torch tensor.
The function tries to convert the array to a torch tensor. If the array is a numpy/warp arrays, or python
list/tuples, it is converted to a torch tensor. If the array is already a torch tensor, it is returned
directly.
If ``device`` is :obj:`None`, then the function deduces the current device of the data. For numpy arrays,
this defaults to "cpu", for torch tensors it is "cpu" or "cuda", and for warp arrays it is "cuda".
Args:
array (Sequence[float]): The input array. It can be a numpy array, warp array, python list/tuple, or torch tensor.
dtype (torch.dtype, optional): Target data-type for the tensor.
device (Optional[Union[torch.device, str]], optional): The target device for the tensor. Defaults to None.
Returns:
torch.Tensor: The converted array as torch tensor.
"""
# Convert array to tensor
if isinstance(array, torch.Tensor):
tensor = array
elif isinstance(array, np.ndarray):
tensor = torch.from_numpy(array)
elif isinstance(array, wp.array):
tensor = wp.to_torch(array)
else:
tensor = torch.Tensor(array)
# Convert tensor to the right device
if device is not None and str(tensor.device) != str(device):
tensor = tensor.to(device)
# Convert dtype of tensor if requested
if dtype is not None and tensor.dtype != dtype:
tensor = tensor.type(dtype)
return tensor
......@@ -10,7 +10,9 @@ import collections.abc
import importlib
from typing import Any, Callable, Dict, Iterable, Mapping
__all__ = ["class_to_dict", "update_class_from_dict", "update_dict", "print_dict"]
from .array import TENSOR_TYPE_CONVERSIONS, TENSOR_TYPES
__all__ = ["class_to_dict", "update_class_from_dict", "convert_dict_to_backend", "update_dict", "print_dict"]
"""
Dictionary <-> Class operations.
......@@ -74,7 +76,9 @@ def update_class_from_dict(obj, data: Dict[str, Any], _ns: str = "") -> None:
KeyError: When dictionary has a key that does not exist in the default config type.
"""
for key, value in data.items():
# key_ns is the full namespace of the key
key_ns = _ns + "/" + key
# check if key is present in the object
if hasattr(obj, key):
obj_mem = getattr(obj, key)
if isinstance(obj_mem, Mapping):
......@@ -115,6 +119,75 @@ Dictionary operations.
"""
def convert_dict_to_backend(
data: dict, backend: str = "numpy", array_types: Iterable[str] = ("numpy", "torch", "warp")
) -> dict:
"""Convert all arrays or tensors in a dictionary to a given backend.
This function iterates over the dictionary, converts all arrays or tensors with the given types to
the desired backend, and stores them in a new dictionary. It also works with nested dictionaries.
Currently supported backends are "numpy", "torch", and "warp".
Note:
This function only converts arrays or tensors. Other types of data are left unchanged. Mutable types
(e.g. lists) are referenced by the new dictionary, so they are not copied.
Args:
data (dict): An input dict containing array or tensor data as values.
backend(str): The backend ("numpy", "torch", "warp") to which arrays in this dict should be converted.
Defaults to "numpy".
array_types(Iterable[str]): A list containing the types of arrays that should be converted to
the desired backend. Defaults to ("numpy", "torch", "warp").
Raises:
ValueError: If the specified ``backend`` or ``array_types`` are unknown, i.e. not in the list of supported
backends ("numpy", "torch", "warp").
Returns:
dict: The updated dict with the data converted to the desired backend.
"""
# THINK: Should we also support converting to a specific device, e.g. "cuda:0"?
# Define the conversion functions for each backend.
if backend not in TENSOR_TYPE_CONVERSIONS:
raise ValueError(f"Unknown backend '{backend}'. Supported backends are 'numpy', 'torch', and 'warp'.")
else:
tensor_type_conversions = TENSOR_TYPE_CONVERSIONS[backend]
# Parse the array types and convert them to the corresponding types: "numpy" -> np.ndarray, etc.
parsed_types = list()
for t in array_types:
# Check type is valid.
if t not in TENSOR_TYPES:
raise ValueError(f"Unknown array type: '{t}'. Supported array types are 'numpy', 'torch', and 'warp'.")
# Exclude types that match the backend, since we do not need to convert these.
if t == backend:
continue
# Convert the string types to the corresponding types.
parsed_types.append(TENSOR_TYPES[t])
# Convert the data to the desired backend.
output_dict = dict()
for key, value in data.items():
# Obtain the data type of the current value.
data_type = type(value)
# -- arrays
if data_type in parsed_types:
# check if we have a known conversion.
if data_type not in tensor_type_conversions:
raise ValueError(f"No registered conversion for data type: {data_type} to {backend}!")
else:
output_dict[key] = tensor_type_conversions[data_type](value)
# -- nested dictionaries
elif isinstance(data[key], dict):
output_dict[key] = convert_dict_to_backend(value)
# -- everything else
else:
output_dict[key] = value
return output_dict
def update_dict(orig_dict: dict, new_dict: collections.abc.Mapping) -> dict:
"""Updates existing dictionary with values from a new dictionary.
......
......@@ -2,7 +2,6 @@
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
This script shows how to use the camera sensor from the Orbit framework.
......@@ -12,14 +11,15 @@ the simulator or OpenGL convention for the camera, we use the robotics or ROS co
"""Launch Isaac Sim Simulator first."""
import argparse
# omni-isaac-orbit
from omni.isaac.kit import SimulationApp
# add argparse arguments
parser = argparse.ArgumentParser("Welcome to Orbit: Omniverse Robotics Environments!")
parser.add_argument("--headless", action="store_true", default=False, help="Force display off at all times.")
parser.add_argument("--gpu", action="store_true", default=False, help="Use gpu for pointcloud unprojection.")
args_cli = parser.parse_args()
# launch omniverse app
......@@ -45,6 +45,7 @@ from pxr import Gf, UsdGeom
import omni.isaac.orbit.utils.kit as kit_utils
from omni.isaac.orbit.sensors.camera import Camera, PinholeCameraCfg
from omni.isaac.orbit.sensors.camera.utils import create_pointcloud_from_rgbd
from omni.isaac.orbit.utils import convert_dict_to_backend
"""
Helpers
......@@ -69,6 +70,8 @@ def design_scene():
translation=(-4.5, 3.5, 10.0),
attributes={"radius": 2.5, "intensity": 600.0, "color": (1.0, 1.0, 1.0)},
)
# Xform to hold objects
prim_utils.create_prim("/World/Objects", "Xform")
# Random objects
for i in range(8):
# sample random position
......@@ -101,6 +104,7 @@ Main
def main():
"""Runs a camera sensor from orbit."""
device = "cuda" if args_cli.gpu else "cpu"
# Load kit helper
sim = SimulationContext(stage_units_in_meters=1.0, physics_dt=0.005, rendering_dt=0.005, backend="torch")
# Set main camera
......@@ -118,8 +122,12 @@ def main():
data_types=["rgb", "distance_to_image_plane", "normals", "motion_vectors"],
usd_params=PinholeCameraCfg.UsdCameraCfg(clipping_range=(0.1, 1.0e5)),
)
camera = Camera(cfg=camera_cfg, device="cpu")
camera = Camera(cfg=camera_cfg, device=device)
# Spawn camera
camera.spawn("/World/CameraSensor")
# Initialize camera
# note: For rendering based sensors, it is not necessary to initialize before playing the simulation.
camera.initialize()
# Create replicator writer
......@@ -144,13 +152,16 @@ def main():
sim.step()
# Update camera data
camera.update(dt=0.0)
# Print camera info
print(camera)
print("Received shape of rgb image: ", camera.data.output["rgb"].shape)
print("Received shape of depth image: ", camera.data.output["distance_to_image_plane"].shape)
print("-------------------------------")
# Save images
rep_writer.write(camera.data.output)
# note: BasicWriter only supports saving data in numpy format
rep_writer.write(convert_dict_to_backend(camera.data.output, backend="numpy"))
# Pointcloud in world frame
pointcloud_w, pointcloud_rgb = create_pointcloud_from_rgbd(
......@@ -162,7 +173,13 @@ def main():
normalize_rgb=True,
num_channels=4,
)
# visualize the points
# Convert to numpy for visualization
if not isinstance(pointcloud_w, np.ndarray):
pointcloud_w = pointcloud_w.cpu().numpy()
if not isinstance(pointcloud_rgb, np.ndarray):
pointcloud_rgb = pointcloud_rgb.cpu().numpy()
# Visualize the points
num_points = pointcloud_w.shape[0]
points_size = [1.25] * num_points
points_color = pointcloud_rgb
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment