Commit a46f9348 authored by Toni-SM's avatar Toni-SM Committed by David Hoeller

Adds the multi-agent RL environment (#93)

This PR adds the interface and configuration for creating multi-agent
tasks using the direct workflow.

<!-- As you go through the list, delete the ones that are not
applicable. -->

- New feature (non-breaking change which adds functionality)
- This change requires a documentation update

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [x] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there

<!--
As you go through the checklist above, you can mark something as done by
putting an x character in it

For example,
- [x] I have done this task
- [ ] I have not done this task
-->
parent e97eb784
......@@ -20,6 +20,8 @@
ManagerBasedRLEnvCfg
DirectRLEnv
DirectRLEnvCfg
DirectMARLEnv
DirectMARLEnvCfg
ViewerCfg
Manager Based Environment
......@@ -60,6 +62,20 @@ Direct RL Environment
:show-inheritance:
:exclude-members: __init__, class_type
Direct Multi-Agent RL Environment
---------------------------------
.. autoclass:: DirectMARLEnv
:members:
:inherited-members:
:show-inheritance:
.. autoclass:: DirectMARLEnvCfg
:members:
:inherited-members:
:show-inheritance:
:exclude-members: __init__, class_type
Common
------
......
......@@ -91,12 +91,13 @@ Direct Environments
The direct-style environment aligns more closely with traditional implementations of environments,
where a single script directly implements the reward function, observation function, resets, and all the other components
of the environment. This approach does not require the manager classes. Instead, users are provided the complete freedom
to implement their task through the APIs from the base class :class:`envs.DirectRLEnv`. For users migrating from the `IsaacGymEnvs`_
and `OmniIsaacGymEnvs`_ framework, this workflow may be more familiar.
to implement their task through the APIs from the base classes :class:`envs.DirectRLEnv` or :class:`envs.DirectMARLEnv`.
For users migrating from the `IsaacGymEnvs`_ and `OmniIsaacGymEnvs`_ framework, this workflow may be more familiar.
When defining an environment with the direct-style implementation, we expect the user define a single class that
implements the entire environment. The task class should inherit from the base :class:`envs.DirectRLEnv` class and should
have its corresponding configuration class that inherits from :class:`envs.DirectRLEnvCfg`. The task class is responsible
implements the entire environment. The task class should inherit from the base classes :class:`envs.DirectRLEnv` or
:class:`envs.DirectMARLEnv` and should have its corresponding configuration class that inherits from
:class:`envs.DirectRLEnvCfg` or :class:`envs.DirectMARLEnvCfg` respectively. The task class is responsible
for setting up the scene, processing the actions, computing the rewards, observations, resets, and termination signals.
.. dropdown:: Example for defining the reward function for the Cartpole task using the direct-style
......
......@@ -53,7 +53,8 @@ are running simultaneously in the same process, and all the data is returned in
fashion.
Similarly, the :class:`envs.DirectRLEnv` class also inherits from the :class:`gymnasium.Env` class
for the direct workflow.
for the direct workflow. For :class:`envs.DirectMARLEnv`, although it does not inherit
from Gymnasium, it can be registered and created in the same way.
Using the gym registry
----------------------
......
[package]
# Note: Semantic Versioning is used: https://semver.org/
version = "0.22.12"
version = "0.23.10"
# Description
title = "Isaac Lab framework for Robot Learning"
......
Changelog
---------
0.22.12 (2024-09-08)
~~~~~~~~~~~~~~~~~~~~
Changed
^^^^^^^
* Moved the configuration of visualization markers for the command terms to their respective configuration classes.
This allows users to modify the markers for the command terms without having to modify the command term classes.
0.22.11 (2024-09-10)
0.23.10 (2024-09-10)
~~~~~~~~~~~~~~~~~~~~
Added
......@@ -20,7 +10,7 @@ Added
* Added config class, support, and tests for MJCF conversion via standalone python scripts.
0.22.10 (2024-09-09)
0.23.9 (2024-09-09)
~~~~~~~~~~~~~~~~~~~~
Added
......@@ -32,7 +22,7 @@ Added
file or the command line argument. This ensures that the simulation results are reproducible across different runs.
0.22.9 (2024-09-08)
0.23.8 (2024-09-08)
~~~~~~~~~~~~~~~~~~~
Changed
......@@ -42,7 +32,7 @@ Changed
for faster processing of high dimensional input tensors.
0.22.8 (2024-09-06)
0.23.7 (2024-09-06)
~~~~~~~~~~~~~~~~~~~
Added
......@@ -53,7 +43,7 @@ Added
instance variables instead.
0.22.7 (2024-09-05)
0.23.6 (2024-09-05)
~~~~~~~~~~~~~~~~~~~
Fixed
......@@ -63,7 +53,7 @@ Fixed
more-intuitive to control the y-axis motion based on the right-hand rule.
0.22.6 (2024-08-29)
0.23.5 (2024-08-29)
~~~~~~~~~~~~~~~~~~~
Added
......@@ -73,7 +63,7 @@ Added
consistent with all other cameras (equal to type "depth").
0.22.5 (2024-08-29)
0.23.4 (2024-09-02)
~~~~~~~~~~~~~~~~~~~
Fixed
......@@ -84,7 +74,7 @@ Fixed
* Added test to check :attr:`omni.isaac.lab.sensors.RayCasterCamera.set_intrinsic_matrices`
0.22.4 (2024-08-29)
0.23.3 (2024-08-29)
~~~~~~~~~~~~~~~~~~~
Fixed
......@@ -95,7 +85,7 @@ Fixed
which required initialization of the class to call the class-methods.
0.22.3 (2024-08-28)
0.23.2 (2024-08-28)
~~~~~~~~~~~~~~~~~~~
Added
......@@ -116,7 +106,7 @@ Fixed
the behavior equal to the USD Camera.
0.22.2 (2024-08-21)
0.23.1 (2024-08-21)
~~~~~~~~~~~~~~~~~~~
Changed
......@@ -125,6 +115,15 @@ Changed
* Disabled default viewport in certain headless scenarios for better performance.
0.23.0 (2024-08-17)
~~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Added direct workflow base class :class:`omni.isaac.lab.envs.DirectMARLEnv` for multi-agent environments.
0.22.1 (2024-08-17)
~~~~~~~~~~~~~~~~~~~
......@@ -140,7 +139,7 @@ Added
~~~~~~~~~~~~~~~~~~~
Added
^^^^^^^
^^^^^
* Added :mod:`~omni.isaac.lab.utils.modifiers` module to provide framework for configurable and custom
observation data modifiers.
......
......@@ -20,7 +20,9 @@ There are two types of environment designing workflows:
* **Direct**: The user implements all the necessary functionality directly into a single class
directly without the need for additional managers.
Based on these workflows, there are the following environment classes:
Based on these workflows, there are the following environment classes for single and multi-agent RL:
**Single-Agent RL:**
* :class:`ManagerBasedEnv`: The manager-based workflow base environment which only provides the
agent with the current observations and executes the actions provided by the agent.
......@@ -30,6 +32,11 @@ Based on these workflows, there are the following environment classes:
* :class:`DirectRLEnv`: The direct workflow RL task environment which provides implementations for
implementing scene setup, computing dones, performing resets, and computing reward and observation.
**Multi-Agent RL (MARL):**
* :class:`DirectMARLEnv`: The direct workflow MARL task environment which provides implementations for
implementing scene setup, computing dones, performing resets, and computing reward and observation.
For more information about the workflow design patterns, see the `Task Design Workflows`_ section.
.. _`Task Design Workflows`: https://isaac-sim.github.io/IsaacLab/source/features/task_workflows.html
......@@ -37,9 +44,12 @@ For more information about the workflow design patterns, see the `Task Design Wo
from . import mdp, ui
from .common import VecEnvObs, VecEnvStepReturn, ViewerCfg
from .direct_marl_env import DirectMARLEnv
from .direct_marl_env_cfg import DirectMARLEnvCfg
from .direct_rl_env import DirectRLEnv
from .direct_rl_env_cfg import DirectRLEnvCfg
from .manager_based_env import ManagerBasedEnv
from .manager_based_env_cfg import ManagerBasedEnvCfg
from .manager_based_rl_env import ManagerBasedRLEnv
from .manager_based_rl_env_cfg import ManagerBasedRLEnvCfg
from .utils import multi_agent_to_single_agent, multi_agent_with_one_agent
......@@ -6,7 +6,7 @@
from __future__ import annotations
import torch
from typing import Dict, Literal
from typing import Dict, Literal, TypeVar
from omni.isaac.lab.utils import configclass
......@@ -96,3 +96,40 @@ The tuple contains batched information for each sub-environment. The information
4. **Timeout Dones**: Whether the environment reached a timeout state, such as end of max episode length.
5. **Extras**: A dictionary containing additional information from the environment.
"""
AgentID = TypeVar("AgentID")
"""Unique identifier for an agent within a multi-agent environment.
The identifier has to be an immutable object, typically a string (e.g.: ``"agent_0"``).
"""
ObsType = TypeVar("ObsType", torch.Tensor, Dict[str, torch.Tensor])
"""A sentinel object to indicate the data type of the observation.
"""
ActionType = TypeVar("ActionType", torch.Tensor, Dict[str, torch.Tensor])
"""A sentinel object to indicate the data type of the action.
"""
StateType = TypeVar("StateType", torch.Tensor, dict)
"""A sentinel object to indicate the data type of the state.
"""
EnvStepReturn = tuple[
Dict[AgentID, ObsType],
Dict[AgentID, torch.Tensor],
Dict[AgentID, torch.Tensor],
Dict[AgentID, torch.Tensor],
Dict[AgentID, dict],
]
"""The environment signals processed at the end of each step.
The tuple contains batched information for each sub-environment (keyed by the agent ID).
The information is stored in the following order:
1. **Observations**: The observations from the environment.
2. **Rewards**: The rewards from the environment.
3. **Terminated Dones**: Whether the environment reached a terminal state, such as task success or robot falling etc.
4. **Timeout Dones**: Whether the environment reached a timeout state, such as end of max episode length.
5. **Extras**: A dictionary containing additional information from the environment.
"""
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import builtins
import gymnasium as gym
import inspect
import math
import numpy as np
import torch
import weakref
from abc import abstractmethod
from collections.abc import Sequence
from typing import Any, ClassVar
import carb
import omni.isaac.core.utils.torch as torch_utils
import omni.kit.app
from omni.isaac.version import get_version
from omni.isaac.lab.managers import EventManager
from omni.isaac.lab.scene import InteractiveScene
from omni.isaac.lab.sim import SimulationContext
from omni.isaac.lab.utils.noise import NoiseModel
from omni.isaac.lab.utils.timer import Timer
from .common import ActionType, AgentID, EnvStepReturn, ObsType, StateType
from .direct_marl_env_cfg import DirectMARLEnvCfg
from .ui import ViewportCameraController
class DirectMARLEnv:
"""The superclass for the direct workflow to design multi-agent environments.
This class implements the core functionality for multi-agent reinforcement learning (MARL)
environments. It is designed to be used with any RL library. The class is designed
to be used with vectorized environments, i.e., the environment is expected to be run
in parallel with multiple sub-environments.
The design of this class is based on the PettingZoo Parallel API.
While the environment itself is implemented as a vectorized environment, we do not
inherit from :class:`pettingzoo.ParallelEnv` or :class:`gym.vector.VectorEnv`. This is mainly
because the class adds various attributes and methods that are inconsistent with them.
Note:
For vectorized environments, it is recommended to **only** call the :meth:`reset`
method once before the first call to :meth:`step`, i.e. after the environment is created.
After that, the :meth:`step` function handles the reset of terminated sub-environments.
This is because the simulator does not support resetting individual sub-environments
in a vectorized environment.
"""
metadata: ClassVar[dict[str, Any]] = {
"render_modes": [None, "human", "rgb_array"],
"isaac_sim_version": get_version(),
}
"""Metadata for the environment."""
def __init__(self, cfg: DirectMARLEnvCfg, render_mode: str | None = None, **kwargs):
"""Initialize the environment.
Args:
cfg: The configuration object for the environment.
render_mode: The render mode for the environment. Defaults to None, which
is similar to ``"human"``.
Raises:
RuntimeError: If a simulation context already exists. The environment must always create one
since it configures the simulation context and controls the simulation.
"""
# store inputs to class
self.cfg = cfg
# store the render mode
self.render_mode = render_mode
# initialize internal variables
self._is_closed = False
# create a simulation context to control the simulator
if SimulationContext.instance() is None:
self.sim: SimulationContext = SimulationContext(self.cfg.sim)
else:
raise RuntimeError("Simulation context already exists. Cannot create a new one.")
# print useful information
print("[INFO]: Base environment:")
print(f"\tEnvironment device : {self.device}")
print(f"\tPhysics step-size : {self.physics_dt}")
print(f"\tRendering step-size : {self.physics_dt * self.cfg.sim.render_interval}")
print(f"\tEnvironment step-size : {self.step_dt}")
if self.cfg.sim.render_interval < self.cfg.decimation:
msg = (
f"The render interval ({self.cfg.sim.render_interval}) is smaller than the decimation "
f"({self.cfg.decimation}). Multiple multiple render calls will happen for each environment step."
"If this is not intended, set the render interval to be equal to the decimation."
)
carb.log_warn(msg)
# generate scene
with Timer("[INFO]: Time taken for scene creation", "scene_creation"):
self.scene = InteractiveScene(self.cfg.scene)
self._setup_scene()
print("[INFO]: Scene manager: ", self.scene)
# set up camera viewport controller
# viewport is not available in other rendering modes so the function will throw a warning
# FIXME: This needs to be fixed in the future when we unify the UI functionalities even for
# non-rendering modes.
if self.sim.render_mode >= self.sim.RenderMode.PARTIAL_RENDERING:
self.viewport_camera_controller = ViewportCameraController(self, self.cfg.viewer)
else:
self.viewport_camera_controller = None
# play the simulator to activate physics handles
# note: this activates the physics simulation view that exposes TensorAPIs
# note: when started in extension mode, first call sim.reset_async() and then initialize the managers
if builtins.ISAAC_LAUNCHED_FROM_TERMINAL is False:
print("[INFO]: Starting the simulation. This may take a few seconds. Please wait...")
with Timer("[INFO]: Time taken for simulation start", "simulation_start"):
self.sim.reset()
# -- event manager used for randomization
if self.cfg.events:
self.event_manager = EventManager(self.cfg.events, self)
print("[INFO] Event Manager: ", self.event_manager)
# make sure torch is running on the correct device
if "cuda" in self.device:
torch.cuda.set_device(self.device)
# check if debug visualization is has been implemented by the environment
source_code = inspect.getsource(self._set_debug_vis_impl)
self.has_debug_vis_implementation = "NotImplementedError" not in source_code
self._debug_vis_handle = None
# extend UI elements
# we need to do this here after all the managers are initialized
# this is because they dictate the sensors and commands right now
if self.sim.has_gui() and self.cfg.ui_window_class_type is not None:
self._window = self.cfg.ui_window_class_type(self, window_name="IsaacLab")
else:
# if no window, then we don't need to store the window
self._window = None
# allocate dictionary to store metrics
self.extras = {agent: {} for agent in self.cfg.possible_agents}
# initialize data and constants
# -- counter for simulation steps
self._sim_step_counter = 0
# -- counter for curriculum
self.common_step_counter = 0
# -- init buffers
self.episode_length_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.long)
self.reset_buf = torch.zeros(self.num_envs, dtype=torch.bool, device=self.sim.device)
self.actions = {
agent: torch.zeros(self.num_envs, self.cfg.num_actions[agent], device=self.sim.device)
for agent in self.cfg.possible_agents
}
# setup the observation, state and action spaces
self._configure_env_spaces()
# setup noise cfg for adding action and observation noise
if self.cfg.action_noise_model:
self._action_noise_model: dict[AgentID, NoiseModel] = {
agent: noise_model.class_type(self.num_envs, noise_model, self.device)
for agent, noise_model in self.cfg.action_noise_model.items()
if noise_model is not None
}
if self.cfg.observation_noise_model:
self._observation_noise_model: dict[AgentID, NoiseModel] = {
agent: noise_model.class_type(self.num_envs, noise_model, self.device)
for agent, noise_model in self.cfg.observation_noise_model.items()
if noise_model is not None
}
# perform events at the start of the simulation
if self.cfg.events:
if "startup" in self.event_manager.available_modes:
self.event_manager.apply(mode="startup")
# print the environment information
print("[INFO]: Completed setting up the environment...")
def __del__(self):
"""Cleanup for the environment."""
self.close()
"""
Properties.
"""
@property
def num_envs(self) -> int:
"""The number of instances of the environment that are running."""
return self.scene.num_envs
@property
def num_agents(self) -> int:
"""Number of current agents.
The number of current agents may change as the environment progresses (e.g.: agents can be added or removed).
"""
return len(self.agents)
@property
def max_num_agents(self) -> int:
"""Number of all possible agents the environment can generate.
This value remains constant as the environment progresses.
"""
return len(self.possible_agents)
@property
def unwrapped(self) -> DirectMARLEnv:
"""Get the unwrapped environment underneath all the layers of wrappers."""
return self
@property
def physics_dt(self) -> float:
"""The physics time-step (in s).
This is the lowest time-decimation at which the simulation is happening.
"""
return self.cfg.sim.dt
@property
def step_dt(self) -> float:
"""The environment stepping time-step (in s).
This is the time-step at which the environment steps forward.
"""
return self.cfg.sim.dt * self.cfg.decimation
@property
def device(self):
"""The device on which the environment is running."""
return self.sim.device
@property
def max_episode_length_s(self) -> float:
"""Maximum episode length in seconds."""
return self.cfg.episode_length_s
@property
def max_episode_length(self):
"""The maximum episode length in steps adjusted from s."""
return math.ceil(self.max_episode_length_s / (self.cfg.sim.dt * self.cfg.decimation))
"""
Space methods
"""
def observation_space(self, agent: AgentID) -> gym.Space:
"""Get the observation space for the specified agent.
Returns:
The agent's observation space.
"""
return self.observation_spaces[agent]
def action_space(self, agent: AgentID) -> gym.Space:
"""Get the action space for the specified agent.
Returns:
The agent's action space.
"""
return self.action_spaces[agent]
"""
Operations.
"""
def reset(
self, seed: int | None = None, options: dict[str, Any] | None = None
) -> tuple[dict[AgentID, ObsType], dict[AgentID, dict]]:
"""Resets all the environments and returns observations.
Args:
seed: The seed to use for randomization. Defaults to None, in which case the seed is not set.
options: Additional information to specify how the environment is reset. Defaults to None.
Note:
This argument is used for compatibility with Gymnasium environment definition.
Returns:
A tuple containing the observations and extras (keyed by the agent ID).
"""
# set the seed
if seed is not None:
self.seed(seed)
# reset state of scene
indices = torch.arange(self.num_envs, dtype=torch.int64, device=self.device)
self._reset_idx(indices)
# update observations and the list of current agents (sorted as in possible_agents)
self.obs_dict = self._get_observations()
self.agents = [agent for agent in self.possible_agents if agent in self.obs_dict]
# return observations
return self.obs_dict, self.extras
def step(self, actions: dict[AgentID, ActionType]) -> EnvStepReturn:
"""Execute one time-step of the environment's dynamics.
The environment steps forward at a fixed time-step, while the physics simulation is decimated at a
lower time-step. This is to ensure that the simulation is stable. These two time-steps can be configured
independently using the :attr:`DirectMARLEnvCfg.decimation` (number of simulation steps per environment step)
and the :attr:`DirectMARLEnvCfg.sim.physics_dt` (physics time-step). Based on these parameters, the environment
time-step is computed as the product of the two.
This function performs the following steps:
1. Pre-process the actions before stepping through the physics.
2. Apply the actions to the simulator and step through the physics in a decimated manner.
3. Compute the reward and done signals.
4. Reset environments that have terminated or reached the maximum episode length.
5. Apply interval events if they are enabled.
6. Compute observations.
Args:
actions: The actions to apply on the environment (keyed by the agent ID).
Shape of individual tensors is (num_envs, action_dim).
Returns:
A tuple containing the observations, rewards, resets (terminated and truncated) and extras (keyed by the agent ID).
"""
actions = {agent: action.to(self.device) for agent, action in actions.items()}
# add action noise
if self.cfg.action_noise_model:
for agent, action in actions.items():
if agent in self._action_noise_model:
actions[agent] = self._action_noise_model[agent].apply(action)
# process actions
self._pre_physics_step(actions)
# check if we need to do rendering within the physics loop
# note: checked here once to avoid multiple checks within the loop
is_rendering = self.sim.has_gui() or self.sim.has_rtx_sensors()
# perform physics stepping
for _ in range(self.cfg.decimation):
self._sim_step_counter += 1
# set actions into buffers
self._apply_action()
# set actions into simulator
self.scene.write_data_to_sim()
# simulate
self.sim.step(render=False)
# render between steps only if the GUI or an RTX sensor needs it
# note: we assume the render interval to be the shortest accepted rendering interval.
# If a camera needs rendering at a faster frequency, this will lead to unexpected behavior.
if self._sim_step_counter % self.cfg.sim.render_interval == 0 and is_rendering:
self.sim.render()
# update buffers at sim dt
self.scene.update(dt=self.physics_dt)
# post-step:
# -- update env counters (used for curriculum generation)
self.episode_length_buf += 1 # step in current episode (per env)
self.common_step_counter += 1 # total step (common for all envs)
self.terminated_dict, self.time_out_dict = self._get_dones()
self.reset_buf[:] = math.prod(self.terminated_dict.values()) | math.prod(self.time_out_dict.values())
self.reward_dict = self._get_rewards()
# -- reset envs that terminated/timed-out and log the episode information
reset_env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1)
if len(reset_env_ids) > 0:
self._reset_idx(reset_env_ids)
# post-step: step interval event
if self.cfg.events:
if "interval" in self.event_manager.available_modes:
self.event_manager.apply(mode="interval", dt=self.step_dt)
# update observations and the list of current agents (sorted as in possible_agents)
self.obs_dict = self._get_observations()
self.agents = [agent for agent in self.possible_agents if agent in self.obs_dict]
# add observation noise
# note: we apply no noise to the state space (since it is used for centralized training or critic networks)
if self.cfg.observation_noise_model:
for agent, obs in self.obs_dict.items():
if agent in self._observation_noise_model:
self.obs_dict[agent] = self._observation_noise_model[agent].apply(obs)
# return observations, rewards, resets and extras
return self.obs_dict, self.reward_dict, self.terminated_dict, self.time_out_dict, self.extras
def state(self) -> StateType | None:
"""Returns the state for the environment.
The state-space is used for centralized training or asymmetric actor-critic architectures. It is configured
using the :attr:`DirectMARLEnvCfg.num_states` parameter.
Returns:
The states for the environment, or None if :attr:`DirectMARLEnvCfg.num_states` parameter is zero.
"""
if not self.cfg.num_states:
return None
# concatenate and return the observations as state
if self.cfg.num_states < 0:
self.state_buf = torch.cat([self.obs_dict[agent] for agent in self.cfg.possible_agents], dim=-1)
# compute and return custom environment state
else:
self.state_buf = self._get_states()
return self.state_buf
@staticmethod
def seed(seed: int = -1) -> int:
"""Set the seed for the environment.
Args:
seed: The seed for random generator. Defaults to -1.
Returns:
The seed used for random generator.
"""
# set seed for replicator
try:
import omni.replicator.core as rep
rep.set_global_seed(seed)
except ModuleNotFoundError:
pass
# set seed for torch and other libraries
return torch_utils.set_seed(seed)
def render(self, recompute: bool = False) -> np.ndarray | None:
"""Run rendering without stepping through the physics.
By convention, if mode is:
- **human**: Render to the current display and return nothing. Usually for human consumption.
- **rgb_array**: Return an numpy.ndarray with shape (x, y, 3), representing RGB values for an
x-by-y pixel image, suitable for turning into a video.
Args:
recompute: Whether to force a render even if the simulator has already rendered the scene.
Defaults to False.
Returns:
The rendered image as a numpy array if mode is "rgb_array". Otherwise, returns None.
Raises:
RuntimeError: If mode is set to "rgb_data" and simulation render mode does not support it.
In this case, the simulation render mode must be set to ``RenderMode.PARTIAL_RENDERING``
or ``RenderMode.FULL_RENDERING``.
NotImplementedError: If an unsupported rendering mode is specified.
"""
# run a rendering step of the simulator
# if we have rtx sensors, we do not need to render again sin
if not self.sim.has_rtx_sensors() and not recompute:
self.sim.render()
# decide the rendering mode
if self.render_mode == "human" or self.render_mode is None:
return None
elif self.render_mode == "rgb_array":
# check that if any render could have happened
if self.sim.render_mode.value < self.sim.RenderMode.PARTIAL_RENDERING.value:
raise RuntimeError(
f"Cannot render '{self.render_mode}' when the simulation render mode is"
f" '{self.sim.render_mode.name}'. Please set the simulation render mode to:"
f"'{self.sim.RenderMode.PARTIAL_RENDERING.name}' or '{self.sim.RenderMode.FULL_RENDERING.name}'."
" If running headless, make sure --enable_cameras is set."
)
# create the annotator if it does not exist
if not hasattr(self, "_rgb_annotator"):
import omni.replicator.core as rep
# create render product
self._render_product = rep.create.render_product(
self.cfg.viewer.cam_prim_path, self.cfg.viewer.resolution
)
# create rgb annotator -- used to read data from the render product
self._rgb_annotator = rep.AnnotatorRegistry.get_annotator("rgb", device="cpu")
self._rgb_annotator.attach([self._render_product])
# obtain the rgb data
rgb_data = self._rgb_annotator.get_data()
# convert to numpy array
rgb_data = np.frombuffer(rgb_data, dtype=np.uint8).reshape(*rgb_data.shape)
# return the rgb data
# note: initially the renderer is warming up and returns empty data
if rgb_data.size == 0:
return np.zeros((self.cfg.viewer.resolution[1], self.cfg.viewer.resolution[0], 3), dtype=np.uint8)
else:
return rgb_data[:, :, :3]
else:
raise NotImplementedError(
f"Render mode '{self.render_mode}' is not supported. Please use: {self.metadata['render_modes']}."
)
def close(self):
"""Cleanup for the environment."""
if not self._is_closed:
# close entities related to the environment
# note: this is order-sensitive to avoid any dangling references
if self.cfg.events:
del self.event_manager
del self.scene
if self.viewport_camera_controller is not None:
del self.viewport_camera_controller
# clear callbacks and instance
self.sim.clear_all_callbacks()
self.sim.clear_instance()
# destroy the window
if self._window is not None:
self._window = None
# update closing status
self._is_closed = True
"""
Operations - Debug Visualization.
"""
def set_debug_vis(self, debug_vis: bool) -> bool:
"""Toggles the environment debug visualization.
Args:
debug_vis: Whether to visualize the environment debug visualization.
Returns:
Whether the debug visualization was successfully set. False if the environment
does not support debug visualization.
"""
# check if debug visualization is supported
if not self.has_debug_vis_implementation:
return False
# toggle debug visualization objects
self._set_debug_vis_impl(debug_vis)
# toggle debug visualization handles
if debug_vis:
# create a subscriber for the post update event if it doesn't exist
if self._debug_vis_handle is None:
app_interface = omni.kit.app.get_app_interface()
self._debug_vis_handle = app_interface.get_post_update_event_stream().create_subscription_to_pop(
lambda event, obj=weakref.proxy(self): obj._debug_vis_callback(event)
)
else:
# remove the subscriber if it exists
if self._debug_vis_handle is not None:
self._debug_vis_handle.unsubscribe()
self._debug_vis_handle = None
# return success
return True
"""
Helper functions.
"""
def _configure_env_spaces(self):
"""Configure the spaces for the environment."""
self.agents = self.cfg.possible_agents
self.possible_agents = self.cfg.possible_agents
# set up observation and action spaces
self.observation_spaces = {
agent: gym.spaces.Box(low=-np.inf, high=np.inf, shape=(self.cfg.num_observations[agent],))
for agent in self.cfg.possible_agents
}
self.action_spaces = {
agent: gym.spaces.Box(low=-np.inf, high=np.inf, shape=(self.cfg.num_actions[agent],))
for agent in self.cfg.possible_agents
}
# set up state space
if not self.cfg.num_states:
self.state_space = None
if self.cfg.num_states < 0:
self.state_space = gym.spaces.Box(
low=-np.inf, high=np.inf, shape=(sum(self.cfg.num_observations.values()),)
)
else:
self.state_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(self.cfg.num_states,))
def _reset_idx(self, env_ids: Sequence[int]):
"""Reset environments based on specified indices.
Args:
env_ids: List of environment ids which must be reset
"""
self.scene.reset(env_ids)
# apply events such as randomization for environments that need a reset
if self.cfg.events:
if "reset" in self.event_manager.available_modes:
env_step_count = self._sim_step_counter // self.cfg.decimation
self.event_manager.apply(mode="reset", env_ids=env_ids, global_env_step_count=env_step_count)
# reset noise models
if self.cfg.action_noise_model:
for noise_model in self._action_noise_model.values():
noise_model.reset(env_ids)
if self.cfg.observation_noise_model:
for noise_model in self._observation_noise_model.values():
noise_model.reset(env_ids)
# reset the episode length buffer
self.episode_length_buf[env_ids] = 0
"""
Implementation-specific functions.
"""
def _setup_scene(self):
"""Setup the scene for the environment.
This function is responsible for creating the scene objects and setting up the scene for the environment.
The scene creation can happen through :class:`omni.isaac.lab.scene.InteractiveSceneCfg` or through
directly creating the scene objects and registering them with the scene manager.
We leave the implementation of this function to the derived classes. If the environment does not require
any explicit scene setup, the function can be left empty.
"""
pass
@abstractmethod
def _pre_physics_step(self, actions: dict[AgentID, ActionType]):
"""Pre-process actions before stepping through the physics.
This function is responsible for pre-processing the actions before stepping through the physics.
It is called before the physics stepping (which is decimated).
Args:
actions: The actions to apply on the environment (keyed by the agent ID).
Shape of individual tensors is (num_envs, action_dim).
"""
raise NotImplementedError(f"Please implement the '_pre_physics_step' method for {self.__class__.__name__}.")
@abstractmethod
def _apply_action(self):
"""Apply actions to the simulator.
This function is responsible for applying the actions to the simulator. It is called at each
physics time-step.
"""
raise NotImplementedError(f"Please implement the '_apply_action' method for {self.__class__.__name__}.")
@abstractmethod
def _get_observations(self) -> dict[AgentID, ObsType]:
"""Compute and return the observations for the environment.
Returns:
The observations for the environment (keyed by the agent ID).
"""
raise NotImplementedError(f"Please implement the '_get_observations' method for {self.__class__.__name__}.")
@abstractmethod
def _get_states(self) -> StateType:
"""Compute and return the states for the environment.
This method is only called (and therefore has to be implemented) when the :attr:`DirectMARLEnvCfg.num_states`
parameter is greater than zero.
Returns:
The states for the environment.
"""
raise NotImplementedError(f"Please implement the '_get_states' method for {self.__class__.__name__}.")
@abstractmethod
def _get_rewards(self) -> dict[AgentID, torch.Tensor]:
"""Compute and return the rewards for the environment.
Returns:
The rewards for the environment (keyed by the agent ID).
Shape of individual tensors is (num_envs,).
"""
raise NotImplementedError(f"Please implement the '_get_rewards' method for {self.__class__.__name__}.")
@abstractmethod
def _get_dones(self) -> tuple[dict[AgentID, torch.Tensor], dict[AgentID, torch.Tensor]]:
"""Compute and return the done flags for the environment.
Returns:
A tuple containing the done flags for termination and time-out (keyed by the agent ID).
Shape of individual tensors is (num_envs,).
"""
raise NotImplementedError(f"Please implement the '_get_dones' method for {self.__class__.__name__}.")
def _set_debug_vis_impl(self, debug_vis: bool):
"""Set debug visualization into visualization objects.
This function is responsible for creating the visualization objects if they don't exist
and input ``debug_vis`` is True. If the visualization objects exist, the function should
set their visibility into the stage.
"""
raise NotImplementedError(f"Debug visualization is not implemented for {self.__class__.__name__}.")
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from dataclasses import MISSING
from omni.isaac.lab.scene import InteractiveSceneCfg
from omni.isaac.lab.sim import SimulationCfg
from omni.isaac.lab.utils import configclass
from omni.isaac.lab.utils.noise import NoiseModelCfg
from .common import AgentID, ViewerCfg
from .ui import BaseEnvWindow
@configclass
class DirectMARLEnvCfg:
"""Configuration for a MARL environment defined with the direct workflow.
Please refer to the :class:`omni.isaac.lab.envs.direct_marl_env.DirectMARLEnv` class for more details.
"""
# simulation settings
viewer: ViewerCfg = ViewerCfg()
"""Viewer configuration. Default is ViewerCfg()."""
sim: SimulationCfg = SimulationCfg()
"""Physics simulation configuration. Default is SimulationCfg()."""
# ui settings
ui_window_class_type: type | None = BaseEnvWindow
"""The class type of the UI window. Default is None.
If None, then no UI window is created.
Note:
If you want to make your own UI window, you can create a class that inherits from
from :class:`omni.isaac.lab.envs.ui.base_env_window.BaseEnvWindow`. Then, you can set
this attribute to your class type.
"""
# general settings
decimation: int = MISSING
"""Number of control action updates @ sim dt per policy dt.
For instance, if the simulation dt is 0.01s and the policy dt is 0.1s, then the decimation is 10.
This means that the control action is updated every 10 simulation steps.
"""
is_finite_horizon: bool = False
"""Whether the learning task is treated as a finite or infinite horizon problem for the agent.
Defaults to False, which means the task is treated as an infinite horizon problem.
This flag handles the subtleties of finite and infinite horizon tasks:
* **Finite horizon**: no penalty or bootstrapping value is required by the the agent for
running out of time. However, the environment still needs to terminate the episode after the
time limit is reached.
* **Infinite horizon**: the agent needs to bootstrap the value of the state at the end of the episode.
This is done by sending a time-limit (or truncated) done signal to the agent, which triggers this
bootstrapping calculation.
If True, then the environment is treated as a finite horizon problem and no time-out (or truncated) done signal
is sent to the agent. If False, then the environment is treated as an infinite horizon problem and a time-out
(or truncated) done signal is sent to the agent.
Note:
The base :class:`ManagerBasedRLEnv` class does not use this flag directly. It is used by the environment
wrappers to determine what type of done signal to send to the corresponding learning agent.
"""
episode_length_s: float = MISSING
"""Duration of an episode (in seconds).
Based on the decimation rate and physics time step, the episode length is calculated as:
.. code-block:: python
episode_length_steps = ceil(episode_length_s / (decimation_rate * physics_time_step))
For example, if the decimation rate is 10, the physics time step is 0.01, and the episode length is 10 seconds,
then the episode length in steps is 100.
"""
# environment settings
scene: InteractiveSceneCfg = MISSING
"""Scene settings.
Please refer to the :class:`omni.isaac.lab.scene.InteractiveSceneCfg` class for more details.
"""
events: object = None
"""Event settings. Defaults to None, in which case no events are applied through the event manager.
Please refer to the :class:`omni.isaac.lab.managers.EventManager` class for more details.
"""
num_observations: dict[AgentID, int] = MISSING
"""The dimension of the observation space from each agent."""
num_states: int = MISSING
"""The dimension of the state space from each environment instance.
The following values are supported:
* -1: All the observations from the different agents are automatically concatenated.
* 0: No state-space will be constructed (`state_space` is None).
This is useful to save computational resources when the algorithm to be trained does not need it.
* greater than 0: Custom state-space dimension to be provided by the task implementation.
"""
observation_noise_model: dict[AgentID, NoiseModelCfg | None] | None = None
"""The noise model to apply to the computed observations from the environment. Default is None, which means no noise is added.
Please refer to the :class:`omni.isaac.lab.utils.noise.NoiseModel` class for more details.
"""
num_actions: dict[AgentID, int] = MISSING
"""The dimension of the action space for each agent."""
action_noise_model: dict[AgentID, NoiseModelCfg | None] | None = None
"""The noise model applied to the actions provided to the environment. Default is None, which means no noise is added.
Please refer to the :class:`omni.isaac.lab.utils.noise.NoiseModel` class for more details.
"""
possible_agents: list[AgentID] = MISSING
"""A list of all possible agents the environment could generate.
The contents of the list cannot be modified during the entire training process.
"""
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import gymnasium as gym
import math
import numpy as np
import torch
from typing import Any
from .common import ActionType, AgentID, EnvStepReturn, ObsType, StateType, VecEnvObs, VecEnvStepReturn
from .direct_marl_env import DirectMARLEnv
from .direct_rl_env import DirectRLEnv
def multi_agent_to_single_agent(env: DirectMARLEnv, state_as_observation: bool = False) -> DirectRLEnv:
"""Convert the multi-agent environment instance to a single-agent environment instance.
The converted environment will be an instance of the single-agent environment interface class (:class:`DirectRLEnv`).
As part of the conversion process, the following operations are carried out:
* The observations of all the agents in the original multi-agent environment are concatenated to compose
the single-agent observation. If the use of the environment state is defined as the observation,
it is returned as is.
* The terminations and time-outs of all the agents in the original multi-agent environment are multiplied
(``AND`` operation) to compose the corresponding single-agent values.
* The rewards of all the agents in the original multi-agent environment are summed to compose the
single-agent reward.
* The action taken by the single-agent is split to compose the actions of each agent in the original
multi-agent environment before stepping it.
Args:
env: The environment to convert to.
state_as_observation: Weather to use the multi-agent environment state as single-agent observation.
Returns:
Single-agent environment instance.
Raises:
AssertionError: If the environment state cannot be used as observation since it was explicitly defined
as unconstructed (:attr:`DirectMARLEnvCfg.num_states`).
"""
class Env(DirectRLEnv):
def __init__(self, env: DirectMARLEnv) -> None:
self.env: DirectMARLEnv = env.unwrapped
# check if it is possible to use the multi-agent environment state as single-agent observation
self._state_as_observation = state_as_observation
if self._state_as_observation:
assert self.env.cfg.num_states != 0, (
"The environment state cannot be used as observation since it was explicitly defined as"
" unconstructed"
)
# create single-agent properties to expose in the converted environment
self.cfg = self.env.cfg
self.sim = self.env.sim
self.scene = self.env.scene
self.num_actions = sum(self.env.cfg.num_actions.values())
self.num_observations = sum(self.env.cfg.num_observations.values())
self.num_states = self.env.cfg.num_states
self.single_observation_space = gym.spaces.Dict()
if self._state_as_observation:
self.single_observation_space["policy"] = self.env.state_space
else:
self.single_observation_space["policy"] = gym.spaces.Box(
low=-np.inf, high=np.inf, shape=(self.num_observations,)
)
self.single_action_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(self.num_actions,))
# batch the spaces for vectorized environments
self.observation_space = gym.vector.utils.batch_space(
self.single_observation_space["policy"], self.num_envs
)
self.action_space = gym.vector.utils.batch_space(self.single_action_space, self.num_envs)
def reset(self, seed: int | None = None, options: dict[str, Any] | None = None) -> tuple[VecEnvObs, dict]:
obs, extras = self.env.reset(seed, options)
# use environment state as observation
if self._state_as_observation:
obs = {"policy": self.env.state()}
# concatenate agents' observations
else:
obs = {"policy": torch.cat([obs[agent] for agent in self.env.possible_agents], dim=-1)}
return obs, extras
def step(self, action: torch.Tensor) -> VecEnvStepReturn:
# split single-agent actions to build the multi-agent ones
index = 0
_actions = {}
for agent in self.env.possible_agents:
_actions[agent] = action[:, index : index + self.env.cfg.num_actions[agent]]
index += self.env.cfg.num_actions[agent]
# step the environment
obs, rewards, terminated, time_outs, extras = self.env.step(_actions)
# use environment state as observation
if self._state_as_observation:
obs = {"policy": self.env.state()}
# concatenate agents' observations
else:
obs = {"policy": torch.cat([obs[agent] for agent in self.env.possible_agents], dim=-1)}
# process environment outputs to return single-agent data
rewards = sum(rewards.values())
terminated = math.prod(terminated.values()).to(dtype=torch.bool)
time_outs = math.prod(time_outs.values()).to(dtype=torch.bool)
return obs, rewards, terminated, time_outs, extras
def render(self, recompute: bool = False) -> np.ndarray | None:
self.env.render(recompute)
def close(self) -> None:
self.env.close()
return Env(env)
def multi_agent_with_one_agent(env: DirectMARLEnv, state_as_observation: bool = False) -> DirectMARLEnv:
"""Convert the multi-agent environment instance to a multi-agent environment instance with only one agent.
The converted environment will be an instance of the multi-agent environment interface class
(:class:`DirectMARLEnv`) but with only one agent available (with ID: ``"single-agent"``).
As part of the conversion process, the following operations are carried out:
* The observations of all the agents in the original multi-agent environment are concatenated to compose
the agent observation. If the use of the environment state is defined as the observation, it is returned as is.
* The terminations and time-outs of all the agents in the original multi-agent environment are multiplied
(``AND`` operation) to compose the corresponding agent values.
* The rewards of all the agents in the original multi-agent environment are summed to compose the agent reward.
* The action taken by the agent is split to compose the actions of each agent in the original
multi-agent environment before stepping it.
Args:
env: The environment to convert to.
state_as_observation: Weather to use the multi-agent environment state as agent observation.
Returns:
Multi-agent environment instance with only one agent.
Raises:
AssertionError: If the environment state cannot be used as observation since it was explicitly defined
as unconstructed (:attr:`DirectMARLEnvCfg.num_states`).
"""
class Env(DirectMARLEnv):
def __init__(self, env: DirectMARLEnv) -> None:
self.env: DirectMARLEnv = env.unwrapped
# check if it is possible to use the multi-agent environment state as agent observation
self._state_as_observation = state_as_observation
if self._state_as_observation:
assert self.env.cfg.num_states != 0, (
"The environment state cannot be used as observation since it was explicitly defined as"
" unconstructed"
)
# create agent properties to expose in the converted environment
self._agent_id = "single-agent"
self._exported_agents = [self._agent_id]
self._exported_possible_agents = [self._agent_id]
if self._state_as_observation:
self._exported_observation_spaces = {self._agent_id: self.env.state_space}
else:
self._exported_observation_spaces = {
self._agent_id: gym.spaces.Box(
low=-np.inf, high=np.inf, shape=(sum(self.env.cfg.num_observations.values()),)
)
}
self._exported_action_spaces = {
self._agent_id: gym.spaces.Box(
low=-np.inf, high=np.inf, shape=(sum(self.env.cfg.num_actions.values()),)
)
}
def __getattr__(self, key: str) -> Any:
return getattr(self.env, key)
@property
def agents(self) -> list[AgentID]:
return self._exported_agents
@property
def possible_agents(self) -> list[AgentID]:
return self._exported_possible_agents
@property
def observation_spaces(self) -> dict[AgentID, gym.Space]:
return self._exported_observation_spaces
@property
def action_spaces(self) -> dict[AgentID, gym.Space]:
return self._exported_action_spaces
def reset(
self, seed: int | None = None, options: dict[str, Any] | None = None
) -> tuple[dict[AgentID, ObsType], dict[AgentID, dict]]:
obs, extras = self.env.reset(seed, options)
# use environment state as observation
if self._state_as_observation:
obs = {self._agent_id: self.env.state()}
# concatenate agents' observations
else:
obs = {self._agent_id: torch.cat([obs[agent] for agent in self.env.possible_agents], dim=-1)}
return obs, extras
def step(self, actions: dict[AgentID, ActionType]) -> EnvStepReturn:
# split agent actions to build the multi-agent ones
index = 0
_actions = {}
for agent in self.env.possible_agents:
_actions[agent] = actions[self._agent_id][:, index : index + self.env.cfg.num_actions[agent]]
index += self.env.cfg.num_actions[agent]
# step the environment
obs, rewards, terminated, time_outs, extras = self.env.step(_actions)
# use environment state as observation
if self._state_as_observation:
obs = {self._agent_id: self.env.state()}
# concatenate agents' observations
else:
obs = {self._agent_id: torch.cat([obs[agent] for agent in self.env.possible_agents], dim=-1)}
# process environment outputs to return agent data
rewards = {self._agent_id: sum(rewards.values())}
terminated = {self._agent_id: math.prod(terminated.values()).to(dtype=torch.bool)}
time_outs = {self._agent_id: math.prod(time_outs.values()).to(dtype=torch.bool)}
return obs, rewards, terminated, time_outs, extras
def state(self) -> StateType | None:
return self.env.state()
def render(self, recompute: bool = False) -> np.ndarray | None:
self.env.render(recompute)
def close(self) -> None:
self.env.close()
return Env(env)
......@@ -25,7 +25,7 @@ INSTALL_REQUIRES = [
"toml",
# devices
"hidapi",
# gym
# reinforcement learning
"gymnasium==0.29.0",
# procedural-generation
"trimesh",
......
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
# ignore private usage of variables warning
# pyright: reportPrivateUsage=none
from __future__ import annotations
"""Launch Isaac Sim Simulator first."""
from omni.isaac.lab.app import AppLauncher, run_tests
# Can set this to False to see the GUI for debugging
HEADLESS = True
# launch omniverse app
app_launcher = AppLauncher(headless=HEADLESS)
simulation_app = app_launcher.app
"""Rest everything follows."""
import torch
import unittest
import omni.usd
from omni.isaac.lab.envs import DirectMARLEnv, DirectMARLEnvCfg
from omni.isaac.lab.scene import InteractiveSceneCfg
from omni.isaac.lab.utils import configclass
@configclass
class EmptySceneCfg(InteractiveSceneCfg):
"""Configuration for an empty scene."""
pass
def get_empty_base_env_cfg(device: str = "cuda:0", num_envs: int = 1, env_spacing: float = 1.0):
"""Generate base environment config based on device"""
@configclass
class EmptyEnvCfg(DirectMARLEnvCfg):
"""Configuration for the empty test environment."""
# Scene settings
scene: EmptySceneCfg = EmptySceneCfg(num_envs=num_envs, env_spacing=env_spacing)
# Basic settings
decimation = 1
possible_agents = ["agent_0", "agent_1"]
num_actions = {"agent_0": 1, "agent_1": 2}
num_observations = {"agent_0": 3, "agent_1": 4}
num_states = -1
return EmptyEnvCfg()
class TestDirectMARLEnv(unittest.TestCase):
"""Test for direct MARL env class"""
"""
Tests
"""
def test_initialization(self):
for device in ("cuda:0", "cpu"):
with self.subTest(device=device):
# create a new stage
omni.usd.get_context().new_stage()
# create environment
env = DirectMARLEnv(cfg=get_empty_base_env_cfg(device=device))
# check multi-agent config
self.assertEqual(env.num_agents, 2)
self.assertEqual(env.max_num_agents, 2)
# check spaces
self.assertEqual(env.state_space.shape, (7,))
self.assertEqual(len(env.observation_spaces), 2)
self.assertEqual(len(env.action_spaces), 2)
# step environment to verify setup
env.reset()
for _ in range(2):
actions = {"agent_0": torch.rand((1, 1)), "agent_1": torch.rand((1, 2))}
obs, reward, terminated, truncate, info = env.step(actions)
env.state()
# close the environment
env.close()
if __name__ == "__main__":
run_tests()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment