Commit d083ab36 authored by CY Chen's avatar CY Chen Committed by Kelly Guo

Replaces is_success method with success termination term in mimic (#225)

# Description

Update mimic to rely on an environment's "success" termination term to
determine if an episode ends with success.
As a result, is_success API in the mimic class `ManagerBasedRLMimicEnv`
is no longer required and hence removed.

## Type of change

<!-- As you go through the list, delete the ones that are not
applicable. -->

- Bug fix (non-breaking change which fixes an issue)

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [x] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there

<!--
As you go through the checklist above, you can mark something as done by
putting an x character in it

For example,
- [x] I have done this task
- [ ] I have not done this task
-->

---------
Co-authored-by: 's avatarKelly Guo <kellyg@nvidia.com>
parent a90ddb94
......@@ -177,8 +177,6 @@ Mimic compatible environments are derived from the :class:`~isaaclab.envs.Manage
* ``get_subtask_term_signals``: Returns a dictionary of binary flags for each subtask in a task. The flag of 1 is set when the subtask has been completed and 0 otherwise.
* ``is_success``: Returns a boolean indicator of whether the task has been successfully completed.
The class :class:`~isaaclab_mimic.envs.FrankaCubeStackIKRelMimicEnv` shows an example of creating a Mimic compatible environment from an existing Isaac Lab environment.
A Mimic compatible environment config class must also be created by extending the existing environment config with additional Mimic required parameters.
......
......@@ -21,6 +21,12 @@ parser.add_argument("--task", type=str, default=None, help="Name of the task.")
parser.add_argument(
"--num_demos", type=int, default=0, help="Number of demonstrations to record. Set to 0 for infinite."
)
parser.add_argument(
"--num_success_steps",
type=int,
default=10,
help="Number of continuous steps with task success for concluding a demo as successful. Default is 10.",
)
parser.add_argument(
"--num_envs",
type=int,
......@@ -178,7 +184,7 @@ def pre_process_actions(delta_pose: torch.Tensor, gripper_command: bool) -> torc
async def run_teleop_robot(
env, env_id, env_action_queue, shared_datagen_info_pool, exported_dataset_path, teleop_interface=None
env, env_id, env_action_queue, shared_datagen_info_pool, success_term, exported_dataset_path, teleop_interface=None
):
"""Run teleop robot."""
global num_recorded
......@@ -208,14 +214,14 @@ async def run_teleop_robot(
recorded_episode_dataset_file_handler.create(exported_dataset_path, env_name=env.unwrapped.cfg.env_name)
env_id_tensor = torch.tensor([env_id], dtype=torch.int64, device=env.device)
success_delay_step_count = 10
has_succeeded = False
success_step_count = 0
num_recorded = 0
while True:
if should_reset_teleop_instance:
env.unwrapped.recorder_manager.reset(env_id_tensor)
env.unwrapped.reset(env_ids=env_id_tensor)
should_reset_teleop_instance = False
success_step_count = 0
# get keyboard command
delta_pose, gripper_command = teleop_interface.advance()
......@@ -227,33 +233,27 @@ async def run_teleop_robot(
await env_action_queue.put((env_id, teleop_action))
await env_action_queue.join()
if has_succeeded:
if success_delay_step_count > 0:
success_delay_step_count -= 1
continue
if bool(env.is_success()[env_id]):
env.recorder_manager.set_success_to_episodes(
env_id_tensor, torch.tensor([[True]], dtype=torch.bool, device=env.device)
)
teleop_episode = env.unwrapped.recorder_manager.get_episode(env_id)
await shared_datagen_info_pool.add_episode(teleop_episode)
recorded_episode_dataset_file_handler.write_episode(teleop_episode)
recorded_episode_dataset_file_handler.flush()
env.recorder_manager.reset(env_id_tensor)
num_recorded += 1
should_reset_teleop_instance = True
if success_term is not None:
if bool(success_term.func(env, **success_term.params)[env_id]):
success_step_count += 1
if success_step_count >= args_cli.num_success_steps:
env.recorder_manager.set_success_to_episodes(
env_id_tensor, torch.tensor([[True]], dtype=torch.bool, device=env.device)
)
teleop_episode = env.unwrapped.recorder_manager.get_episode(env_id)
await shared_datagen_info_pool.add_episode(teleop_episode)
recorded_episode_dataset_file_handler.write_episode(teleop_episode)
recorded_episode_dataset_file_handler.flush()
env.recorder_manager.reset(env_id_tensor)
num_recorded += 1
should_reset_teleop_instance = True
else:
has_succeeded = False
else:
if bool(env.is_success()[env_id]):
has_succeeded = True
success_delay_step_count = 10
success_step_count = 0
async def run_data_generator(
env, env_id, env_action_queue, shared_datagen_info_pool, pause_subtask=False, export_demo=True
env, env_id, env_action_queue, shared_datagen_info_pool, success_term, pause_subtask=False, export_demo=True
):
"""Run data generator."""
global num_success, num_failures, num_attempts
......@@ -266,6 +266,7 @@ async def run_data_generator(
results = await data_generator.generate(
env_id=env_id,
success_term=success_term,
env_action_queue=env_action_queue,
select_src_per_subtask=env.unwrapped.cfg.datagen_config.generation_select_src_per_subtask,
transform_first_robot_pose=env.unwrapped.cfg.datagen_config.generation_transform_first_robot_pose,
......@@ -367,10 +368,16 @@ def main():
env_cfg = parse_env_cfg(env_name, device=args_cli.device, num_envs=num_envs)
env_cfg.env_name = env_name
# modify configuration such that the environment runs indefinitely
env_cfg.terminations.time_out = None
# extract success checking function to invoke manually
success_term = None
if hasattr(env_cfg.terminations, "success"):
success_term = env_cfg.terminations.success
env_cfg.terminations.success = None
else:
raise NotImplementedError("No success termination term was found in the environment.")
# data generator is in charge of resetting the environment
env_cfg.terminations.success = None
env_cfg.terminations = None
env_cfg.observations.policy.concatenate_terms = False
......@@ -416,14 +423,21 @@ def main():
if args_cli.teleop_env_index is not None and i == args_cli.teleop_env_index:
data_generator_asyncio_tasks.append(
asyncio_event_loop.create_task(
run_teleop_robot(env, i, env_action_queue, shared_datagen_info_pool, args_cli.output_file)
run_teleop_robot(
env, i, env_action_queue, shared_datagen_info_pool, success_term, args_cli.output_file
)
)
)
continue
data_generator_asyncio_tasks.append(
asyncio_event_loop.create_task(
run_data_generator(
env, i, env_action_queue, shared_datagen_info_pool, export_demo=bool(args_cli.generated_output_file)
env,
i,
env_action_queue,
shared_datagen_info_pool,
success_term,
export_demo=bool(args_cli.generated_output_file),
)
)
)
......
......@@ -68,12 +68,13 @@ num_failures = 0
num_attempts = 0
async def run_data_generator(env, env_id, env_action_queue, data_generator, pause_subtask=False):
async def run_data_generator(env, env_id, env_action_queue, data_generator, success_term, pause_subtask=False):
"""Run data generator."""
global num_success, num_failures, num_attempts
while True:
results = await data_generator.generate(
env_id=env_id,
success_term=success_term,
env_action_queue=env_action_queue,
select_src_per_subtask=env.unwrapped.cfg.datagen_config.generation_select_src_per_subtask,
transform_first_robot_pose=env.unwrapped.cfg.datagen_config.generation_transform_first_robot_pose,
......@@ -165,10 +166,16 @@ def main():
env_cfg.env_name = env_name
# modify configuration such that the environment runs indefinitely
env_cfg.terminations.time_out = None
# extract success checking function to invoke manually
success_term = None
if hasattr(env_cfg.terminations, "success"):
success_term = env_cfg.terminations.success
env_cfg.terminations.success = None
else:
raise NotImplementedError("No success termination term was found in the environment.")
# data generator is in charge of resetting the environment
env_cfg.terminations.success = None
env_cfg.terminations = None
env_cfg.observations.policy.concatenate_terms = False
......@@ -209,7 +216,9 @@ def main():
for i in range(num_envs):
data_generator_asyncio_tasks.append(
asyncio_event_loop.create_task(
run_data_generator(env, i, env_action_queue, data_generator, pause_subtask=args_cli.pause_subtask)
run_data_generator(
env, i, env_action_queue, data_generator, success_term, pause_subtask=args_cli.pause_subtask
)
)
)
......
......@@ -106,18 +106,6 @@ class ManagerBasedRLMimicEnv(ManagerBasedRLEnv):
"""
raise NotImplementedError
def is_success(self):
"""
Determines whether the task has succeeded based on internally defined success criteria.
This method implements the logic to evaluate the task's success by checking relevant
conditions and constraints derived from the observations in the scene.
Returns:
success (bool): True if the task is considered successful based on the defined criteria. False otherwise.
"""
raise NotImplementedError
def serialize(self):
"""
Save all information needed to re-instantiate this environment in a dictionary.
......
......@@ -188,7 +188,8 @@ class DataGenerator:
async def generate(
self,
env_id=0,
env_id,
success_term,
env_action_queue: asyncio.Queue | None = None,
select_src_per_subtask=False,
transform_first_robot_pose=False,
......@@ -201,6 +202,9 @@ class DataGenerator:
Args:
env_id (int): environment ID
success_term (TerminationTermCfg): success function to check if the task is successful
env_action_queue (asyncio.Queue): queue to store actions for each environment
select_src_per_subtask (bool): if True, select a different source demonstration for each subtask
......@@ -385,7 +389,9 @@ class DataGenerator:
traj_to_execute.pop_first()
# Execute the trajectory and collect data.
exec_results = await traj_to_execute.execute(env=self.env, env_id=env_id, env_action_queue=env_action_queue)
exec_results = await traj_to_execute.execute(
env=self.env, env_id=env_id, env_action_queue=env_action_queue, success_term=success_term
)
# check that trajectory is non-empty
if len(exec_results["states"]) > 0:
......
......@@ -340,7 +340,8 @@ class WaypointTrajectory:
async def execute(
self,
env,
env_id=0,
env_id,
success_term,
env_action_queue: asyncio.Queue | None = None,
):
"""
......@@ -351,6 +352,7 @@ class WaypointTrajectory:
Args:
env (Isaac Lab ManagerBasedEnv instance): environment to use for executing trajectory
env_id (int): environment index
success_term: success term to check if the task is successful
env_action_queue (asyncio.Queue): queue for sending actions to the environment
Returns:
......@@ -410,8 +412,7 @@ class WaypointTrajectory:
actions.append(play_action)
observations.append(obs)
# TODO: Change the termination manager to support multiple environments
cur_success_metric = bool(env.is_success()[env_id])
cur_success_metric = bool(success_term.func(env, **success_term.params)[env_id])
# If the task success metric is True once during the execution, then the task is considered successful
success = success or cur_success_metric
......
......@@ -8,8 +8,6 @@ import torch
import isaaclab.utils.math as PoseUtils
from isaaclab.envs import ManagerBasedRLMimicEnv
from isaaclab_tasks.manager_based.manipulation.stack.mdp import cubes_stacked
class FrankaCubeStackIKRelMimicEnv(ManagerBasedRLMimicEnv):
"""
......@@ -144,6 +142,3 @@ class FrankaCubeStackIKRelMimicEnv(ManagerBasedRLMimicEnv):
signals["stack_1"] = subtask_terms["stack_1"][env_ind]
# final subtask is placing cubeC on cubeA (motion relative to cubeA) - but final subtask signal is not needed
return signals
def is_success(self):
return cubes_stacked(self, atol=0.001, rtol=0.001)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment