Commit 464631fa authored by CY Chen's avatar CY Chen Committed by Kelly Guo

Updates mimic to support multi-eef (DexMimicGen) data generation (#287)

This PR updates mimic to support multi-eef (DexMimicgen) data
generation.
It consists of the following major changes:
- Updated mimic code to support environments with multiple end effectors
- Added support for setting subtask constraints based on DexMimicGen
- Updated annotate_demos.py to support annotating subtask term signals
for multiple end effectors
- Updated mimic API target_eef_pose_to_action() to take noise as
dictionary of eef noise values instead of a single value

- New feature (non-breaking change which adds functionality)
- Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- This change requires a documentation update

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [x] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there
parent 78a70bb5
...@@ -26,13 +26,6 @@ parser.add_argument( ...@@ -26,13 +26,6 @@ parser.add_argument(
help="File name of the annotated output dataset file.", help="File name of the annotated output dataset file.",
) )
parser.add_argument("--auto", action="store_true", default=False, help="Automatically annotate subtasks.") parser.add_argument("--auto", action="store_true", default=False, help="Automatically annotate subtasks.")
parser.add_argument(
"--signals",
type=str,
nargs="+",
default=[],
help="Sequence of subtask termination signals for all except last subtask",
)
# append AppLauncher cli args # append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser) AppLauncher.add_app_launcher_args(parser)
...@@ -57,16 +50,17 @@ if not args_cli.headless and not os.environ.get("HEADLESS", 0): ...@@ -57,16 +50,17 @@ if not args_cli.headless and not os.environ.get("HEADLESS", 0):
from isaaclab.devices import Se3Keyboard from isaaclab.devices import Se3Keyboard
from isaaclab.envs import ManagerBasedRLMimicEnv from isaaclab.envs import ManagerBasedRLMimicEnv
from isaaclab.envs.mdp.recorders.recorders_cfg import ActionStateRecorderManagerCfg from isaaclab.envs.mdp.recorders.recorders_cfg import ActionStateRecorderManagerCfg
from isaaclab.managers import RecorderTerm, RecorderTermCfg from isaaclab.managers import RecorderTerm, RecorderTermCfg, TerminationTermCfg
from isaaclab.utils import configclass from isaaclab.utils import configclass
from isaaclab.utils.datasets import HDF5DatasetFileHandler from isaaclab.utils.datasets import EpisodeData, HDF5DatasetFileHandler
import isaaclab_tasks # noqa: F401 import isaaclab_tasks # noqa: F401
from isaaclab_tasks.utils.parse_cfg import parse_env_cfg from isaaclab_tasks.utils.parse_cfg import parse_env_cfg
is_paused = False is_paused = False
current_action_index = 0 current_action_index = 0
subtask_indices = [] marked_subtask_action_indices = []
skip_episode = False
def play_cb(): def play_cb():
...@@ -79,10 +73,15 @@ def pause_cb(): ...@@ -79,10 +73,15 @@ def pause_cb():
is_paused = True is_paused = True
def skip_episode_cb():
global skip_episode
skip_episode = True
def mark_subtask_cb(): def mark_subtask_cb():
global current_action_index, subtask_indices global current_action_index, marked_subtask_action_indices
subtask_indices.append(current_action_index) marked_subtask_action_indices.append(current_action_index)
print(f"Marked subtask at action index: {current_action_index}") print(f"Marked a subtask signal at action index: {current_action_index}")
class PreStepDatagenInfoRecorder(RecorderTerm): class PreStepDatagenInfoRecorder(RecorderTerm):
...@@ -91,7 +90,7 @@ class PreStepDatagenInfoRecorder(RecorderTerm): ...@@ -91,7 +90,7 @@ class PreStepDatagenInfoRecorder(RecorderTerm):
def record_pre_step(self): def record_pre_step(self):
eef_pose_dict = {} eef_pose_dict = {}
for eef_name in self._env.cfg.subtask_configs.keys(): for eef_name in self._env.cfg.subtask_configs.keys():
eef_pose_dict[eef_name] = self._env.get_robot_eef_pose(eef_name) eef_pose_dict[eef_name] = self._env.get_robot_eef_pose(eef_name=eef_name)
datagen_info = { datagen_info = {
"object_pose": self._env.get_object_poses(), "object_pose": self._env.get_object_poses(),
...@@ -132,11 +131,7 @@ class MimicRecorderManagerCfg(ActionStateRecorderManagerCfg): ...@@ -132,11 +131,7 @@ class MimicRecorderManagerCfg(ActionStateRecorderManagerCfg):
def main(): def main():
"""Add Isaac Lab Mimic annotations to the given demo dataset file.""" """Add Isaac Lab Mimic annotations to the given demo dataset file."""
global is_paused, current_action_index, subtask_indices global is_paused, current_action_index, marked_subtask_action_indices
if not args_cli.auto and len(args_cli.signals) == 0:
if len(args_cli.signals) == 0:
raise ValueError("Subtask signals should be provided for manual mode.")
# Load input dataset to be annotated # Load input dataset to be annotated
if not os.path.exists(args_cli.input_file): if not os.path.exists(args_cli.input_file):
...@@ -182,22 +177,32 @@ def main(): ...@@ -182,22 +177,32 @@ def main():
if not args_cli.auto: if not args_cli.auto:
# disable subtask term signals recorder term if in manual mode # disable subtask term signals recorder term if in manual mode
env_cfg.recorders.record_pre_step_subtask_term_signals = None env_cfg.recorders.record_pre_step_subtask_term_signals = None
env_cfg.recorders.dataset_export_dir_path = output_dir env_cfg.recorders.dataset_export_dir_path = output_dir
env_cfg.recorders.dataset_filename = output_file_name env_cfg.recorders.dataset_filename = output_file_name
# create environment from loaded config # create environment from loaded config
env = gym.make(args_cli.task, cfg=env_cfg).unwrapped env: ManagerBasedRLMimicEnv = gym.make(args_cli.task, cfg=env_cfg).unwrapped
if not isinstance(env.unwrapped, ManagerBasedRLMimicEnv): if not isinstance(env, ManagerBasedRLMimicEnv):
raise ValueError("The environment should be derived from ManagerBasedRLMimicEnv") raise ValueError("The environment should be derived from ManagerBasedRLMimicEnv")
if args_cli.auto: if args_cli.auto:
# check if the mimic API env.unwrapped.get_subtask_term_signals() is implemented # check if the mimic API env.get_subtask_term_signals() is implemented
if env.unwrapped.get_subtask_term_signals.__func__ is ManagerBasedRLMimicEnv.get_subtask_term_signals: if env.get_subtask_term_signals.__func__ is ManagerBasedRLMimicEnv.get_subtask_term_signals:
raise NotImplementedError( raise NotImplementedError(
"The environment does not implement the get_subtask_term_signals method required " "The environment does not implement the get_subtask_term_signals method required "
"to run automatic annotations." "to run automatic annotations."
) )
else:
# get subtask termination signal names for each eef from the environment configs
subtask_term_signal_names = {}
for eef_name, eef_subtask_configs in env.cfg.subtask_configs.items():
subtask_term_signal_names[eef_name] = [
subtask_config.subtask_term_signal for subtask_config in eef_subtask_configs
]
# no need to annotate the last subtask term signal, so remove it from the list
subtask_term_signal_names[eef_name].pop()
# reset environment # reset environment
env.reset() env.reset()
...@@ -207,6 +212,7 @@ def main(): ...@@ -207,6 +212,7 @@ def main():
keyboard_interface = Se3Keyboard(pos_sensitivity=0.1, rot_sensitivity=0.1) keyboard_interface = Se3Keyboard(pos_sensitivity=0.1, rot_sensitivity=0.1)
keyboard_interface.add_callback("N", play_cb) keyboard_interface.add_callback("N", play_cb)
keyboard_interface.add_callback("B", pause_cb) keyboard_interface.add_callback("B", pause_cb)
keyboard_interface.add_callback("Q", skip_episode_cb)
if not args_cli.auto: if not args_cli.auto:
keyboard_interface.add_callback("S", mark_subtask_cb) keyboard_interface.add_callback("S", mark_subtask_cb)
keyboard_interface.reset() keyboard_interface.reset()
...@@ -219,81 +225,199 @@ def main(): ...@@ -219,81 +225,199 @@ def main():
# Iterate over the episodes in the loaded dataset file # Iterate over the episodes in the loaded dataset file
for episode_index, episode_name in enumerate(dataset_file_handler.get_episode_names()): for episode_index, episode_name in enumerate(dataset_file_handler.get_episode_names()):
processed_episode_count += 1 processed_episode_count += 1
subtask_indices = []
print(f"\nAnnotating episode #{episode_index} ({episode_name})") print(f"\nAnnotating episode #{episode_index} ({episode_name})")
episode = dataset_file_handler.load_episode(episode_name, env.unwrapped.device) episode = dataset_file_handler.load_episode(episode_name, env.device)
episode_data = episode.data
is_episode_annotated_successfully = False
if args_cli.auto:
is_episode_annotated_successfully = annotate_episode_in_auto_mode(env, episode, success_term)
else:
is_episode_annotated_successfully = annotate_episode_in_manual_mode(
env, episode, success_term, subtask_term_signal_names
)
if is_episode_annotated_successfully and not skip_episode:
# set success to the recorded episode data and export to file
env.recorder_manager.set_success_to_episodes(
None, torch.tensor([[True]], dtype=torch.bool, device=env.device)
)
env.recorder_manager.export_episodes()
exported_episode_count += 1
print("\tExported the annotated episode.")
else:
print("\tSkipped exporting the episode due to incomplete subtask annotations.")
break
print(
f"\nExported {exported_episode_count} (out of {processed_episode_count}) annotated"
f" episode{'s' if exported_episode_count > 1 else ''}."
)
print("Exiting the app.")
# Close environment after annotation is complete
env.close()
# read initial state from the loaded episode
initial_state = episode_data["initial_state"]
env.unwrapped.recorder_manager.reset()
env.unwrapped.reset_to(initial_state, None, is_relative=True)
# replay actions from this episode def replay_episode(
actions = episode_data["actions"] env: ManagerBasedRLMimicEnv,
episode: EpisodeData,
success_term: TerminationTermCfg | None = None,
) -> bool:
"""Replays an episode in the environment.
This function replays the given recorded episode in the environment. It can optionally check if the task
was successfully completed using a success termination condition input.
Args:
env: The environment to replay the episode in.
episode: The recorded episode data to replay.
success_term: Optional termination term to check for task success.
Returns:
True if the episode was successfully replayed and the success condition was met (if provided),
False otherwise.
"""
global current_action_index, skip_episode, is_paused
# read initial state and actions from the loaded episode
initial_state = episode.data["initial_state"]
actions = episode.data["actions"]
env.recorder_manager.reset()
env.reset_to(initial_state, None, is_relative=True)
first_action = True first_action = True
for action_index, action in enumerate(actions): for action_index, action in enumerate(actions):
current_action_index = action_index current_action_index = action_index
if first_action: if first_action:
first_action = False first_action = False
else: else:
while is_paused: while is_paused or skip_episode:
env.unwrapped.sim.render() env.sim.render()
if skip_episode:
return False
continue continue
action_tensor = torch.Tensor(action).reshape([1, action.shape[0]]) action_tensor = torch.Tensor(action).reshape([1, action.shape[0]])
env.step(torch.Tensor(action_tensor)) env.step(torch.Tensor(action_tensor))
if success_term is not None:
is_episode_annotated_successfully = False if not bool(success_term.func(env, **success_term.params)[0]):
if not args_cli.auto: return False
print(f"\tSubtasks marked at action indices: {subtask_indices}") return True
if len(args_cli.signals) != len(subtask_indices):
raise ValueError(
f"Number of annotated subtask signals {len(subtask_indices)} should be equal " def annotate_episode_in_auto_mode(
f" to number of subtasks {len(args_cli.signals)}" env: ManagerBasedRLMimicEnv,
) episode: EpisodeData,
annotated_episode = env.unwrapped.recorder_manager.get_episode(0) success_term: TerminationTermCfg | None = None,
for subtask_index in range(len(args_cli.signals)): ) -> bool:
# subtask termination signal is false until subtask is complete, and true afterwards """Annotates an episode in automatic mode.
subtask_signals = torch.ones(len(actions), dtype=torch.bool)
subtask_signals[: subtask_indices[subtask_index]] = False This function replays the given episode in the environment and checks if the task was successfully completed.
annotated_episode.add( If the task was not completed, it will print a message and return False. Otherwise, it will check if all the
f"obs/datagen_info/subtask_term_signals/{args_cli.signals[subtask_index]}", subtask_signals subtask term signals are annotated and return True if they are, False otherwise.
)
is_episode_annotated_successfully = True Args:
env: The environment to replay the episode in.
episode: The recorded episode data to replay.
success_term: Optional termination term to check for task success.
Returns:
True if the episode was successfully annotated, False otherwise.
"""
global skip_episode
skip_episode = False
is_episode_annotated_successfully = replay_episode(env, episode, success_term)
if skip_episode:
print("\tSkipping the episode.")
return False
if not is_episode_annotated_successfully:
print("\tThe final task was not completed.")
else: else:
# check if all the subtask term signals are annotated # check if all the subtask term signals are annotated
annotated_episode = env.unwrapped.recorder_manager.get_episode(0) annotated_episode = env.recorder_manager.get_episode(0)
subtask_term_signal_dict = annotated_episode.data["obs"]["datagen_info"]["subtask_term_signals"] subtask_term_signal_dict = annotated_episode.data["obs"]["datagen_info"]["subtask_term_signals"]
is_episode_annotated_successfully = True
for signal_name, signal_flags in subtask_term_signal_dict.items(): for signal_name, signal_flags in subtask_term_signal_dict.items():
if not torch.any(signal_flags): if not torch.any(signal_flags):
is_episode_annotated_successfully = False is_episode_annotated_successfully = False
print(f'\tDid not detect completion for the subtask "{signal_name}".') print(f'\tDid not detect completion for the subtask "{signal_name}".')
return is_episode_annotated_successfully
def annotate_episode_in_manual_mode(
env: ManagerBasedRLMimicEnv,
episode: EpisodeData,
success_term: TerminationTermCfg | None = None,
subtask_term_signal_names: dict[str, list[str]] = {},
) -> bool:
"""Annotates an episode in manual mode.
This function replays the given episode in the environment and allows for manual marking of subtask term signals.
It iterates over each eef and prompts the user to mark the subtask term signals for that eef.
Args:
env: The environment to replay the episode in.
episode: The recorded episode data to replay.
success_term: Optional termination term to check for task success.
subtask_term_signal_names: Dictionary mapping eef names to lists of subtask term signal names.
Returns:
True if the episode was successfully annotated, False otherwise.
"""
global is_paused, marked_subtask_action_indices, skip_episode
# iterate over the eefs for marking subtask term signals
subtask_term_signal_action_indices = {}
for eef_name, eef_subtask_term_signal_names in subtask_term_signal_names.items():
# skip if no subtask annotation is needed for this eef
if len(eef_subtask_term_signal_names) == 0:
continue
if not bool(success_term.func(env, **success_term.params)[0]): while True:
is_episode_annotated_successfully = False is_paused = True
print("\tThe final task was not completed.") skip_episode = False
print(f'\tPlaying the episode for subtask annotations for eef "{eef_name}".')
if is_episode_annotated_successfully: print("\tSubtask signals to annotate:")
# set success to the recorded episode data and export to file print(f"\t\t- Termination:\t{eef_subtask_term_signal_names}")
env.unwrapped.recorder_manager.set_success_to_episodes(
None, torch.tensor([[True]], dtype=torch.bool, device=env.unwrapped.device) print('\n\tPress "N" to begin.')
print('\tPress "B" to pause.')
print('\tPress "S" to annotate subtask signals.')
print('\tPress "Q" to skip the episode.\n')
marked_subtask_action_indices = []
task_success_result = replay_episode(env, episode, success_term)
if skip_episode:
print("\tSkipping the episode.")
return False
print(f"\tSubtasks marked at action indices: {marked_subtask_action_indices}")
expected_subtask_signal_count = len(eef_subtask_term_signal_names)
if task_success_result and expected_subtask_signal_count == len(marked_subtask_action_indices):
print(f'\tAll {expected_subtask_signal_count} subtask signals for eef "{eef_name}" were annotated.')
for marked_signal_index in range(expected_subtask_signal_count):
# collect subtask term signal action indices
subtask_term_signal_action_indices[eef_subtask_term_signal_names[marked_signal_index]] = (
marked_subtask_action_indices[marked_signal_index]
) )
env.unwrapped.recorder_manager.export_episodes()
exported_episode_count += 1
print("\tExported the annotated episode.")
else:
print("\tSkipped exporting the episode due to incomplete subtask annotations.")
break break
if not task_success_result:
print("\tThe final task was not completed.")
if expected_subtask_signal_count != len(marked_subtask_action_indices):
print( print(
f"\nExported {exported_episode_count} (out of {processed_episode_count}) annotated" f"\tOnly {len(marked_subtask_action_indices)} out of"
f" episode{'s' if exported_episode_count > 1 else ''}." f' {expected_subtask_signal_count} subtask signals for eef "{eef_name}" were'
" annotated."
) )
print("Exiting the app.")
# Close environment after annotation is complete print(f'\tThe episode will be replayed again for re-marking subtask signals for the eef "{eef_name}".\n')
env.close()
annotated_episode = env.recorder_manager.get_episode(0)
for (
subtask_term_signal_name,
subtask_term_signal_action_index,
) in subtask_term_signal_action_indices.items():
# subtask termination signal is false until subtask is complete, and true afterwards
subtask_signals = torch.ones(len(episode.data["actions"]), dtype=torch.bool)
subtask_signals[:subtask_term_signal_action_index] = False
annotated_episode.add(f"obs/datagen_info/subtask_term_signals/{subtask_term_signal_name}", subtask_signals)
return True
if __name__ == "__main__": if __name__ == "__main__":
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
Main data generation script. Main data generation script.
""" """
# Launching Isaac Sim Simulator first. """Launch Isaac Sim Simulator first."""
import argparse import argparse
...@@ -45,10 +45,15 @@ simulation_app = app_launcher.app ...@@ -45,10 +45,15 @@ simulation_app = app_launcher.app
import asyncio import asyncio
import gymnasium as gym import gymnasium as gym
import inspect
import numpy as np import numpy as np
import random import random
import torch import torch
import omni
from isaaclab.envs import ManagerBasedRLMimicEnv
import isaaclab_mimic.envs # noqa: F401 import isaaclab_mimic.envs # noqa: F401
from isaaclab_mimic.datagen.generation import env_loop, setup_async_generation, setup_env_config from isaaclab_mimic.datagen.generation import env_loop, setup_async_generation, setup_env_config
from isaaclab_mimic.datagen.utils import get_env_name_from_dataset, setup_output_paths from isaaclab_mimic.datagen.utils import get_env_name_from_dataset, setup_output_paths
...@@ -74,12 +79,22 @@ def main(): ...@@ -74,12 +79,22 @@ def main():
) )
# create environment # create environment
env = gym.make(env_name, cfg=env_cfg) env = gym.make(env_name, cfg=env_cfg).unwrapped
if not isinstance(env, ManagerBasedRLMimicEnv):
raise ValueError("The environment should be derived from ManagerBasedRLMimicEnv")
# check if the mimic API from this environment contains decprecated signatures
if "action_noise_dict" not in inspect.signature(env.target_eef_pose_to_action).parameters:
omni.log.warn(
f'The "noise" parameter in the "{env_name}" environment\'s mimic API "target_eef_pose_to_action", '
"is deprecated. Please update the API to take action_noise_dict instead."
)
# set seed for generation # set seed for generation
random.seed(env.unwrapped.cfg.datagen_config.seed) random.seed(env.cfg.datagen_config.seed)
np.random.seed(env.unwrapped.cfg.datagen_config.seed) np.random.seed(env.cfg.datagen_config.seed)
torch.manual_seed(env.unwrapped.cfg.datagen_config.seed) torch.manual_seed(env.cfg.datagen_config.seed)
# reset before starting # reset before starting
env.reset() env.reset()
...@@ -95,7 +110,13 @@ def main(): ...@@ -95,7 +110,13 @@ def main():
try: try:
asyncio.ensure_future(asyncio.gather(*async_components["tasks"])) asyncio.ensure_future(asyncio.gather(*async_components["tasks"]))
env_loop(env, async_components["action_queue"], async_components["info_pool"], async_components["event_loop"]) env_loop(
env,
async_components["reset_queue"],
async_components["action_queue"],
async_components["info_pool"],
async_components["event_loop"],
)
except asyncio.CancelledError: except asyncio.CancelledError:
print("Tasks were cancelled.") print("Tasks were cancelled.")
......
...@@ -118,6 +118,18 @@ Changed ...@@ -118,6 +118,18 @@ Changed
* ``set_fixed_tendon_limit`` → ``set_fixed_tendon_position_limit`` * ``set_fixed_tendon_limit`` → ``set_fixed_tendon_position_limit``
0.34.12 (2025-03-06)
~~~~~~~~~~~~~~~~~~~~
Added
^^^^^
* Updated the mimic API :meth:`target_eef_pose_to_action` in :class:`isaaclab.envs.ManagerBasedRLMimicEnv` to take a dictionary of
eef noise values instead of a single noise value.
* Added support for optional subtask constraints based on DexMimicGen to the mimic configuration class :class:`isaaclab.envs.MimicEnvCfg`.
* Enabled data compression in HDF5 dataset file handler :class:`isaaclab.utils.datasets.hdf5_dataset_file_handler.HDF5DatasetFileHandler`.
0.34.11 (2025-03-04) 0.34.11 (2025-03-04)
~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~
......
...@@ -47,7 +47,11 @@ class ManagerBasedRLMimicEnv(ManagerBasedRLEnv): ...@@ -47,7 +47,11 @@ class ManagerBasedRLMimicEnv(ManagerBasedRLEnv):
raise NotImplementedError raise NotImplementedError
def target_eef_pose_to_action( def target_eef_pose_to_action(
self, target_eef_pose_dict: dict, gripper_action_dict: dict, noise: float | None = None, env_id: int = 0 self,
target_eef_pose_dict: dict,
gripper_action_dict: dict,
action_noise_dict: dict | None = None,
env_id: int = 0,
) -> torch.Tensor: ) -> torch.Tensor:
""" """
Takes a target pose and gripper action for the end effector controller and returns an action Takes a target pose and gripper action for the end effector controller and returns an action
...@@ -57,7 +61,7 @@ class ManagerBasedRLMimicEnv(ManagerBasedRLEnv): ...@@ -57,7 +61,7 @@ class ManagerBasedRLMimicEnv(ManagerBasedRLEnv):
Args: Args:
target_eef_pose_dict: Dictionary of 4x4 target eef pose for each end-effector. target_eef_pose_dict: Dictionary of 4x4 target eef pose for each end-effector.
gripper_action_dict: Dictionary of gripper actions for each end-effector. gripper_action_dict: Dictionary of gripper actions for each end-effector.
noise: Noise to add to the action. If None, no noise is added. action_noise_dict: Noise to add to the action. If None, no noise is added.
env_id: Environment index to compute the action for. env_id: Environment index to compute the action for.
Returns: Returns:
......
...@@ -10,6 +10,8 @@ ...@@ -10,6 +10,8 @@
""" """
Base MimicEnvCfg object for Isaac Lab Mimic data generation. Base MimicEnvCfg object for Isaac Lab Mimic data generation.
""" """
import enum
from isaaclab.utils import configclass from isaaclab.utils import configclass
...@@ -17,119 +19,257 @@ from isaaclab.utils import configclass ...@@ -17,119 +19,257 @@ from isaaclab.utils import configclass
class DataGenConfig: class DataGenConfig:
"""Configuration settings for data generation processes within the Isaac Lab Mimic environment.""" """Configuration settings for data generation processes within the Isaac Lab Mimic environment."""
# The name of the datageneration, default is "demo"
name: str = "demo" name: str = "demo"
"""The name of the data generation process. Defaults to "demo"."""
# If set to True, generation will be retried until
# generation_num_trials successful demos have been generated.
# If set to False, generation will stop after generation_num_trails,
# independent of whether they were all successful or not.
generation_guarantee: bool = True generation_guarantee: bool = True
"""Whether to retry generation until generation_num_trials successful demos have been generated.
############################################################## If True, generation will be retried until generation_num_trials successful demos are created.
# Debugging parameters, which can help determining low success If False, generation will stop after generation_num_trails, regardless of success.
# rates. """
# Whether to keep failed generation trials. Keeping failed
# demonstrations is useful for visualizing and debugging low
# success rates.
generation_keep_failed: bool = False generation_keep_failed: bool = False
"""Whether to keep failed generation trials.
Keeping failed demonstrations is useful for visualizing and debugging low success rates.
"""
# Maximum number of failures allowed before stopping generation
max_num_failures: int = 50 max_num_failures: int = 50
"""Maximum number of failures allowed before stopping generation."""
# Seed for randomization to ensure reproducibility
seed: int = 1 seed: int = 1
"""Seed for randomization to ensure reproducibility."""
############################################################## """The following configuration values can be changed on the command line, and only serve as defaults."""
# The following values can be changed on the command line, and
# only serve as defaults.
# Path to the source dataset for mimic generation
source_dataset_path: str = None source_dataset_path: str = None
"""Path to the source dataset for mimic generation."""
# Path where the generated data will be saved
generation_path: str = None generation_path: str = None
"""Path where the generated data will be saved."""
# Number of trial to be generated
generation_num_trials: int = 10 generation_num_trials: int = 10
"""Number of trials to be generated."""
# Name of the task being configured
task_name: str = None task_name: str = None
"""Name of the task being configured."""
############################################################## """The following configurations are advanced and do not usually need to be changed."""
# Advanced configuration, does not usually need to be changed
# Whether to select source data per subtask
# Note: this requires subtasks to be properly temporally
# constrained, and may require additional subtasks to allow
# for time synchronization.
generation_select_src_per_subtask: bool = False generation_select_src_per_subtask: bool = False
"""Whether to select source data per subtask.
Note:
This requires subtasks to be properly temporally constrained, and may require
additional subtasks to allow for time synchronization.
"""
generation_select_src_per_arm: bool = False
"""Whether to select source data per arm."""
# Whether to transform the first robot pose during generation
generation_transform_first_robot_pose: bool = False generation_transform_first_robot_pose: bool = False
"""Whether to transform the first robot pose during generation."""
# Whether to interpolate from last target pose
generation_interpolate_from_last_target_pose: bool = True generation_interpolate_from_last_target_pose: bool = True
"""Whether to interpolate from last target pose."""
@configclass @configclass
class SubTaskConfig: class SubTaskConfig:
""" """Configuration settings specific to the management of individual subtasks."""
Configuration settings specific to the management of individual
subtasks.
"""
############################################################## """Mandatory options that should be defined for every subtask."""
# Mandatory options that should be defined for every subtask
# Reference to the object involved in this subtask, None if no
# object is involved (this is rarely the case).
object_ref: str = None object_ref: str = None
"""Reference to the object involved in this subtask.
Set to None if no object is involved (this is rarely the case).
"""
# Signal for subtask termination
subtask_term_signal: str = None subtask_term_signal: str = None
"""Subtask termination signal name."""
"""Advanced options for tuning the generation results."""
##############################################################
# Advanced options for tuning the generation results
# Strategy on how to select a subtask segment. Can be either
# 'random', 'nearest_neighbor_object' or
# 'nearest_neighbor_robot_distance'. Details can be found in
# source/isaaclab_mimic/isaaclab_mimic/datagen/selection_strategy.py
#
# Note: for 'nearest_neighbor_object' and
# 'nearest_neighbor_robot_distance', the subtask needs to have
# 'object_ref' set to a value other than 'None' above. At the
# same time, if 'object_ref' is not 'None', then either of
# those strategies will usually yield higher success rates
# than the default 'random' strategy.
selection_strategy: str = "random" selection_strategy: str = "random"
"""Strategy for selecting a subtask segment.
Can be one of:
* 'random'
* 'nearest_neighbor_object'
* 'nearest_neighbor_robot_distance'
Note:
For 'nearest_neighbor_object' and 'nearest_neighbor_robot_distance', the subtask needs
to have 'object_ref' set to a value other than 'None'. These strategies typically yield
higher success rates than the default 'random' strategy when object_ref is set.
"""
# Additional arguments to the selected strategy. See details on
# each strategy in
# source/isaaclab_mimic/isaaclab_mimic/datagen/selection_strategy.py
# Arguments will be passed through to the `select_source_demo`
# method.
selection_strategy_kwargs: dict = {} selection_strategy_kwargs: dict = {}
"""Additional arguments to the selected strategy. See details on each strategy in
source/isaaclab_mimic/isaaclab_mimic/datagen/selection_strategy.py
Arguments will be passed through to the `select_source_demo` method."""
# Range for start offset of the first subtask
first_subtask_start_offset_range: tuple = (0, 0) first_subtask_start_offset_range: tuple = (0, 0)
"""Range for start offset of the first subtask."""
# Range for offsetting subtask termination
subtask_term_offset_range: tuple = (0, 0) subtask_term_offset_range: tuple = (0, 0)
"""Range for offsetting subtask termination."""
# Amplitude of action noise applied
action_noise: float = 0.03 action_noise: float = 0.03
"""Amplitude of action noise applied."""
# Number of steps for interpolation between waypoints
num_interpolation_steps: int = 5 num_interpolation_steps: int = 5
"""Number of steps for interpolation between waypoints."""
# Number of fixed steps for the subtask
num_fixed_steps: int = 0 num_fixed_steps: int = 0
"""Number of fixed steps for the subtask."""
# Whether to apply noise during interpolation
apply_noise_during_interpolation: bool = False apply_noise_during_interpolation: bool = False
"""Whether to apply noise during interpolation."""
class SubTaskConstraintType(enum.IntEnum):
"""Enum for subtask constraint types."""
SEQUENTIAL = 0
COORDINATION = 1
_SEQUENTIAL_FORMER = 2
_SEQUENTIAL_LATTER = 3
class SubTaskConstraintCoordinationScheme(enum.IntEnum):
"""Enum for coordination schemes."""
REPLAY = 0
TRANSFORM = 1
TRANSLATE = 2
@configclass
class SubTaskConstraintConfig:
"""Configuration settings for subtask constraints."""
eef_subtask_constraint_tuple: list[tuple[str, int]] = (("", 0), ("", 0))
"""List of associated subtasks tuples in order.
The first element of the tuple refers to the eef name.
The second element of the tuple refers to the subtask index of the eef.
"""
constraint_type: SubTaskConstraintType = None
"""Type of constraint to apply between subtasks."""
sequential_min_time_diff: int = -1
"""Minimum time difference between two sequential subtasks finishing.
The second subtask will execute until sequential_min_time_diff steps left in its subtask trajectory
and wait until the first (preconditioned) subtask is finished to continue executing the rest.
If set to -1, the second subtask will start only after the first subtask is finished.
"""
coordination_scheme: SubTaskConstraintCoordinationScheme = SubTaskConstraintCoordinationScheme.REPLAY
"""Scheme to use for coordinating subtasks."""
coordination_scheme_pos_noise_scale: float = 0.0
"""Scale of position noise to apply during coordination."""
coordination_scheme_rot_noise_scale: float = 0.0
"""Scale of rotation noise to apply during coordination."""
coordination_synchronize_start: bool = False
"""Whether subtasks should start at the same time."""
def generate_runtime_subtask_constraints(self):
"""
Populate expanded task constraints dictionary based on the task constraint config.
The task constraint config contains the configurations set by the user. While the
task_constraints_dict contains flags used to implement the constraint logic in this class.
The task_constraint_configs may include the following types:
- "sequential"
- "coordination"
For a "sequential" constraint:
- Data from task_constraint_configs is added to task_constraints_dict as "sequential former" task constraint.
- The opposite constraint, of type "sequential latter", is also added to task_constraints_dict.
- Additionally, a ("fulfilled", Bool) key-value pair is added to task_constraints_dict.
- This is used to check if the precondition (i.e., the sequential former task) has been met.
- Until the "fulfilled" flag in "sequential latter" is set by "sequential former",
the "sequential latter" subtask will remain paused.
For a "coordination" constraint:
- Data from task_constraint_configs is added to task_constraints_dict.
- The opposite constraint, of type "coordination", is also added to task_constraints_dict.
- The number of synchronous steps is set to the minimum of subtask_len and concurrent_subtask_len.
- This ensures both concurrent tasks end at the same time step.
- A "selected_src_demo_ind" and "transform" field are used to ensure the transforms used by both subtasks are the same.
"""
task_constraints_dict = dict()
if self.constraint_type == SubTaskConstraintType.SEQUENTIAL:
constrained_task_spec_key, constrained_subtask_ind = self.eef_subtask_constraint_tuple[1]
assert isinstance(constrained_subtask_ind, int)
pre_condition_task_spec_key, pre_condition_subtask_ind = self.eef_subtask_constraint_tuple[0]
assert isinstance(pre_condition_subtask_ind, int)
assert (
constrained_task_spec_key,
constrained_subtask_ind,
) not in task_constraints_dict, "only one constraint per subtask allowed"
task_constraints_dict[(constrained_task_spec_key, constrained_subtask_ind)] = dict(
type=SubTaskConstraintType._SEQUENTIAL_LATTER,
pre_condition_task_spec_key=pre_condition_task_spec_key,
pre_condition_subtask_ind=pre_condition_subtask_ind,
min_time_diff=self.sequential_min_time_diff,
fulfilled=False,
)
task_constraints_dict[(pre_condition_task_spec_key, pre_condition_subtask_ind)] = dict(
type=SubTaskConstraintType._SEQUENTIAL_FORMER,
constrained_task_spec_key=constrained_task_spec_key,
constrained_subtask_ind=constrained_subtask_ind,
)
elif self.constraint_type == SubTaskConstraintType.COORDINATION:
constrained_task_spec_key, constrained_subtask_ind = self.eef_subtask_constraint_tuple[0]
assert isinstance(constrained_subtask_ind, int)
concurrent_task_spec_key, concurrent_subtask_ind = self.eef_subtask_constraint_tuple[1]
assert isinstance(concurrent_subtask_ind, int)
if self.coordination_scheme is None:
raise ValueError("Coordination scheme must be specified.")
assert (
constrained_task_spec_key,
constrained_subtask_ind,
) not in task_constraints_dict, "only one constraint per subtask allowed"
task_constraints_dict[(constrained_task_spec_key, constrained_subtask_ind)] = dict(
concurrent_task_spec_key=concurrent_task_spec_key,
concurrent_subtask_ind=concurrent_subtask_ind,
type=SubTaskConstraintType.COORDINATION,
fulfilled=False,
finished=False,
selected_src_demo_ind=None,
coordination_scheme=self.coordination_scheme,
coordination_scheme_pos_noise_scale=self.coordination_scheme_pos_noise_scale,
coordination_scheme_rot_noise_scale=self.coordination_scheme_rot_noise_scale,
coordination_synchronize_start=self.coordination_synchronize_start,
synchronous_steps=None, # to be calculated at runtime
)
task_constraints_dict[(concurrent_task_spec_key, concurrent_subtask_ind)] = dict(
concurrent_task_spec_key=constrained_task_spec_key,
concurrent_subtask_ind=constrained_subtask_ind,
type=SubTaskConstraintType.COORDINATION,
fulfilled=False,
finished=False,
selected_src_demo_ind=None,
coordination_scheme=self.coordination_scheme,
coordination_scheme_pos_noise_scale=self.coordination_scheme_pos_noise_scale,
coordination_scheme_rot_noise_scale=self.coordination_scheme_rot_noise_scale,
coordination_synchronize_start=self.coordination_synchronize_start,
synchronous_steps=None, # to be calculated at runtime
)
else:
raise ValueError("Constraint type not supported.")
return task_constraints_dict
@configclass @configclass
...@@ -146,6 +286,7 @@ class MimicEnvCfg: ...@@ -146,6 +286,7 @@ class MimicEnvCfg:
# Dictionary of list of subtask configurations for each end-effector. # Dictionary of list of subtask configurations for each end-effector.
# Keys are end-effector names. # Keys are end-effector names.
# Currently, only a single end-effector is supported by Isaac Lab Mimic
# so `subtask_configs` must always be of size 1.
subtask_configs: dict[str, list[SubTaskConfig]] = {} subtask_configs: dict[str, list[SubTaskConfig]] = {}
# List of configurations for subtask constraints
task_constraint_configs: list[SubTaskConstraintConfig] = []
...@@ -163,7 +163,7 @@ class HDF5DatasetFileHandler(DatasetFileHandlerBase): ...@@ -163,7 +163,7 @@ class HDF5DatasetFileHandler(DatasetFileHandlerBase):
for sub_key, sub_value in value.items(): for sub_key, sub_value in value.items():
create_dataset_helper(key_group, sub_key, sub_value) create_dataset_helper(key_group, sub_key, sub_value)
else: else:
group.create_dataset(key, data=value.cpu().numpy()) group.create_dataset(key, data=value.cpu().numpy(), compression="gzip")
for key, value in episode.data.items(): for key, value in episode.data.items():
create_dataset_helper(h5_episode_group, key, value) create_dataset_helper(h5_episode_group, key, value)
......
[package] [package]
# Semantic Versioning is used: https://semver.org/ # Semantic Versioning is used: https://semver.org/
version = "1.0.4" version = "1.0.5"
# Description # Description
category = "isaaclab" category = "isaaclab"
......
Changelog Changelog
--------- ---------
1.0.4 (2025-03-10) 1.0.5 (2025-03-10)
~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~
Changed Changed
...@@ -15,6 +15,16 @@ Added ...@@ -15,6 +15,16 @@ Added
* Added ``Isaac-Stack-Cube-Franka-IK-Rel-Blueprint-Mimic-v0`` environment for blueprint vision stacking. * Added ``Isaac-Stack-Cube-Franka-IK-Rel-Blueprint-Mimic-v0`` environment for blueprint vision stacking.
1.0.4 (2025-03-07)
~~~~~~~~~~~~~~~~~~
Changed
^^^^^^^
* Updated data generator to support environments with multiple end effectors.
* Updated data generator to support subtask constraints based on DexMimicGen.
1.0.3 (2025-03-06) 1.0.3 (2025-03-06)
~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~
......
...@@ -11,50 +11,158 @@ import numpy as np ...@@ -11,50 +11,158 @@ import numpy as np
import torch import torch
import isaaclab.utils.math as PoseUtils import isaaclab.utils.math as PoseUtils
from isaaclab.envs.mimic_env_cfg import MimicEnvCfg from isaaclab.envs import (
ManagerBasedRLMimicEnv,
MimicEnvCfg,
SubTaskConstraintCoordinationScheme,
SubTaskConstraintType,
)
from isaaclab.managers import TerminationTermCfg
from isaaclab_mimic.datagen.datagen_info import DatagenInfo from isaaclab_mimic.datagen.datagen_info import DatagenInfo
from isaaclab_mimic.datagen.selection_strategy import make_selection_strategy from isaaclab_mimic.datagen.selection_strategy import make_selection_strategy
from isaaclab_mimic.datagen.waypoint import WaypointSequence, WaypointTrajectory from isaaclab_mimic.datagen.waypoint import MultiWaypoint, Waypoint, WaypointSequence, WaypointTrajectory
from .datagen_info_pool import DataGenInfoPool from .datagen_info_pool import DataGenInfoPool
def transform_source_data_segment_using_delta_object_pose(
src_eef_poses: torch.Tensor,
delta_obj_pose: torch.Tensor,
) -> torch.Tensor:
"""
Transform a source data segment (object-centric subtask segment from source demonstration) using
a delta object pose.
Args:
src_eef_poses: pose sequence (shape [T, 4, 4]) for the sequence of end effector control poses
from the source demonstration
delta_obj_pose: 4x4 delta object pose
Returns:
transformed_eef_poses: transformed pose sequence (shape [T, 4, 4])
"""
return PoseUtils.pose_in_A_to_pose_in_B(
pose_in_A=src_eef_poses,
pose_A_in_B=delta_obj_pose[None],
)
def transform_source_data_segment_using_object_pose(
obj_pose: torch.Tensor,
src_eef_poses: torch.Tensor,
src_obj_pose: torch.Tensor,
) -> torch.Tensor:
"""
Transform a source data segment (object-centric subtask segment from source demonstration) such that
the relative poses between the target eef pose frame and the object frame are preserved. Recall that
each object-centric subtask segment corresponds to one object, and consists of a sequence of
target eef poses.
Args:
obj_pose: 4x4 object pose in current scene
src_eef_poses: pose sequence (shape [T, 4, 4]) for the sequence of end effector control poses
from the source demonstration
src_obj_pose: 4x4 object pose from the source demonstration
Returns:
transformed_eef_poses: transformed pose sequence (shape [T, 4, 4])
"""
# transform source end effector poses to be relative to source object frame
src_eef_poses_rel_obj = PoseUtils.pose_in_A_to_pose_in_B(
pose_in_A=src_eef_poses,
pose_A_in_B=PoseUtils.pose_inv(src_obj_pose[None]),
)
# apply relative poses to current object frame to obtain new target eef poses
transformed_eef_poses = PoseUtils.pose_in_A_to_pose_in_B(
pose_in_A=src_eef_poses_rel_obj,
pose_A_in_B=obj_pose[None],
)
return transformed_eef_poses
def get_delta_pose_with_scheme(
src_obj_pose: torch.Tensor,
cur_obj_pose: torch.Tensor,
task_constraint: dict,
) -> torch.Tensor:
"""
Get the delta pose with the given coordination scheme.
Args:
src_obj_pose: 4x4 object pose in source scene
cur_obj_pose: 4x4 object pose in current scene
task_constraint: task constraint dictionary
Returns:
delta_pose: 4x4 delta pose
"""
coord_transform_scheme = task_constraint["coordination_scheme"]
device = src_obj_pose.device
if coord_transform_scheme == SubTaskConstraintCoordinationScheme.TRANSFORM:
delta_pose = PoseUtils.get_delta_object_pose(cur_obj_pose, src_obj_pose)
# add noise to delta pose position
elif coord_transform_scheme == SubTaskConstraintCoordinationScheme.TRANSLATE:
delta_pose = torch.eye(4, device=device)
delta_pose[:3, 3] = cur_obj_pose[:3, 3] - src_obj_pose[:3, 3]
elif coord_transform_scheme == SubTaskConstraintCoordinationScheme.REPLAY:
delta_pose = torch.eye(4, device=device)
else:
raise ValueError(
f"coordination coord_transform_scheme {coord_transform_scheme} not supported, only"
f" {[e.value for e in SubTaskConstraintCoordinationScheme]} are supported"
)
pos_noise_scale = task_constraint["coordination_scheme_pos_noise_scale"]
rot_noise_scale = task_constraint["coordination_scheme_rot_noise_scale"]
if pos_noise_scale != 0.0 or rot_noise_scale != 0.0:
pos = delta_pose[:3, 3]
rot = delta_pose[:3, :3]
pos_new, rot_new = PoseUtils.add_uniform_noise_to_pose(pos, rot, pos_noise_scale, rot_noise_scale)
delta_pose = torch.eye(4, device=device)
delta_pose[:3, 3] = pos_new
delta_pose[:3, :3] = rot_new
return delta_pose
class DataGenerator: class DataGenerator:
""" """
The main data generator object that loads a source dataset, parses it, and The main data generator class that generates new trajectories from source datasets.
generates new trajectories.
The data generator, inspired by the MimicGen, enables the generation of new datasets based on a few human
collected source demonstrations.
The data generator works by parsing demonstrations into object-centric subtask segments, stored in DataGenInfoPool.
It then adapts these subtask segments to new scenes by transforming each segment according to the new scene’s context,
stitching them into a coherent trajectory for a robotic end-effector to execute.
""" """
def __init__( def __init__(
self, self,
env, env: ManagerBasedRLMimicEnv,
src_demo_datagen_info_pool=None, src_demo_datagen_info_pool: DataGenInfoPool | None = None,
dataset_path=None, dataset_path: str | None = None,
demo_keys=None, demo_keys: list[str] | None = None,
): ):
""" """
Args: Args:
env (Isaac Lab ManagerBasedEnv instance): environment to use for data generation env: environment to use for data generation
src_demo_datagen_info_pool (DataGenInfoPool): source demo datagen info pool src_demo_datagen_info_pool: source demo datagen info pool
dataset_path (str): path to hdf5 dataset to use for generation dataset_path: path to hdf5 dataset to use for generation
demo_keys (list of str): list of demonstration keys to use demo_keys: list of demonstration keys to use in file. If not provided, all demonstration keys
in file. If not provided, all demonstration keys will be will be used.
used.
""" """
self.env = env self.env = env
self.env_cfg = env.cfg self.env_cfg = env.cfg
assert isinstance(self.env_cfg, MimicEnvCfg) assert isinstance(self.env_cfg, MimicEnvCfg)
self.dataset_path = dataset_path self.dataset_path = dataset_path
if len(self.env_cfg.subtask_configs) != 1:
raise ValueError("Data generation currently supports only one end-effector.")
(self.eef_name,) = self.env_cfg.subtask_configs.keys()
(self.subtask_configs,) = self.env_cfg.subtask_configs.values()
# sanity check on task spec offset ranges - final subtask should not have any offset randomization # sanity check on task spec offset ranges - final subtask should not have any offset randomization
assert self.subtask_configs[-1].subtask_term_offset_range[0] == 0 for subtask_configs in self.env_cfg.subtask_configs.values():
assert self.subtask_configs[-1].subtask_term_offset_range[1] == 0 assert subtask_configs[-1].subtask_term_offset_range[0] == 0
assert subtask_configs[-1].subtask_term_offset_range[1] == 0
self.demo_keys = demo_keys self.demo_keys = demo_keys
...@@ -79,70 +187,83 @@ class DataGenerator: ...@@ -79,70 +187,83 @@ class DataGenerator:
) )
return msg return msg
def randomize_subtask_boundaries(self): def randomize_subtask_boundaries(self) -> dict[str, np.ndarray]:
""" """
Apply random offsets to sample subtask boundaries according to the task spec. Apply random offsets to sample subtask boundaries according to the task spec.
Recall that each demonstration is segmented into a set of subtask segments, and the Recall that each demonstration is segmented into a set of subtask segments, and the
end index of each subtask can have a random offset. end index of each subtask can have a random offset.
""" """
randomized_subtask_boundaries = {}
for eef_name, subtask_boundaries in self.src_demo_datagen_info_pool.subtask_boundaries.items():
# initial subtask start and end indices - shape (N, S, 2) # initial subtask start and end indices - shape (N, S, 2)
src_subtask_indices = np.array(self.src_demo_datagen_info_pool.subtask_indices) subtask_boundaries = np.array(subtask_boundaries)
# Randomize the start of the first subtask
first_subtask_start_offsets = np.random.randint(
low=self.env_cfg.subtask_configs[eef_name][0].first_subtask_start_offset_range[0],
high=self.env_cfg.subtask_configs[eef_name][0].first_subtask_start_offset_range[0] + 1,
size=subtask_boundaries.shape[0],
)
subtask_boundaries[:, 0, 0] += first_subtask_start_offsets
# for each subtask (except last one), sample all end offsets at once for each demonstration # for each subtask (except last one), sample all end offsets at once for each demonstration
# add them to subtask end indices, and then set them as the start indices of next subtask too # add them to subtask end indices, and then set them as the start indices of next subtask too
for i in range(src_subtask_indices.shape[1] - 1): for i in range(subtask_boundaries.shape[1] - 1):
end_offsets = np.random.randint( end_offsets = np.random.randint(
low=self.subtask_configs[i].subtask_term_offset_range[0], low=self.env_cfg.subtask_configs[eef_name][i].subtask_term_offset_range[0],
high=self.subtask_configs[i].subtask_term_offset_range[1] + 1, high=self.env_cfg.subtask_configs[eef_name][i].subtask_term_offset_range[1] + 1,
size=src_subtask_indices.shape[0], size=subtask_boundaries.shape[0],
) )
src_subtask_indices[:, i, 1] = src_subtask_indices[:, i, 1] + end_offsets subtask_boundaries[:, i, 1] = subtask_boundaries[:, i, 1] + end_offsets
# don't forget to set these as start indices for next subtask too # don't forget to set these as start indices for next subtask too
src_subtask_indices[:, i + 1, 0] = src_subtask_indices[:, i, 1] subtask_boundaries[:, i + 1, 0] = subtask_boundaries[:, i, 1]
# ensure non-empty subtasks # ensure non-empty subtasks
assert np.all((src_subtask_indices[:, :, 1] - src_subtask_indices[:, :, 0]) > 0), "got empty subtasks!" assert np.all((subtask_boundaries[:, :, 1] - subtask_boundaries[:, :, 0]) > 0), "got empty subtasks!"
# ensure subtask indices increase (both starts and ends) # ensure subtask indices increase (both starts and ends)
assert np.all( assert np.all(
(src_subtask_indices[:, 1:, :] - src_subtask_indices[:, :-1, :]) > 0 (subtask_boundaries[:, 1:, :] - subtask_boundaries[:, :-1, :]) > 0
), "subtask indices do not strictly increase" ), "subtask indices do not strictly increase"
# ensure subtasks are in order # ensure subtasks are in order
subtask_inds_flat = src_subtask_indices.reshape(src_subtask_indices.shape[0], -1) subtask_inds_flat = subtask_boundaries.reshape(subtask_boundaries.shape[0], -1)
assert np.all((subtask_inds_flat[:, 1:] - subtask_inds_flat[:, :-1]) >= 0), "subtask indices not in order" assert np.all((subtask_inds_flat[:, 1:] - subtask_inds_flat[:, :-1]) >= 0), "subtask indices not in order"
return src_subtask_indices randomized_subtask_boundaries[eef_name] = subtask_boundaries
return randomized_subtask_boundaries
def select_source_demo( def select_source_demo(
self, self,
eef_pose, eef_name: str,
object_pose, eef_pose: np.ndarray,
subtask_ind, object_pose: np.ndarray,
src_subtask_inds, src_demo_current_subtask_boundaries: np.ndarray,
subtask_object_name, subtask_object_name: str,
selection_strategy_name, selection_strategy_name: str,
selection_strategy_kwargs=None, selection_strategy_kwargs: dict | None = None,
): ) -> int:
""" """
Helper method to run source subtask segment selection. Helper method to run source subtask segment selection.
Args: Args:
eef_pose (np.array): current end effector pose eef_name: name of end effector
object_pose (np.array): current object pose for this subtask eef_pose: current end effector pose
subtask_ind (int): index of subtask object_pose: current object pose for this subtask
src_subtask_inds (np.array): start and end indices for subtask segment in source demonstrations of shape (N, 2) src_demo_current_subtask_boundaries: start and end indices for subtask segment in source demonstrations of shape (N, 2)
subtask_object_name (str): name of reference object for this subtask subtask_object_name: name of reference object for this subtask
selection_strategy_name (str): name of selection strategy selection_strategy_name: name of selection strategy
selection_strategy_kwargs (dict): extra kwargs for running selection strategy selection_strategy_kwargs: extra kwargs for running selection strategy
Returns: Returns:
selected_src_demo_ind (int): selected source demo index selected_src_demo_ind: selected source demo index
""" """
if subtask_object_name is None: if subtask_object_name is None:
# no reference object - only random selection is supported # no reference object - only random selection is supported
assert selection_strategy_name == "random" assert selection_strategy_name == "random", selection_strategy_name
# We need to collect the datagen info objects over the timesteps for the subtask segment in each source # We need to collect the datagen info objects over the timesteps for the subtask segment in each source
# demo, so that it can be used by the selection strategy. # demo, so that it can be used by the selection strategy.
...@@ -152,13 +273,13 @@ class DataGenerator: ...@@ -152,13 +273,13 @@ class DataGenerator:
src_ep_datagen_info = self.src_demo_datagen_info_pool.datagen_infos[i] src_ep_datagen_info = self.src_demo_datagen_info_pool.datagen_infos[i]
# time indices for subtask # time indices for subtask
subtask_start_ind = src_subtask_inds[i][0] subtask_start_ind = src_demo_current_subtask_boundaries[i][0]
subtask_end_ind = src_subtask_inds[i][1] subtask_end_ind = src_demo_current_subtask_boundaries[i][1]
# get subtask segment using indices # get subtask segment using indices
src_subtask_datagen_infos.append( src_subtask_datagen_infos.append(
DatagenInfo( DatagenInfo(
eef_pose=src_ep_datagen_info.eef_pose[subtask_start_ind:subtask_end_ind], eef_pose=src_ep_datagen_info.eef_pose[eef_name][subtask_start_ind:subtask_end_ind],
# only include object pose for relevant object in subtask # only include object pose for relevant object in subtask
object_poses=( object_poses=(
{ {
...@@ -171,8 +292,8 @@ class DataGenerator: ...@@ -171,8 +292,8 @@ class DataGenerator:
), ),
# subtask termination signal is unused # subtask termination signal is unused
subtask_term_signals=None, subtask_term_signals=None,
target_eef_pose=src_ep_datagen_info.target_eef_pose[subtask_start_ind:subtask_end_ind], target_eef_pose=src_ep_datagen_info.target_eef_pose[eef_name][subtask_start_ind:subtask_end_ind],
gripper_action=src_ep_datagen_info.gripper_action[subtask_start_ind:subtask_end_ind], gripper_action=src_ep_datagen_info.gripper_action[eef_name][subtask_start_ind:subtask_end_ind],
) )
) )
...@@ -191,163 +312,193 @@ class DataGenerator: ...@@ -191,163 +312,193 @@ class DataGenerator:
return selected_src_demo_ind return selected_src_demo_ind
async def generate( def generate_trajectory(
self, self,
env_id, env_id: int,
success_term, eef_name: str,
env_action_queue: asyncio.Queue | None = None, subtask_ind: int,
select_src_per_subtask=False, all_randomized_subtask_boundaries: dict[str, np.ndarray],
transform_first_robot_pose=False, runtime_subtask_constraints_dict: dict[tuple[str, int], dict],
interpolate_from_last_target_pose=True, selected_src_demo_inds: dict[str, int | None],
pause_subtask=False, prev_executed_traj: dict[str, list[Waypoint] | None],
export_demo=True, ) -> list[Waypoint]:
):
""" """
Attempt to generate a new demonstration. Generate a trajectory for the given subtask.
Args: Args:
env_id (int): environment ID env_id: environment index
eef_name: name of end effector
success_term (TerminationTermCfg): success function to check if the task is successful subtask_ind: index of subtask
all_randomized_subtask_boundaries: randomized subtask boundaries
env_action_queue (asyncio.Queue): queue to store actions for each environment runtime_subtask_constraints_dict: runtime subtask constraints
selected_src_demo_inds: dictionary of selected source demo indices per eef, updated in place
select_src_per_subtask (bool): if True, select a different source demonstration for each subtask prev_executed_traj: dictionary of previously executed eef trajectories
during data generation, else keep the same one for the entire episode
transform_first_robot_pose (bool): if True, each subtask segment will consist of the first
robot pose and the target poses instead of just the target poses. Can sometimes help
improve data generation quality as the interpolation segment will interpolate to where
the robot started in the source segment instead of the first target pose. Note that the
first subtask segment of each episode will always include the first robot pose, regardless
of this argument.
interpolate_from_last_target_pose (bool): if True, each interpolation segment will start from
the last target pose in the previous subtask segment, instead of the current robot pose. Can
sometimes improve data generation quality.
pause_subtask (bool): if True, pause after every subtask during generation, for
debugging.
Returns: Returns:
results (dict): dictionary with the following items: trajectory: generated trajectory
initial_state (dict): initial simulator state for the executed trajectory
states (list): simulator state at each timestep
observations (list): observation dictionary at each timestep
datagen_infos (list): datagen_info at each timestep
actions (np.array): action executed at each timestep
success (bool): whether the trajectory successfully solved the task or not
src_demo_inds (list): list of selected source demonstration indices for each subtask
src_demo_labels (np.array): same as @src_demo_inds, but repeated to have a label for each timestep of the trajectory
""" """
eef_names = list(self.env_cfg.subtask_configs.keys()) subtask_configs = self.env_cfg.subtask_configs[eef_name]
eef_name = eef_names[0]
# reset the env to create a new task demo instance
env_id_tensor = torch.tensor([env_id], dtype=torch.int64, device=self.env.device)
self.env.recorder_manager.reset(env_ids=env_id_tensor)
self.env.reset(env_ids=env_id_tensor)
new_initial_state = self.env.scene.get_state(is_relative=True)
# some state variables used during generation
selected_src_demo_ind = None
prev_executed_traj = None
# save generated data in these variables
generated_states = []
generated_obs = []
generated_actions = []
generated_success = False
generated_src_demo_inds = [] # store selected src demo ind for each subtask in each trajectory
generated_src_demo_labels = (
[]
) # like @generated_src_demo_inds, but padded to align with size of @generated_actions
prev_src_demo_datagen_info_pool_size = 0
for subtask_ind in range(len(self.subtask_configs)):
# some things only happen on first subtask
is_first_subtask = subtask_ind == 0
# name of object for this subtask # name of object for this subtask
subtask_object_name = self.subtask_configs[subtask_ind].object_ref subtask_object_name = self.env_cfg.subtask_configs[eef_name][subtask_ind].object_ref
subtask_object_pose = (
# corresponding current object pose
cur_object_pose = (
self.env.get_object_poses(env_ids=[env_id])[subtask_object_name][0] self.env.get_object_poses(env_ids=[env_id])[subtask_object_name][0]
if (subtask_object_name is not None) if (subtask_object_name is not None)
else None else None
) )
async with self.src_demo_datagen_info_pool.asyncio_lock: is_first_subtask = subtask_ind == 0
if len(self.src_demo_datagen_info_pool.datagen_infos) > prev_src_demo_datagen_info_pool_size:
# src_demo_datagen_info_pool at this point may be updated with new demos, need_source_demo_selection = is_first_subtask or self.env_cfg.datagen_config.generation_select_src_per_subtask
# so we need to updaet subtask boundaries again
all_subtask_inds = (
self.randomize_subtask_boundaries()
) # shape [N, S, 2], last dim is start and end action lengths
prev_src_demo_datagen_info_pool_size = len(self.src_demo_datagen_info_pool.datagen_infos)
# We need source demonstration selection for the first subtask (always), and possibly for if not self.env_cfg.datagen_config.generation_select_src_per_arm:
# other subtasks if @select_src_per_subtask is set. need_source_demo_selection = need_source_demo_selection and selected_src_demo_inds[eef_name] is None
need_source_demo_selection = is_first_subtask or select_src_per_subtask
use_delta_transform = None
coord_transform_scheme = None
if (eef_name, subtask_ind) in runtime_subtask_constraints_dict:
if runtime_subtask_constraints_dict[(eef_name, subtask_ind)]["type"] == SubTaskConstraintType.COORDINATION:
# avoid selecting source demo if it has already been selected by the concurrent task
concurrent_task_spec_key = runtime_subtask_constraints_dict[(eef_name, subtask_ind)][
"concurrent_task_spec_key"
]
concurrent_subtask_ind = runtime_subtask_constraints_dict[(eef_name, subtask_ind)][
"concurrent_subtask_ind"
]
concurrent_selected_src_ind = runtime_subtask_constraints_dict[
(concurrent_task_spec_key, concurrent_subtask_ind)
]["selected_src_demo_ind"]
if concurrent_selected_src_ind is not None:
# the concurrent task has started, so we should use the same source demo
selected_src_demo_inds[eef_name] = concurrent_selected_src_ind
need_source_demo_selection = False
use_delta_transform = runtime_subtask_constraints_dict[
(concurrent_task_spec_key, concurrent_subtask_ind)
]["transform"]
else:
assert (
"transform" not in runtime_subtask_constraints_dict[(eef_name, subtask_ind)]
), "transform should not be set for concurrent task"
# need to transform demo according to scheme
coord_transform_scheme = runtime_subtask_constraints_dict[(eef_name, subtask_ind)][
"coordination_scheme"
]
if coord_transform_scheme != SubTaskConstraintCoordinationScheme.REPLAY:
assert (
subtask_object_name is not None
), f"object reference should not be None for {coord_transform_scheme} coordination scheme"
# Run source demo selection or use selected demo from previous iteration
if need_source_demo_selection: if need_source_demo_selection:
selected_src_demo_ind = self.select_source_demo( selected_src_demo_inds[eef_name] = self.select_source_demo(
eef_pose=self.env.get_robot_eef_pose(eef_name, env_ids=[env_id])[0], eef_name=eef_name,
object_pose=cur_object_pose, eef_pose=self.env.get_robot_eef_pose(env_ids=[env_id], eef_name=eef_name)[0],
subtask_ind=subtask_ind, object_pose=subtask_object_pose,
src_subtask_inds=all_subtask_inds[:, subtask_ind], src_demo_current_subtask_boundaries=all_randomized_subtask_boundaries[eef_name][:, subtask_ind],
subtask_object_name=subtask_object_name, subtask_object_name=subtask_object_name,
selection_strategy_name=self.subtask_configs[subtask_ind].selection_strategy, selection_strategy_name=self.env_cfg.subtask_configs[eef_name][subtask_ind].selection_strategy,
selection_strategy_kwargs=self.subtask_configs[subtask_ind].selection_strategy_kwargs, selection_strategy_kwargs=self.env_cfg.subtask_configs[eef_name][subtask_ind].selection_strategy_kwargs,
) )
assert selected_src_demo_ind is not None
assert selected_src_demo_inds[eef_name] is not None
selected_src_demo_ind = selected_src_demo_inds[eef_name]
if not self.env_cfg.datagen_config.generation_select_src_per_arm and need_source_demo_selection:
for itrated_eef_name in self.env_cfg.subtask_configs.keys():
selected_src_demo_inds[itrated_eef_name] = selected_src_demo_ind
# selected subtask segment time indices # selected subtask segment time indices
selected_src_subtask_inds = all_subtask_inds[selected_src_demo_ind, subtask_ind] selected_src_subtask_boundary = all_randomized_subtask_boundaries[eef_name][selected_src_demo_ind, subtask_ind]
if (eef_name, subtask_ind) in runtime_subtask_constraints_dict:
if runtime_subtask_constraints_dict[(eef_name, subtask_ind)]["type"] == SubTaskConstraintType.COORDINATION:
# store selected source demo ind for concurrent task
runtime_subtask_constraints_dict[(eef_name, subtask_ind)][
"selected_src_demo_ind"
] = selected_src_demo_ind
concurrent_task_spec_key = runtime_subtask_constraints_dict[(eef_name, subtask_ind)][
"concurrent_task_spec_key"
]
concurrent_subtask_ind = runtime_subtask_constraints_dict[(eef_name, subtask_ind)][
"concurrent_subtask_ind"
]
concurrent_src_subtask_inds = all_randomized_subtask_boundaries[concurrent_task_spec_key][
selected_src_demo_ind, concurrent_subtask_ind
]
subtask_len = selected_src_subtask_boundary[1] - selected_src_subtask_boundary[0]
concurrent_subtask_len = concurrent_src_subtask_inds[1] - concurrent_src_subtask_inds[0]
runtime_subtask_constraints_dict[(eef_name, subtask_ind)]["synchronous_steps"] = min(
subtask_len, concurrent_subtask_len
)
# TODO allow for different anchor selection strategies for each subtask
# get subtask segment, consisting of the sequence of robot eef poses, target poses, gripper actions # get subtask segment, consisting of the sequence of robot eef poses, target poses, gripper actions
src_ep_datagen_info = self.src_demo_datagen_info_pool.datagen_infos[selected_src_demo_ind] src_ep_datagen_info = self.src_demo_datagen_info_pool.datagen_infos[selected_src_demo_ind]
src_subtask_eef_poses = src_ep_datagen_info.eef_pose[eef_name][
src_subtask_eef_poses = src_ep_datagen_info.eef_pose[ selected_src_subtask_boundary[0] : selected_src_subtask_boundary[1]
selected_src_subtask_inds[0] : selected_src_subtask_inds[1]
] ]
src_subtask_target_poses = src_ep_datagen_info.target_eef_pose[ src_subtask_target_poses = src_ep_datagen_info.target_eef_pose[eef_name][
selected_src_subtask_inds[0] : selected_src_subtask_inds[1] selected_src_subtask_boundary[0] : selected_src_subtask_boundary[1]
] ]
src_subtask_gripper_actions = src_ep_datagen_info.gripper_action[ src_subtask_gripper_actions = src_ep_datagen_info.gripper_action[eef_name][
selected_src_subtask_inds[0] : selected_src_subtask_inds[1] selected_src_subtask_boundary[0] : selected_src_subtask_boundary[1]
] ]
# get reference object pose from source demo # get reference object pose from source demo
src_subtask_object_pose = ( src_subtask_object_pose = (
src_ep_datagen_info.object_poses[subtask_object_name][selected_src_subtask_inds[0]] src_ep_datagen_info.object_poses[subtask_object_name][selected_src_subtask_boundary[0]]
if (subtask_object_name is not None) if (subtask_object_name is not None)
else None else None
) )
if is_first_subtask or transform_first_robot_pose: if is_first_subtask or self.env_cfg.datagen_config.generation_transform_first_robot_pose:
# Source segment consists of first robot eef pose and the target poses. # Source segment consists of first robot eef pose and the target poses. This ensures that
# we will interpolate to the first robot eef pose in this source segment, instead of the
# first robot target pose.
src_eef_poses = torch.cat([src_subtask_eef_poses[0:1], src_subtask_target_poses], dim=0) src_eef_poses = torch.cat([src_subtask_eef_poses[0:1], src_subtask_target_poses], dim=0)
else:
# Source segment consists of just the target poses.
src_eef_poses = src_subtask_target_poses.clone()
# account for extra timestep added to @src_eef_poses # account for extra timestep added to @src_eef_poses
src_subtask_gripper_actions = torch.cat( src_subtask_gripper_actions = torch.cat(
[src_subtask_gripper_actions[0:1], src_subtask_gripper_actions], dim=0 [src_subtask_gripper_actions[0:1], src_subtask_gripper_actions], dim=0
) )
else:
# Source segment consists of just the target poses.
src_eef_poses = src_subtask_target_poses.clone()
src_subtask_gripper_actions = src_subtask_gripper_actions.clone()
# Transform source demonstration segment using relevant object pose. # Transform source demonstration segment using relevant object pose.
if use_delta_transform is not None:
# use delta transform from concurrent task
transformed_eef_poses = transform_source_data_segment_using_delta_object_pose(
src_eef_poses, use_delta_transform
)
# TODO: Uncomment below to support case of temporal concurrent but NOT does not require coordination. Need to decide if this case is necessary
# if subtask_object_name is not None:
# transformed_eef_poses = PoseUtils.transform_source_data_segment_using_object_pose(
# cur_object_poses[task_spec_idx],
# src_eef_poses,
# src_subtask_object_pose,
# )
else:
if coord_transform_scheme is not None:
delta_obj_pose = get_delta_pose_with_scheme(
src_subtask_object_pose,
subtask_object_pose,
runtime_subtask_constraints_dict[(eef_name, subtask_ind)],
)
transformed_eef_poses = transform_source_data_segment_using_delta_object_pose(
src_eef_poses, delta_obj_pose
)
runtime_subtask_constraints_dict[(eef_name, subtask_ind)]["transform"] = delta_obj_pose
else:
if subtask_object_name is not None: if subtask_object_name is not None:
transformed_eef_poses = PoseUtils.transform_poses_from_frame_A_to_frame_B( transformed_eef_poses = transform_source_data_segment_using_object_pose(
src_poses=src_eef_poses, subtask_object_pose,
frame_A=cur_object_pose, src_eef_poses,
frame_B=src_subtask_object_pose, src_subtask_object_pose,
) )
else: else:
print(f"skipping transformation for {subtask_object_name}")
# skip transformation if no reference object is provided # skip transformation if no reference object is provided
transformed_eef_poses = src_eef_poses transformed_eef_poses = src_eef_poses
...@@ -355,27 +506,25 @@ class DataGenerator: ...@@ -355,27 +506,25 @@ class DataGenerator:
# that will be executed and then execute it. # that will be executed and then execute it.
traj_to_execute = WaypointTrajectory() traj_to_execute = WaypointTrajectory()
if interpolate_from_last_target_pose and (not is_first_subtask): if self.env_cfg.datagen_config.generation_interpolate_from_last_target_pose and (not is_first_subtask):
# Interpolation segment will start from last target pose (which may not have been achieved). # Interpolation segment will start from last target pose (which may not have been achieved).
assert prev_executed_traj is not None assert prev_executed_traj[eef_name] is not None
last_waypoint = prev_executed_traj.last_waypoint last_waypoint = prev_executed_traj[eef_name][-1]
init_sequence = WaypointSequence(sequence=[last_waypoint]) init_sequence = WaypointSequence(sequence=[last_waypoint])
else: else:
# Interpolation segment will start from current robot eef pose. # Interpolation segment will start from current robot eef pose.
init_sequence = WaypointSequence.from_poses( init_sequence = WaypointSequence.from_poses(
eef_names=eef_names, poses=self.env.get_robot_eef_pose(env_ids=[env_id], eef_name=eef_name)[0].unsqueeze(0),
poses=self.env.get_robot_eef_pose(eef_name, env_ids=[env_id])[0][None], gripper_actions=src_subtask_gripper_actions[0].unsqueeze(0),
gripper_actions=src_subtask_gripper_actions[0:1], action_noise=subtask_configs[subtask_ind].action_noise,
action_noise=self.subtask_configs[subtask_ind].action_noise,
) )
traj_to_execute.add_waypoint_sequence(init_sequence) traj_to_execute.add_waypoint_sequence(init_sequence)
# Construct trajectory for the transformed segment. # Construct trajectory for the transformed segment.
transformed_seq = WaypointSequence.from_poses( transformed_seq = WaypointSequence.from_poses(
eef_names=eef_names,
poses=transformed_eef_poses, poses=transformed_eef_poses,
gripper_actions=src_subtask_gripper_actions, gripper_actions=src_subtask_gripper_actions,
action_noise=self.subtask_configs[subtask_ind].action_noise, action_noise=subtask_configs[subtask_ind].action_noise,
) )
transformed_traj = WaypointTrajectory() transformed_traj = WaypointTrajectory()
transformed_traj.add_waypoint_sequence(transformed_seq) transformed_traj.add_waypoint_sequence(transformed_seq)
...@@ -384,12 +533,11 @@ class DataGenerator: ...@@ -384,12 +533,11 @@ class DataGenerator:
# Interpolation will happen from the initial pose (@init_sequence) to the first element of @transformed_seq. # Interpolation will happen from the initial pose (@init_sequence) to the first element of @transformed_seq.
traj_to_execute.merge( traj_to_execute.merge(
transformed_traj, transformed_traj,
eef_names=eef_names, num_steps_interp=self.env_cfg.subtask_configs[eef_name][subtask_ind].num_interpolation_steps,
num_steps_interp=self.subtask_configs[subtask_ind].num_interpolation_steps, num_steps_fixed=self.env_cfg.subtask_configs[eef_name][subtask_ind].num_fixed_steps,
num_steps_fixed=self.subtask_configs[subtask_ind].num_fixed_steps,
action_noise=( action_noise=(
float(self.subtask_configs[subtask_ind].apply_noise_during_interpolation) float(self.env_cfg.subtask_configs[eef_name][subtask_ind].apply_noise_during_interpolation)
* self.subtask_configs[subtask_ind].action_noise * self.env_cfg.subtask_configs[eef_name][subtask_ind].action_noise
), ),
) )
...@@ -398,35 +546,225 @@ class DataGenerator: ...@@ -398,35 +546,225 @@ class DataGenerator:
# the rest of the trajectory (interpolation segment and transformed subtask segment). # the rest of the trajectory (interpolation segment and transformed subtask segment).
traj_to_execute.pop_first() traj_to_execute.pop_first()
# Execute the trajectory and collect data. # Return the generated trajectory
exec_results = await traj_to_execute.execute( return traj_to_execute.get_full_sequence().sequence
env=self.env, env_id=env_id, env_action_queue=env_action_queue, success_term=success_term
async def generate(
self,
env_id: int,
success_term: TerminationTermCfg,
env_reset_queue: asyncio.Queue | None = None,
env_action_queue: asyncio.Queue | None = None,
pause_subtask: bool = False,
export_demo: bool = True,
) -> dict:
"""
Attempt to generate a new demonstration.
Args:
env_id: environment index
success_term: success function to check if the task is successful
env_reset_queue: queue to store environment IDs for reset
env_action_queue: queue to store actions for each environment
pause_subtask: if True, pause after every subtask during generation, for debugging
export_demo: if True, export the generated demonstration
Returns:
results: dictionary with the following items:
initial_state (dict): initial simulator state for the executed trajectory
states (list): simulator state at each timestep
observations (list): observation dictionary at each timestep
datagen_infos (list): datagen_info at each timestep
actions (np.array): action executed at each timestep
success (bool): whether the trajectory successfully solved the task or not
src_demo_inds (list): list of selected source demonstration indices for each subtask
src_demo_labels (np.array): same as @src_demo_inds, but repeated to have a label for each timestep of the trajectory
"""
# reset the env to create a new task demo instance
env_id_tensor = torch.tensor([env_id], dtype=torch.int64, device=self.env.device)
self.env.recorder_manager.reset(env_ids=env_id_tensor)
await env_reset_queue.put(env_id)
await env_reset_queue.join()
new_initial_state = self.env.scene.get_state(is_relative=True)
# create runtime subtask constraint rules from subtask constraint configs
runtime_subtask_constraints_dict = {}
for subtask_constraint in self.env_cfg.task_constraint_configs:
runtime_subtask_constraints_dict.update(subtask_constraint.generate_runtime_subtask_constraints())
# save generated data in these variables
generated_states = []
generated_obs = []
generated_actions = []
generated_success = False
# some eef-specific state variables used during generation
current_eef_selected_src_demo_indices = {}
current_eef_subtask_trajectories = {}
current_eef_subtask_indices = {}
current_eef_subtask_step_indices = {}
eef_subtasks_done = {}
for eef_name in self.env_cfg.subtask_configs.keys():
current_eef_selected_src_demo_indices[eef_name] = None
# prev_eef_executed_traj[eef_name] = None # type of list of Waypoint
current_eef_subtask_trajectories[eef_name] = None # type of list of Waypoint
current_eef_subtask_indices[eef_name] = 0
current_eef_subtask_step_indices[eef_name] = None
eef_subtasks_done[eef_name] = False
prev_src_demo_datagen_info_pool_size = 0
# While loop that runs per time step
while True:
async with self.src_demo_datagen_info_pool.asyncio_lock:
if len(self.src_demo_datagen_info_pool.datagen_infos) > prev_src_demo_datagen_info_pool_size:
# src_demo_datagen_info_pool at this point may be updated with new demos,
# so we need to updaet subtask boundaries again
randomized_subtask_boundaries = (
self.randomize_subtask_boundaries()
) # shape [N, S, 2], last dim is start and end action lengths
prev_src_demo_datagen_info_pool_size = len(self.src_demo_datagen_info_pool.datagen_infos)
# generate trajectory for a subtask for the eef that is currently at the beginning of a subtask
for eef_name, eef_subtask_step_index in current_eef_subtask_step_indices.items():
if eef_subtask_step_index is None:
# current_eef_selected_src_demo_indices will be updated in generate_trajectory
subtask_trajectory = self.generate_trajectory(
env_id,
eef_name,
current_eef_subtask_indices[eef_name],
randomized_subtask_boundaries,
runtime_subtask_constraints_dict,
current_eef_selected_src_demo_indices,
current_eef_subtask_trajectories,
)
current_eef_subtask_trajectories[eef_name] = subtask_trajectory
current_eef_subtask_step_indices[eef_name] = 0
# current_eef_selected_src_demo_indices[eef_name] = selected_src_demo_inds
# two_arm_trajectories[task_spec_idx] = subtask_trajectory
# prev_executed_traj[task_spec_idx] = subtask_trajectory
# determine the next waypoint for each eef based on the current subtask constraints
eef_waypoint_dict = {}
for eef_name in sorted(self.env_cfg.subtask_configs.keys()):
# handle constraints
step_ind = current_eef_subtask_step_indices[eef_name]
subtask_ind = current_eef_subtask_indices[eef_name]
if (eef_name, subtask_ind) in runtime_subtask_constraints_dict:
task_constraint = runtime_subtask_constraints_dict[(eef_name, subtask_ind)]
if task_constraint["type"] == SubTaskConstraintType._SEQUENTIAL_LATTER:
min_time_diff = task_constraint["min_time_diff"]
if not task_constraint["fulfilled"]:
if (
min_time_diff == -1
or step_ind >= len(current_eef_subtask_trajectories[eef_name]) - min_time_diff
):
if step_ind > 0:
# wait at the same step
step_ind -= 1
current_eef_subtask_step_indices[eef_name] = step_ind
elif task_constraint["type"] == SubTaskConstraintType.COORDINATION:
synchronous_steps = task_constraint["synchronous_steps"]
concurrent_task_spec_key = task_constraint["concurrent_task_spec_key"]
concurrent_subtask_ind = task_constraint["concurrent_subtask_ind"]
concurrent_task_fulfilled = runtime_subtask_constraints_dict[
(concurrent_task_spec_key, concurrent_subtask_ind)
]["fulfilled"]
if (
task_constraint["coordination_synchronize_start"]
and current_eef_subtask_indices[concurrent_task_spec_key] < concurrent_subtask_ind
):
# the concurrent eef is not yet at the concurrent subtask, so wait at the first action
# this also makes sure that the concurrent task starts at the same time as this task
step_ind = 0
current_eef_subtask_step_indices[eef_name] = 0
else:
if (
not concurrent_task_fulfilled
and step_ind >= len(current_eef_subtask_trajectories[eef_name]) - synchronous_steps
):
# trigger concurrent task
runtime_subtask_constraints_dict[(concurrent_task_spec_key, concurrent_subtask_ind)][
"fulfilled"
] = True
if not task_constraint["fulfilled"]:
if step_ind >= len(current_eef_subtask_trajectories[eef_name]) - synchronous_steps:
if step_ind > 0:
step_ind -= 1
current_eef_subtask_step_indices[eef_name] = step_ind # wait here
waypoint = current_eef_subtask_trajectories[eef_name][step_ind]
eef_waypoint_dict[eef_name] = waypoint
multi_waypoint = MultiWaypoint(eef_waypoint_dict)
# execute the next waypoints for all eefs
exec_results = await multi_waypoint.execute(
env=self.env,
success_term=success_term,
env_id=env_id,
env_action_queue=env_action_queue,
) )
# check that trajectory is non-empty # update execution state buffers
if len(exec_results["states"]) > 0: if len(exec_results["states"]) > 0:
generated_states += exec_results["states"] generated_states.extend(exec_results["states"])
generated_obs += exec_results["observations"] generated_obs.extend(exec_results["observations"])
generated_actions.append(exec_results["actions"]) generated_actions.extend(exec_results["actions"])
generated_success = generated_success or exec_results["success"] generated_success = generated_success or exec_results["success"]
generated_src_demo_inds.append(selected_src_demo_ind)
generated_src_demo_labels.append(
selected_src_demo_ind
* torch.ones(
(exec_results["actions"].shape[0], 1), dtype=torch.int, device=exec_results["actions"].device
)
)
# remember last trajectory for eef_name in self.env_cfg.subtask_configs.keys():
prev_executed_traj = traj_to_execute current_eef_subtask_step_indices[eef_name] += 1
subtask_ind = current_eef_subtask_indices[eef_name]
if current_eef_subtask_step_indices[eef_name] == len(
current_eef_subtask_trajectories[eef_name]
): # subtask done
if (eef_name, subtask_ind) in runtime_subtask_constraints_dict:
task_constraint = runtime_subtask_constraints_dict[(eef_name, subtask_ind)]
if task_constraint["type"] == SubTaskConstraintType._SEQUENTIAL_FORMER:
constrained_task_spec_key = task_constraint["constrained_task_spec_key"]
constrained_subtask_ind = task_constraint["constrained_subtask_ind"]
runtime_subtask_constraints_dict[(constrained_task_spec_key, constrained_subtask_ind)][
"fulfilled"
] = True
elif task_constraint["type"] == SubTaskConstraintType.COORDINATION:
concurrent_task_spec_key = task_constraint["concurrent_task_spec_key"]
concurrent_subtask_ind = task_constraint["concurrent_subtask_ind"]
# concurrent_task_spec_idx = task_spec_keys.index(concurrent_task_spec_key)
task_constraint["finished"] = True
# check if concurrent task has been finished
assert (
runtime_subtask_constraints_dict[(concurrent_task_spec_key, concurrent_subtask_ind)][
"finished"
]
or current_eef_subtask_step_indices[concurrent_task_spec_key]
>= len(current_eef_subtask_trajectories[concurrent_task_spec_key]) - 1
)
if pause_subtask: if pause_subtask:
input(f"Pausing after subtask {subtask_ind} execution. Press any key to continue...") input(
f"Pausing after subtask {current_eef_subtask_indices[eef_name]} of {eef_name} execution."
" Press any key to continue..."
)
# This is a check to see if this arm has completed all the subtasks
if current_eef_subtask_indices[eef_name] == len(self.env_cfg.subtask_configs[eef_name]) - 1:
eef_subtasks_done[eef_name] = True
# If all subtasks done for this arm, repeat last waypoint to make sure this arm does not move
current_eef_subtask_trajectories[eef_name].append(
current_eef_subtask_trajectories[eef_name][-1]
)
else:
current_eef_subtask_step_indices[eef_name] = None
current_eef_subtask_indices[eef_name] += 1
# Check if all eef_subtasks_done values are True
if all(eef_subtasks_done.values()):
break
# merge numpy arrays # merge numpy arrays
if len(generated_actions) > 0: if len(generated_actions) > 0:
generated_actions = torch.cat(generated_actions, dim=0) generated_actions = torch.cat(generated_actions, dim=0)
generated_src_demo_labels = torch.cat(generated_src_demo_labels, dim=0)
# set success to the recorded episode data and export to file # set success to the recorded episode data and export to file
self.env.recorder_manager.set_success_to_episodes( self.env.recorder_manager.set_success_to_episodes(
...@@ -441,7 +779,5 @@ class DataGenerator: ...@@ -441,7 +779,5 @@ class DataGenerator:
observations=generated_obs, observations=generated_obs,
actions=generated_actions, actions=generated_actions,
success=generated_success, success=generated_success,
src_demo_inds=generated_src_demo_inds,
src_demo_labels=generated_src_demo_labels,
) )
return results return results
...@@ -6,7 +6,6 @@ ...@@ -6,7 +6,6 @@
""" """
Defines structure of information that is needed from an environment for data generation. Defines structure of information that is needed from an environment for data generation.
""" """
import torch
from copy import deepcopy from copy import deepcopy
...@@ -46,40 +45,6 @@ class DatagenInfo: ...@@ -46,40 +45,6 @@ class DatagenInfo:
gripper_action (torch.Tensor or None): gripper actions of shape [..., D] where D gripper_action (torch.Tensor or None): gripper actions of shape [..., D] where D
is the dimension of the gripper actuation action for the robot arm is the dimension of the gripper actuation action for the robot arm
""" """
# Type checks using assert
if eef_pose is not None:
assert isinstance(
eef_pose, torch.Tensor
), f"Expected 'eef_pose' to be of type torch.Tensor, but got {type(eef_pose)}"
if object_poses is not None:
assert isinstance(
object_poses, dict
), f"Expected 'object_poses' to be a dictionary, but got {type(object_poses)}"
for k, v in object_poses.items():
assert isinstance(
v, torch.Tensor
), f"Expected 'object_poses[{k}]' to be of type torch.Tensor, but got {type(v)}"
if subtask_term_signals is not None:
assert isinstance(
subtask_term_signals, dict
), f"Expected 'subtask_term_signals' to be a dictionary, but got {type(subtask_term_signals)}"
for k, v in subtask_term_signals.items():
assert isinstance(
v, (torch.Tensor, int, float)
), f"Expected 'subtask_term_signals[{k}]' to be of type torch.Tensor, int, or float, but got {type(v)}"
if target_eef_pose is not None:
assert isinstance(
target_eef_pose, torch.Tensor
), f"Expected 'target_eef_pose' to be of type torch.Tensor, but got {type(target_eef_pose)}"
if gripper_action is not None:
assert isinstance(
gripper_action, torch.Tensor
), f"Expected 'gripper_action' to be of type torch.Tensor, but got {type(gripper_action)}"
self.eef_pose = None self.eef_pose = None
if eef_pose is not None: if eef_pose is not None:
self.eef_pose = eef_pose self.eef_pose = eef_pose
......
...@@ -5,7 +5,6 @@ ...@@ -5,7 +5,6 @@
import asyncio import asyncio
import isaaclab.utils.math as PoseUtils
from isaaclab.utils.datasets import EpisodeData, HDF5DatasetFileHandler from isaaclab.utils.datasets import EpisodeData, HDF5DatasetFileHandler
from isaaclab_mimic.datagen.datagen_info import DatagenInfo from isaaclab_mimic.datagen.datagen_info import DatagenInfo
...@@ -28,7 +27,9 @@ class DataGenInfoPool: ...@@ -28,7 +27,9 @@ class DataGenInfoPool:
asyncio_lock (asyncio.Lock or None): asyncio lock to use for thread safety asyncio_lock (asyncio.Lock or None): asyncio lock to use for thread safety
""" """
self._datagen_infos = [] self._datagen_infos = []
self._subtask_indices = []
# Start and end step indices of each subtask in each episode for each eef
self._subtask_boundaries: dict[str, list[list[tuple[int, int]]]] = {}
self.env = env self.env = env
self.env_cfg = env_cfg self.env_cfg = env_cfg
...@@ -36,13 +37,15 @@ class DataGenInfoPool: ...@@ -36,13 +37,15 @@ class DataGenInfoPool:
self._asyncio_lock = asyncio_lock self._asyncio_lock = asyncio_lock
if len(env_cfg.subtask_configs) != 1: # Subtask termination infos for the given environment
raise ValueError("Data generation currently supports only one end-effector.") self.subtask_term_signal_names: dict[str, list[str]] = {}
self.subtask_term_offset_ranges: dict[str, list[tuple[int, int]]] = {}
(subtask_configs,) = env_cfg.subtask_configs.values() for eef_name, eef_subtask_configs in env_cfg.subtask_configs.items():
self.subtask_term_signals = [subtask_config.subtask_term_signal for subtask_config in subtask_configs] self.subtask_term_signal_names[eef_name] = [
self.subtask_term_offset_ranges = [ subtask_config.subtask_term_signal for subtask_config in eef_subtask_configs
subtask_config.subtask_term_offset_range for subtask_config in subtask_configs ]
self.subtask_term_offset_ranges[eef_name] = [
subtask_config.subtask_term_offset_range for subtask_config in eef_subtask_configs
] ]
@property @property
...@@ -51,9 +54,9 @@ class DataGenInfoPool: ...@@ -51,9 +54,9 @@ class DataGenInfoPool:
return self._datagen_infos return self._datagen_infos
@property @property
def subtask_indices(self): def subtask_boundaries(self) -> dict[str, list[list[tuple[int, int]]]]:
"""Returns the subtask indices.""" """Returns the subtask boundaries."""
return self._subtask_indices return self._subtask_boundaries
@property @property
def asyncio_lock(self): def asyncio_lock(self):
...@@ -86,43 +89,18 @@ class DataGenInfoPool: ...@@ -86,43 +89,18 @@ class DataGenInfoPool:
episode (EpisodeData): episode to add episode (EpisodeData): episode to add
""" """
ep_grp = episode.data ep_grp = episode.data
eef_name = list(self.env.cfg.subtask_configs.keys())[0]
# extract datagen info # extract datagen info
if "datagen_info" in ep_grp["obs"]: if "datagen_info" in ep_grp["obs"]:
eef_pose = ep_grp["obs"]["datagen_info"]["eef_pose"][eef_name] eef_pose = ep_grp["obs"]["datagen_info"]["eef_pose"]
object_poses_dict = ep_grp["obs"]["datagen_info"]["object_pose"] object_poses_dict = ep_grp["obs"]["datagen_info"]["object_pose"]
target_eef_pose = ep_grp["obs"]["datagen_info"]["target_eef_pose"][eef_name] target_eef_pose = ep_grp["obs"]["datagen_info"]["target_eef_pose"]
subtask_term_signals_dict = ep_grp["obs"]["datagen_info"]["subtask_term_signals"] subtask_term_signals_dict = ep_grp["obs"]["datagen_info"]["subtask_term_signals"]
else: else:
# Extract eef poses raise ValueError("Episode to be loaded to DatagenInfo pool lacks datagen_info annotations")
eef_pos = ep_grp["obs"]["eef_pos"]
eef_quat = ep_grp["obs"]["eef_quat"] # format (w, x, y, z)
eef_rot_matrices = PoseUtils.matrix_from_quat(eef_quat) # shape (N, 3, 3)
# Create pose matrices for all environments
eef_pose = PoseUtils.make_pose(eef_pos, eef_rot_matrices) # shape (N, 4, 4)
# Object poses
object_poses_dict = dict()
for object_name, value in ep_grp["obs"]["object_pose"].items():
# object_pose
value = value["root_pose"]
# Root state ``[pos, quat, lin_vel, ang_vel]`` in simulation world frame. Shape is (num_steps, 13).
# Quaternion ordering is wxyz
# Convert to rotation matrices
object_rot_matrices = PoseUtils.matrix_from_quat(value[:, 3:7]) # shape (N, 3, 3)
object_rot_positions = value[:, 0:3] # shape (N, 3)
object_poses_dict[object_name] = PoseUtils.make_pose(object_rot_positions, object_rot_matrices)
# Target eef pose
target_eef_pose = ep_grp["obs"]["target_eef_pose"]
# Subtask termination signalsS
subtask_term_signals_dict = (ep_grp["obs"]["subtask_term_signals"],)
# Extract gripper actions # Extract gripper actions
gripper_actions = self.env.actions_to_gripper_actions(ep_grp["actions"])[eef_name] gripper_actions = self.env.actions_to_gripper_actions(ep_grp["actions"])
ep_datagen_info_obj = DatagenInfo( ep_datagen_info_obj = DatagenInfo(
eef_pose=eef_pose, eef_pose=eef_pose,
...@@ -133,21 +111,32 @@ class DataGenInfoPool: ...@@ -133,21 +111,32 @@ class DataGenInfoPool:
) )
self._datagen_infos.append(ep_datagen_info_obj) self._datagen_infos.append(ep_datagen_info_obj)
# parse subtask indices using subtask termination signals # parse subtask ranges using subtask termination signals and store
ep_subtask_indices = [] # the start and end indices of each subtask for each eef
for eef_name in self.subtask_term_signal_names.keys():
if eef_name not in self._subtask_boundaries:
self._subtask_boundaries[eef_name] = []
prev_subtask_term_ind = 0 prev_subtask_term_ind = 0
for subtask_ind in range(len(self.subtask_term_signals)): eef_subtask_boundaries = []
subtask_term_signal = self.subtask_term_signals[subtask_ind] for subtask_term_signal_name in self.subtask_term_signal_names[eef_name]:
if subtask_term_signal is None: if subtask_term_signal_name is None:
# final subtask, finishes at end of demo # None refers to the final subtask, so finishes at end of demo
subtask_term_ind = ep_grp["actions"].shape[0] subtask_term_ind = ep_grp["actions"].shape[0]
else: else:
# trick to detect index where first 0 -> 1 transition occurs - this will be the end of the subtask # trick to detect index where first 0 -> 1 transition occurs - this will be the end of the subtask
subtask_indicators = ep_datagen_info_obj.subtask_term_signals[subtask_term_signal].flatten().int() subtask_indicators = (
ep_datagen_info_obj.subtask_term_signals[subtask_term_signal_name].flatten().int()
)
diffs = subtask_indicators[1:] - subtask_indicators[:-1] diffs = subtask_indicators[1:] - subtask_indicators[:-1]
end_ind = int(diffs.nonzero()[0][0]) + 1 end_ind = int(diffs.nonzero()[0][0]) + 1
subtask_term_ind = end_ind + 1 # increment to support indexing like demo[start:end] subtask_term_ind = end_ind + 1 # increment to support indexing like demo[start:end]
ep_subtask_indices.append([prev_subtask_term_ind, subtask_term_ind])
if subtask_term_ind <= prev_subtask_term_ind:
raise ValueError(
f"subtask termination signal is not increasing: {subtask_term_ind} should be greater than"
f" {prev_subtask_term_ind}"
)
eef_subtask_boundaries.append((prev_subtask_term_ind, subtask_term_ind))
prev_subtask_term_ind = subtask_term_ind prev_subtask_term_ind = subtask_term_ind
# run sanity check on subtask_term_offset_range in task spec to make sure we can never # run sanity check on subtask_term_offset_range in task spec to make sure we can never
...@@ -155,29 +144,26 @@ class DataGenInfoPool: ...@@ -155,29 +144,26 @@ class DataGenInfoPool:
# #
# end index of subtask i + max offset of subtask i < end index of subtask i + 1 + min offset of subtask i + 1 # end index of subtask i + max offset of subtask i < end index of subtask i + 1 + min offset of subtask i + 1
# #
assert len(ep_subtask_indices) == len( for i in range(1, len(eef_subtask_boundaries)):
self.subtask_term_signals prev_max_offset_range = self.subtask_term_offset_ranges[eef_name][i - 1][1]
), "mismatch in length of extracted subtask info and number of subtasks"
for i in range(1, len(ep_subtask_indices)):
prev_max_offset_range = self.subtask_term_offset_ranges[i - 1][1]
assert ( assert (
ep_subtask_indices[i - 1][1] + prev_max_offset_range eef_subtask_boundaries[i - 1][1] + prev_max_offset_range
< ep_subtask_indices[i][1] + self.subtask_term_offset_ranges[i][0] < eef_subtask_boundaries[i][1] + self.subtask_term_offset_ranges[eef_name][i][0]
), ( ), (
"subtask sanity check violation in demo with subtask {} end ind {}, subtask {} max offset {}," "subtask sanity check violation in demo with subtask {} end ind {}, subtask {} max offset {},"
" subtask {} end ind {}, and subtask {} min offset {}".format( " subtask {} end ind {}, and subtask {} min offset {}".format(
i - 1, i - 1,
ep_subtask_indices[i - 1][1], eef_subtask_boundaries[i - 1][1],
i - 1, i - 1,
prev_max_offset_range, prev_max_offset_range,
i, i,
ep_subtask_indices[i][1], eef_subtask_boundaries[i][1],
i, i,
self.subtask_term_offset_ranges[i][0], self.subtask_term_offset_ranges[eef_name][i][0],
) )
) )
self._subtask_indices.append(ep_subtask_indices) self._subtask_boundaries[eef_name].append(eef_subtask_boundaries)
def load_from_dataset_file(self, file_path, select_demo_keys: str | None = None): def load_from_dataset_file(self, file_path, select_demo_keys: str | None = None):
""" """
......
...@@ -8,9 +8,9 @@ import contextlib ...@@ -8,9 +8,9 @@ import contextlib
import torch import torch
from typing import Any from typing import Any
from isaaclab.envs import ManagerBasedEnv from isaaclab.envs import ManagerBasedRLMimicEnv
from isaaclab.envs.mdp.recorders.recorders_cfg import ActionStateRecorderManagerCfg from isaaclab.envs.mdp.recorders.recorders_cfg import ActionStateRecorderManagerCfg
from isaaclab.managers import DatasetExportMode from isaaclab.managers import DatasetExportMode, TerminationTermCfg
from isaaclab_mimic.datagen.data_generator import DataGenerator from isaaclab_mimic.datagen.data_generator import DataGenerator
from isaaclab_mimic.datagen.datagen_info_pool import DataGenInfoPool from isaaclab_mimic.datagen.datagen_info_pool import DataGenInfoPool
...@@ -24,23 +24,32 @@ num_attempts = 0 ...@@ -24,23 +24,32 @@ num_attempts = 0
async def run_data_generator( async def run_data_generator(
env: ManagerBasedEnv, env: ManagerBasedRLMimicEnv,
env_id: int, env_id: int,
env_reset_queue: asyncio.Queue,
env_action_queue: asyncio.Queue, env_action_queue: asyncio.Queue,
data_generator: DataGenerator, data_generator: DataGenerator,
success_term: Any, success_term: TerminationTermCfg,
pause_subtask: bool = False, pause_subtask: bool = False,
): ):
"""Run data generator.""" """Run mimic data generation from the given data generator in the specified environment index.
Args:
env: The environment to run the data generator on.
env_id: The environment index to run the data generation on.
env_reset_queue: The asyncio queue to send environment (for this particular env_id) reset requests to.
env_action_queue: The asyncio queue to send actions to for executing actions.
data_generator: The data generator instance to use.
success_term: The success termination term to use.
pause_subtask: Whether to pause the subtask during generation.
"""
global num_success, num_failures, num_attempts global num_success, num_failures, num_attempts
while True: while True:
results = await data_generator.generate( results = await data_generator.generate(
env_id=env_id, env_id=env_id,
success_term=success_term, success_term=success_term,
env_reset_queue=env_reset_queue,
env_action_queue=env_action_queue, env_action_queue=env_action_queue,
select_src_per_subtask=env.unwrapped.cfg.datagen_config.generation_select_src_per_subtask,
transform_first_robot_pose=env.unwrapped.cfg.datagen_config.generation_transform_first_robot_pose,
interpolate_from_last_target_pose=env.unwrapped.cfg.datagen_config.generation_interpolate_from_last_target_pose,
pause_subtask=pause_subtask, pause_subtask=pause_subtask,
) )
if bool(results["success"]): if bool(results["success"]):
...@@ -51,22 +60,40 @@ async def run_data_generator( ...@@ -51,22 +60,40 @@ async def run_data_generator(
def env_loop( def env_loop(
env: ManagerBasedEnv, env: ManagerBasedRLMimicEnv,
env_reset_queue: asyncio.Queue,
env_action_queue: asyncio.Queue, env_action_queue: asyncio.Queue,
shared_datagen_info_pool: DataGenInfoPool, shared_datagen_info_pool: DataGenInfoPool,
asyncio_event_loop: asyncio.AbstractEventLoop, asyncio_event_loop: asyncio.AbstractEventLoop,
) -> None: ):
"""Main loop for the environment.""" """Main asyncio loop for the environment.
Args:
env: The environment to run the main step loop on.
env_reset_queue: The asyncio queue to handle reset request the environment.
env_action_queue: The asyncio queue to handle actions to for executing actions.
shared_datagen_info_pool: The shared datagen info pool that stores source demo info.
asyncio_event_loop: The main asyncio event loop.
"""
global num_success, num_failures, num_attempts global num_success, num_failures, num_attempts
env_id_tensor = torch.tensor([0], dtype=torch.int64, device=env.device)
prev_num_attempts = 0 prev_num_attempts = 0
# simulate environment -- run everything in inference mode # simulate environment -- run everything in inference mode
with contextlib.suppress(KeyboardInterrupt) and torch.inference_mode(): with contextlib.suppress(KeyboardInterrupt) and torch.inference_mode():
while True: while True:
actions = torch.zeros(env.unwrapped.action_space.shape) # check if any environment needs to be reset while waiting for actions
while env_action_queue.qsize() != env.num_envs:
asyncio_event_loop.run_until_complete(asyncio.sleep(0))
while not env_reset_queue.empty():
env_id_tensor[0] = env_reset_queue.get_nowait()
env.reset(env_ids=env_id_tensor)
env_reset_queue.task_done()
actions = torch.zeros(env.action_space.shape)
# get actions from all the data generators # get actions from all the data generators
for i in range(env.unwrapped.num_envs): for i in range(env.num_envs):
# an async-blocking call to get an action from a data generator # an async-blocking call to get an action from a data generator
env_id, action = asyncio_event_loop.run_until_complete(env_action_queue.get()) env_id, action = asyncio_event_loop.run_until_complete(env_action_queue.get())
actions[env_id] = action actions[env_id] = action
...@@ -75,27 +102,30 @@ def env_loop( ...@@ -75,27 +102,30 @@ def env_loop(
env.step(actions) env.step(actions)
# mark done so the data generators can continue with the step results # mark done so the data generators can continue with the step results
for i in range(env.unwrapped.num_envs): for i in range(env.num_envs):
env_action_queue.task_done() env_action_queue.task_done()
if prev_num_attempts != num_attempts: if prev_num_attempts != num_attempts:
prev_num_attempts = num_attempts prev_num_attempts = num_attempts
generated_sucess_rate = 100 * num_success / num_attempts if num_attempts > 0 else 0.0
print("") print("")
print("*" * 50) print("*" * 50, "\033[K")
print(f"have {num_success} successes out of {num_attempts} trials so far") print(
print(f"have {num_failures} failures out of {num_attempts} trials so far") f"{num_success}/{num_attempts} ({generated_sucess_rate:.1f}%) successful demos generated by"
print("*" * 50) " mimic\033[K"
)
print("*" * 50, "\033[K")
# termination condition is on enough successes if @guarantee_success or enough attempts otherwise # termination condition is on enough successes if @guarantee_success or enough attempts otherwise
generation_guarantee = env.unwrapped.cfg.datagen_config.generation_guarantee generation_guarantee = env.cfg.datagen_config.generation_guarantee
generation_num_trials = env.unwrapped.cfg.datagen_config.generation_num_trials generation_num_trials = env.cfg.datagen_config.generation_num_trials
check_val = num_success if generation_guarantee else num_attempts check_val = num_success if generation_guarantee else num_attempts
if check_val >= generation_num_trials: if check_val >= generation_num_trials:
print(f"Reached {generation_num_trials} successes/attempts. Exiting.") print(f"Reached {generation_num_trials} successes/attempts. Exiting.")
break break
# check that simulation is stopped or not # check that simulation is stopped or not
if env.unwrapped.sim.is_stopped(): if env.sim.is_stopped():
break break
env.close() env.close()
...@@ -175,26 +205,28 @@ def setup_async_generation( ...@@ -175,26 +205,28 @@ def setup_async_generation(
List of asyncio tasks for data generation List of asyncio tasks for data generation
""" """
asyncio_event_loop = asyncio.get_event_loop() asyncio_event_loop = asyncio.get_event_loop()
env_reset_queue = asyncio.Queue()
env_action_queue = asyncio.Queue() env_action_queue = asyncio.Queue()
shared_datagen_info_pool_lock = asyncio.Lock() shared_datagen_info_pool_lock = asyncio.Lock()
shared_datagen_info_pool = DataGenInfoPool( shared_datagen_info_pool = DataGenInfoPool(env, env.cfg, env.device, asyncio_lock=shared_datagen_info_pool_lock)
env.unwrapped, env.unwrapped.cfg, env.unwrapped.device, asyncio_lock=shared_datagen_info_pool_lock
)
shared_datagen_info_pool.load_from_dataset_file(input_file) shared_datagen_info_pool.load_from_dataset_file(input_file)
print(f"Loaded {shared_datagen_info_pool.num_datagen_infos} to datagen info pool") print(f"Loaded {shared_datagen_info_pool.num_datagen_infos} to datagen info pool")
# Create and schedule data generator tasks # Create and schedule data generator tasks
data_generator = DataGenerator(env=env.unwrapped, src_demo_datagen_info_pool=shared_datagen_info_pool) data_generator = DataGenerator(env=env, src_demo_datagen_info_pool=shared_datagen_info_pool)
data_generator_asyncio_tasks = [] data_generator_asyncio_tasks = []
for i in range(num_envs): for i in range(num_envs):
task = asyncio_event_loop.create_task( task = asyncio_event_loop.create_task(
run_data_generator(env, i, env_action_queue, data_generator, success_term, pause_subtask=pause_subtask) run_data_generator(
env, i, env_reset_queue, env_action_queue, data_generator, success_term, pause_subtask=pause_subtask
)
) )
data_generator_asyncio_tasks.append(task) data_generator_asyncio_tasks.append(task)
return { return {
"tasks": data_generator_asyncio_tasks, "tasks": data_generator_asyncio_tasks,
"event_loop": asyncio_event_loop, "event_loop": asyncio_event_loop,
"reset_queue": env_reset_queue,
"action_queue": env_action_queue, "action_queue": env_action_queue,
"info_pool": shared_datagen_info_pool, "info_pool": shared_datagen_info_pool,
} }
...@@ -7,10 +7,13 @@ ...@@ -7,10 +7,13 @@
A collection of classes used to represent waypoints and trajectories. A collection of classes used to represent waypoints and trajectories.
""" """
import asyncio import asyncio
import inspect
import torch import torch
from copy import deepcopy from copy import deepcopy
import isaaclab.utils.math as PoseUtils import isaaclab.utils.math as PoseUtils
from isaaclab.envs import ManagerBasedRLMimicEnv
from isaaclab.managers import TerminationTermCfg
class Waypoint: class Waypoint:
...@@ -18,7 +21,7 @@ class Waypoint: ...@@ -18,7 +21,7 @@ class Waypoint:
Represents a single desired 6-DoF waypoint, along with corresponding gripper actuation for this point. Represents a single desired 6-DoF waypoint, along with corresponding gripper actuation for this point.
""" """
def __init__(self, eef_names, pose, gripper_action, noise=None): def __init__(self, pose, gripper_action, noise=None):
""" """
Args: Args:
pose (torch.Tensor): 4x4 pose target for robot controller pose (torch.Tensor): 4x4 pose target for robot controller
...@@ -26,7 +29,6 @@ class Waypoint: ...@@ -26,7 +29,6 @@ class Waypoint:
noise (float or None): action noise amplitude to apply during execution at this timestep noise (float or None): action noise amplitude to apply during execution at this timestep
(for arm actions, not gripper actions) (for arm actions, not gripper actions)
""" """
self.eef_names = eef_names
self.pose = pose self.pose = pose
self.gripper_action = gripper_action self.gripper_action = gripper_action
self.noise = noise self.noise = noise
...@@ -54,7 +56,7 @@ class WaypointSequence: ...@@ -54,7 +56,7 @@ class WaypointSequence:
self.sequence = deepcopy(sequence) self.sequence = deepcopy(sequence)
@classmethod @classmethod
def from_poses(cls, eef_names, poses, gripper_actions, action_noise): def from_poses(cls, poses, gripper_actions, action_noise):
""" """
Instantiate a WaypointSequence object given a sequence of poses, Instantiate a WaypointSequence object given a sequence of poses,
gripper actions, and action noise. gripper actions, and action noise.
...@@ -79,7 +81,6 @@ class WaypointSequence: ...@@ -79,7 +81,6 @@ class WaypointSequence:
# make WaypointSequence instance # make WaypointSequence instance
sequence = [ sequence = [
Waypoint( Waypoint(
eef_names=eef_names,
pose=poses[t], pose=poses[t],
gripper_action=gripper_actions[t], gripper_action=gripper_actions[t],
noise=action_noise[t, 0], noise=action_noise[t, 0],
...@@ -202,7 +203,6 @@ class WaypointTrajectory: ...@@ -202,7 +203,6 @@ class WaypointTrajectory:
def add_waypoint_sequence_for_target_pose( def add_waypoint_sequence_for_target_pose(
self, self,
eef_names,
pose, pose,
gripper_action, gripper_action,
num_steps, num_steps,
...@@ -254,7 +254,6 @@ class WaypointTrajectory: ...@@ -254,7 +254,6 @@ class WaypointTrajectory:
# add waypoint sequence for this set of poses # add waypoint sequence for this set of poses
sequence = WaypointSequence.from_poses( sequence = WaypointSequence.from_poses(
eef_names=eef_names,
poses=poses, poses=poses,
gripper_actions=gripper_actions, gripper_actions=gripper_actions,
action_noise=action_noise, action_noise=action_noise,
...@@ -281,7 +280,6 @@ class WaypointTrajectory: ...@@ -281,7 +280,6 @@ class WaypointTrajectory:
def merge( def merge(
self, self,
other, other,
eef_names,
num_steps_interp=None, num_steps_interp=None,
num_steps_fixed=None, num_steps_fixed=None,
action_noise=0.0, action_noise=0.0,
...@@ -315,7 +313,6 @@ class WaypointTrajectory: ...@@ -315,7 +313,6 @@ class WaypointTrajectory:
if need_interp: if need_interp:
# interpolation segment # interpolation segment
self.add_waypoint_sequence_for_target_pose( self.add_waypoint_sequence_for_target_pose(
eef_names=eef_names,
pose=target_for_interpolation.pose, pose=target_for_interpolation.pose,
gripper_action=target_for_interpolation.gripper_action, gripper_action=target_for_interpolation.gripper_action,
num_steps=num_steps_interp, num_steps=num_steps_interp,
...@@ -329,7 +326,6 @@ class WaypointTrajectory: ...@@ -329,7 +326,6 @@ class WaypointTrajectory:
# account for the fact that we pop'd the first element of @other in anticipation of an interpolation segment # account for the fact that we pop'd the first element of @other in anticipation of an interpolation segment
num_steps_fixed_to_use = num_steps_fixed if need_interp else (num_steps_fixed + 1) num_steps_fixed_to_use = num_steps_fixed if need_interp else (num_steps_fixed + 1)
self.add_waypoint_sequence_for_target_pose( self.add_waypoint_sequence_for_target_pose(
eef_names=eef_names,
pose=target_for_interpolation.pose, pose=target_for_interpolation.pose,
gripper_action=target_for_interpolation.gripper_action, gripper_action=target_for_interpolation.gripper_action,
num_steps=num_steps_fixed_to_use, num_steps=num_steps_fixed_to_use,
...@@ -343,67 +339,75 @@ class WaypointTrajectory: ...@@ -343,67 +339,75 @@ class WaypointTrajectory:
# concatenate the trajectories # concatenate the trajectories
self.waypoint_sequences += other.waypoint_sequences self.waypoint_sequences += other.waypoint_sequences
def get_full_sequence(self):
"""
Returns the full sequence of waypoints in the trajectory.
Returns:
sequence (WaypointSequence instance)
"""
return WaypointSequence(sequence=[waypoint for seq in self.waypoint_sequences for waypoint in seq.sequence])
class MultiWaypoint:
"""
A collection of Waypoint objects for multiple end effectors in the environment.
"""
def __init__(self, waypoints: dict[str, Waypoint]):
"""
Args:
waypoints (dict): a dictionary of waypionts of end effectors
"""
self.waypoints = waypoints
async def execute( async def execute(
self, self,
env, env: ManagerBasedRLMimicEnv,
env_id, success_term: TerminationTermCfg,
success_term, env_id: int = 0,
env_action_queue: asyncio.Queue | None = None, env_action_queue: asyncio.Queue | None = None,
): ):
""" """
Main function to execute the trajectory. Will use env_interface.target_eef_pose_to_action to Executes the multi-waypoint eef actions in the environment.
convert each target pose at each waypoint to an action command, and pass that along to
env.step.
Args: Args:
env (Isaac Lab ManagerBasedEnv instance): environment to use for executing trajectory env: The environment to execute the multi-waypoint actions in.
env_id (int): environment index success_term: The termination term to check for task success.
success_term: success term to check if the task is successful env_id: The environment ID to execute the multi-waypoint actions in.
env_action_queue (asyncio.Queue): queue for sending actions to the environment env_action_queue: The asyncio queue to put the action into.
Returns: Returns:
results (dict): dictionary with the following items for the executed trajectory: A dictionary containing the state, observation, action, and success of the multi-waypoint actions.
states (list): simulator state at each timestep
observations (list): observation dictionary at each timestep
datagen_infos (list): datagen_info at each timestep
actions (list): action executed at each timestep
success (bool): whether the trajectory successfully solved the task or not
""" """
# current state
states = []
actions = []
observations = []
success = False
# iterate over waypoint sequences
for seq in self.waypoint_sequences:
# iterate over waypoints in each sequence
for j in range(len(seq)):
# current waypoint
waypoint = seq[j]
# current state and observation
obs = env.obs_buf
state = env.scene.get_state(is_relative=True) state = env.scene.get_state(is_relative=True)
# convert target pose and gripper action to env action # construct action from target poses and gripper actions
target_eef_pose_dict = {waypoint.eef_names[0]: waypoint.pose} target_eef_pose_dict = {eef_name: waypoint.pose for eef_name, waypoint in self.waypoints.items()}
gripper_action_dict = {waypoint.eef_names[0]: waypoint.gripper_action} gripper_action_dict = {eef_name: waypoint.gripper_action for eef_name, waypoint in self.waypoints.items()}
if "action_noise_dict" in inspect.signature(env.target_eef_pose_to_action).parameters:
action_noise_dict = {eef_name: waypoint.noise for eef_name, waypoint in self.waypoints.items()}
play_action = env.target_eef_pose_to_action(
target_eef_pose_dict=target_eef_pose_dict,
gripper_action_dict=gripper_action_dict,
action_noise_dict=action_noise_dict,
env_id=env_id,
)
else:
# calling user-defined env.target_eef_pose_to_action() with noise parameter is deprecated
# (replaced by action_noise_dict)
play_action = env.target_eef_pose_to_action( play_action = env.target_eef_pose_to_action(
target_eef_pose_dict=target_eef_pose_dict, target_eef_pose_dict=target_eef_pose_dict,
gripper_action_dict=gripper_action_dict, gripper_action_dict=gripper_action_dict,
noise=waypoint.noise, noise=max([waypoint.noise for waypoint in self.waypoints.values()]),
env_id=env_id, env_id=env_id,
) )
# step environment if play_action.dim() == 1:
if not isinstance(play_action, torch.Tensor): play_action = play_action.unsqueeze(0) # Reshape with additional env dimension
play_action = torch.tensor(play_action)
if play_action.dim() == 1 and play_action.size(0) == 7:
play_action = play_action.unsqueeze(0) # Reshape to [1, 7]
# step environment
if env_action_queue is None: if env_action_queue is None:
obs, _, _, _, _ = env.step(play_action) obs, _, _, _, _ = env.step(play_action)
else: else:
...@@ -411,20 +415,12 @@ class WaypointTrajectory: ...@@ -411,20 +415,12 @@ class WaypointTrajectory:
await env_action_queue.join() await env_action_queue.join()
obs = env.obs_buf obs = env.obs_buf
# collect data success = bool(success_term.func(env, **success_term.params)[env_id])
states.append(state)
actions.append(play_action)
observations.append(obs)
cur_success_metric = bool(success_term.func(env, **success_term.params)[env_id])
# If the task success metric is True once during the execution, then the task is considered successful
success = success or cur_success_metric
results = dict( result = dict(
states=states, states=[state],
observations=observations, observations=[obs],
actions=torch.stack(actions), actions=[play_action],
success=success, success=success,
) )
return results return result
...@@ -59,10 +59,6 @@ class TestGenerateDataset(unittest.TestCase): ...@@ -59,10 +59,6 @@ class TestGenerateDataset(unittest.TestCase):
DATASETS_DOWNLOAD_DIR + "/dataset.hdf5", DATASETS_DOWNLOAD_DIR + "/dataset.hdf5",
"--output_file", "--output_file",
DATASETS_DOWNLOAD_DIR + "/annotated_dataset.hdf5", DATASETS_DOWNLOAD_DIR + "/annotated_dataset.hdf5",
"--signals",
"grasp_1",
"stack_1",
"grasp_2",
"--auto", "--auto",
"--headless", "--headless",
] ]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment