Unverified commit c9997d6a, authored by Özhan Özen and committed by GitHub

Fixes ray lazy metric reporting and hanging processes (#2346)

# Description

The `step()` function in `ray/tuner.py` has several issues that prevent an
uninterrupted Ray hyperparameter-tuning session. Please refer to #2328 for
details.

Fixes #2328.

## Type of change

- Bug fix (non-breaking change which fixes an issue)

## Checklist

- [x] I have run the [`pre-commit` checks](https://pre-commit.com/) with
`./isaaclab.sh --format`
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have updated the changelog and the corresponding version in the
extension's `config/extension.toml` file
- [x] I have added my name to the `CONTRIBUTORS.md` or my name already
exists there
parent 82cb3206
...@@ -5,8 +5,9 @@ ...@@ -5,8 +5,9 @@
import argparse import argparse
import importlib.util import importlib.util
import os import os
import subprocess
import sys import sys
from time import sleep from time import sleep, time
import ray import ray
import util import util
...@@ -57,6 +58,9 @@ BASE_DIR = os.path.expanduser("~") ...@@ -57,6 +58,9 @@ BASE_DIR = os.path.expanduser("~")
PYTHON_EXEC = "./isaaclab.sh -p" PYTHON_EXEC = "./isaaclab.sh -p"
WORKFLOW = "scripts/reinforcement_learning/rl_games/train.py" WORKFLOW = "scripts/reinforcement_learning/rl_games/train.py"
NUM_WORKERS_PER_NODE = 1 # needed for local parallelism NUM_WORKERS_PER_NODE = 1 # needed for local parallelism
PROCESS_RESPONSE_TIMEOUT = 200.0 # seconds to wait before killing the process when it stops responding
MAX_LINES_TO_SEARCH_EXPERIMENT_LOGS = 1000 # maximum number of lines to read from the training process logs
MAX_LOG_EXTRACTION_ERRORS = 2 # maximum allowed LogExtractionErrors before we abort the whole training
class IsaacLabTuneTrainable(tune.Trainable): class IsaacLabTuneTrainable(tune.Trainable):
...@@ -70,6 +74,7 @@ class IsaacLabTuneTrainable(tune.Trainable): ...@@ -70,6 +74,7 @@ class IsaacLabTuneTrainable(tune.Trainable):
def setup(self, config: dict) -> None: def setup(self, config: dict) -> None:
"""Get the invocation command, return quick for easy scheduling.""" """Get the invocation command, return quick for easy scheduling."""
self.data = None self.data = None
self.time_since_last_proc_response = 0.0
self.invoke_cmd = util.get_invocation_command_from_cfg(cfg=config, python_cmd=PYTHON_EXEC, workflow=WORKFLOW) self.invoke_cmd = util.get_invocation_command_from_cfg(cfg=config, python_cmd=PYTHON_EXEC, workflow=WORKFLOW)
print(f"[INFO]: Recovered invocation with {self.invoke_cmd}") print(f"[INFO]: Recovered invocation with {self.invoke_cmd}")
self.experiment = None self.experiment = None
...@@ -84,12 +89,21 @@ class IsaacLabTuneTrainable(tune.Trainable): ...@@ -84,12 +89,21 @@ class IsaacLabTuneTrainable(tune.Trainable):
# When including this as first step instead of setup, experiments get scheduled faster # When including this as first step instead of setup, experiments get scheduled faster
# Don't want to block the scheduler while the experiment spins up # Don't want to block the scheduler while the experiment spins up
print(f"[INFO]: Invoking experiment as first step with {self.invoke_cmd}...") print(f"[INFO]: Invoking experiment as first step with {self.invoke_cmd}...")
experiment = util.execute_job( try:
self.invoke_cmd, experiment = util.execute_job(
identifier_string="", self.invoke_cmd,
extract_experiment=True, identifier_string="",
persistent_dir=BASE_DIR, extract_experiment=True, # Keep this as True to return a valid dictionary
) persistent_dir=BASE_DIR,
max_lines_to_search_logs=MAX_LINES_TO_SEARCH_EXPERIMENT_LOGS,
max_time_to_search_logs=PROCESS_RESPONSE_TIMEOUT,
)
except util.LogExtractionError:
self.data = {
"LOG_EXTRACTION_ERROR_STOPPER_FLAG": True,
"done": True,
}
return self.data
self.experiment = experiment self.experiment = experiment
print(f"[INFO]: Tuner recovered experiment info {experiment}") print(f"[INFO]: Tuner recovered experiment info {experiment}")
self.proc = experiment["proc"] self.proc = experiment["proc"]
...@@ -109,11 +123,35 @@ class IsaacLabTuneTrainable(tune.Trainable): ...@@ -109,11 +123,35 @@ class IsaacLabTuneTrainable(tune.Trainable):
while data is None: while data is None:
data = util.load_tensorboard_logs(self.tensorboard_logdir) data = util.load_tensorboard_logs(self.tensorboard_logdir)
proc_status = self.proc.poll()
if proc_status is not None:
break
sleep(2) # Lazy report metrics to avoid performance overhead sleep(2) # Lazy report metrics to avoid performance overhead
if self.data is not None: if self.data is not None:
while util._dicts_equal(data, self.data): data_ = {k: v for k, v in data.items() if k != "done"}
self_data_ = {k: v for k, v in self.data.items() if k != "done"}
unresponsiveness_start_time = time()
while util._dicts_equal(data_, self_data_):
self.time_since_last_proc_response = time() - unresponsiveness_start_time
data = util.load_tensorboard_logs(self.tensorboard_logdir) data = util.load_tensorboard_logs(self.tensorboard_logdir)
data_ = {k: v for k, v in data.items() if k != "done"}
proc_status = self.proc.poll()
if proc_status is not None:
break
if self.time_since_last_proc_response > PROCESS_RESPONSE_TIMEOUT:
self.time_since_last_proc_response = 0.0
print("[WARNING]: Training workflow process is not responding, terminating...")
self.proc.terminate()
try:
self.proc.wait(timeout=20)
except subprocess.TimeoutExpired:
print("[ERROR]: The process did not terminate within timeout duration.")
self.proc.kill()
self.proc.wait()
self.data = data
self.data["done"] = True
return self.data
sleep(2) # Lazy report metrics to avoid performance overhead sleep(2) # Lazy report metrics to avoid performance overhead
self.data = data self.data = data
...@@ -132,6 +170,39 @@ class IsaacLabTuneTrainable(tune.Trainable): ...@@ -132,6 +170,39 @@ class IsaacLabTuneTrainable(tune.Trainable):
) )
class LogExtractionErrorStopper(tune.Stopper):
    """Abort the whole tuning run once too many results report a LogExtractionError.

    The stopper only counts results whose dictionary carries a truthy
    ``LOG_EXTRACTION_ERROR_STOPPER_FLAG`` entry; it never stops a single trial.

    Args:
        max_errors: Number of flagged results that is tolerated. The run is aborted
            once the count becomes strictly greater than this value.
    """

    def __init__(self, max_errors: int):
        self.error_count = 0
        self.max_errors = max_errors

    def __call__(self, trial_id, result):
        """Record a flagged result and leave the trial itself running.

        Args:
            trial_id: Identifier of the reporting trial (unused here).
            result: Result dictionary reported by the trial.

        Returns:
            Always ``False``; the per-trial decision is never "stop".
        """
        flagged = result.get("LOG_EXTRACTION_ERROR_STOPPER_FLAG", False)
        if flagged:
            self.error_count += 1
            print(
                f"[ERROR]: Encountered LogExtractionError {self.error_count} times. "
                f"Maximum allowed is {self.max_errors}."
            )
        return False

    def stop_all(self):
        """Tell whether the error budget is exhausted.

        Returns:
            ``True`` (after printing a fatal message) when more than ``max_errors``
            flagged results have been counted, ``False`` otherwise.
        """
        limit_exceeded = self.error_count > self.max_errors
        if not limit_exceeded:
            return False
        print("[FATAL]: Encountered LogExtractionError more than allowed, aborting entire tuning run... ")
        return True
def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None: def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None:
"""Invoke an Isaac-Ray tuning run. """Invoke an Isaac-Ray tuning run.
...@@ -175,6 +246,7 @@ def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None: ...@@ -175,6 +246,7 @@ def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None:
checkpoint_frequency=0, # Disable periodic checkpointing checkpoint_frequency=0, # Disable periodic checkpointing
checkpoint_at_end=False, # Disable final checkpoint checkpoint_at_end=False, # Disable final checkpoint
), ),
stop=LogExtractionErrorStopper(max_errors=MAX_LOG_EXTRACTION_ERRORS),
) )
elif args.run_mode == "remote": # MLFlow, to MLFlow server elif args.run_mode == "remote": # MLFlow, to MLFlow server
...@@ -190,6 +262,7 @@ def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None: ...@@ -190,6 +262,7 @@ def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None:
storage_path="/tmp/ray", storage_path="/tmp/ray",
callbacks=[mlflow_callback], callbacks=[mlflow_callback],
checkpoint_config=ray.train.CheckpointConfig(checkpoint_frequency=0, checkpoint_at_end=False), checkpoint_config=ray.train.CheckpointConfig(checkpoint_frequency=0, checkpoint_at_end=False),
stop=LogExtractionErrorStopper(max_errors=MAX_LOG_EXTRACTION_ERRORS),
) )
else: else:
raise ValueError("Unrecognized run mode.") raise ValueError("Unrecognized run mode.")
...@@ -199,6 +272,8 @@ def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None: ...@@ -199,6 +272,8 @@ def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None:
IsaacLabTuneTrainable, IsaacLabTuneTrainable,
param_space=cfg, param_space=cfg,
tune_config=tune.TuneConfig( tune_config=tune.TuneConfig(
metric=args.metric,
mode=args.mode,
search_alg=repeat_search, search_alg=repeat_search,
num_samples=args.num_samples, num_samples=args.num_samples,
reuse_actors=True, reuse_actors=True,
...@@ -306,8 +381,39 @@ if __name__ == "__main__": ...@@ -306,8 +381,39 @@ if __name__ == "__main__":
default=3, default=3,
help="How many times to repeat each hyperparameter config.", help="How many times to repeat each hyperparameter config.",
) )
# Timeout (seconds) applied both while searching the training output for experiment info
# and while waiting for new tensorboard scalars from the training process.
parser.add_argument(
    "--process_response_timeout",
    type=float,
    default=PROCESS_RESPONSE_TIMEOUT,
    help="Training workflow process response timeout.",
)
# NOTE(review): parsed as float and truncated with int() right after parse_args();
# type=int would reject fractional input up front, but would also change which
# command-line values are accepted, so the type is left untouched here.
parser.add_argument(
    "--max_lines_to_search_experiment_logs",
    type=float,
    default=MAX_LINES_TO_SEARCH_EXPERIMENT_LOGS,
    help="Max number of lines to search for experiment logs before terminating the training workflow process.",
)
# Same float-then-int() handling as above; the help text no longer repeats "number".
parser.add_argument(
    "--max_log_extraction_errors",
    type=float,
    default=MAX_LOG_EXTRACTION_ERRORS,
    help="Max number of LogExtractionError failures before we abort the whole tuning run.",
)
args = parser.parse_args() args = parser.parse_args()
PROCESS_RESPONSE_TIMEOUT = args.process_response_timeout
MAX_LINES_TO_SEARCH_EXPERIMENT_LOGS = int(args.max_lines_to_search_experiment_logs)
print(
"[INFO]: The max number of lines to search for experiment logs before (early) terminating the training "
f"workflow process is set to {MAX_LINES_TO_SEARCH_EXPERIMENT_LOGS}.\n"
"[INFO]: The process response timeout, used while updating tensorboard scalars and searching for "
f"experiment logs, is set to {PROCESS_RESPONSE_TIMEOUT} seconds."
)
MAX_LOG_EXTRACTION_ERRORS = int(args.max_log_extraction_errors)
print(
"[INFO]: Max number of LogExtractionError failures before we abort the whole tuning run is "
f"set to {MAX_LOG_EXTRACTION_ERRORS}.\n"
)
NUM_WORKERS_PER_NODE = args.num_workers_per_node NUM_WORKERS_PER_NODE = args.num_workers_per_node
print(f"[INFO]: Using {NUM_WORKERS_PER_NODE} workers per node.") print(f"[INFO]: Using {NUM_WORKERS_PER_NODE} workers per node.")
if args.run_mode == "remote": if args.run_mode == "remote":
......
...@@ -5,10 +5,12 @@ ...@@ -5,10 +5,12 @@
import argparse import argparse
import os import os
import re import re
import select
import subprocess import subprocess
import threading import threading
from datetime import datetime from datetime import datetime
from math import isclose from math import isclose
from time import time
import ray import ray
from tensorboard.backend.event_processing.directory_watcher import DirectoryDeletedError from tensorboard.backend.event_processing.directory_watcher import DirectoryDeletedError
...@@ -26,6 +28,12 @@ def load_tensorboard_logs(directory: str) -> dict: ...@@ -26,6 +28,12 @@ def load_tensorboard_logs(directory: str) -> dict:
The latest available scalar values. The latest available scalar values.
""" """
# Sanitize a scalar tag: every character other than ASCII letters, digits, "_", "."
# and "/" is replaced with "_", runs of "_" are collapsed into one, and leading or
# trailing "_" are removed.
def replace_invalid_chars(t: str) -> str:
    """Return ``t`` with unsupported characters replaced by single underscores.

    Args:
        t: Raw tag name.

    Returns:
        The sanitized tag; may be empty when ``t`` holds no supported character.
    """
    t2 = re.sub(r"[^0-9A-Za-z_./]", "_", t)
    # Collapse the underscore runs produced by adjacent replaced characters.
    t2 = re.sub(r"_+", "_", t2)
    return t2.strip("_")
# Initialize the event accumulator with a size guidance for only the latest entry # Initialize the event accumulator with a size guidance for only the latest entry
def get_latest_scalars(path: str) -> dict: def get_latest_scalars(path: str) -> dict:
event_acc = EventAccumulator(path, size_guidance={"scalars": 1}) event_acc = EventAccumulator(path, size_guidance={"scalars": 1})
...@@ -33,7 +41,7 @@ def load_tensorboard_logs(directory: str) -> dict: ...@@ -33,7 +41,7 @@ def load_tensorboard_logs(directory: str) -> dict:
event_acc.Reload() event_acc.Reload()
if event_acc.Tags()["scalars"]: if event_acc.Tags()["scalars"]:
return { return {
tag: event_acc.Scalars(tag)[-1].value replace_invalid_chars(tag): event_acc.Scalars(tag)[-1].value
for tag in event_acc.Tags()["scalars"] for tag in event_acc.Tags()["scalars"]
if event_acc.Scalars(tag) if event_acc.Scalars(tag)
} }
...@@ -98,6 +106,12 @@ def remote_execute_job( ...@@ -98,6 +106,12 @@ def remote_execute_job(
) )
class LogExtractionError(Exception):
    """Raised when we cannot extract experiment_name/logdir from the trainer output."""
def execute_job( def execute_job(
job_cmd: str, job_cmd: str,
identifier_string: str = "job 0", identifier_string: str = "job 0",
...@@ -105,6 +119,8 @@ def execute_job( ...@@ -105,6 +119,8 @@ def execute_job(
extract_experiment: bool = False, extract_experiment: bool = False,
persistent_dir: str | None = None, persistent_dir: str | None = None,
log_all_output: bool = False, log_all_output: bool = False,
max_lines_to_search_logs: int = 1000,
max_time_to_search_logs: float = 200.0,
) -> str | dict: ) -> str | dict:
"""Issue a job (shell command). """Issue a job (shell command).
...@@ -117,6 +133,8 @@ def execute_job( ...@@ -117,6 +133,8 @@ def execute_job(
persistent_dir: When supplied, change to run the directory in a persistent persistent_dir: When supplied, change to run the directory in a persistent
directory. Can be used to avoid losing logs in the /tmp directory. Defaults to None. directory. Can be used to avoid losing logs in the /tmp directory. Defaults to None.
log_all_output: When true, print all output to the console. Defaults to False. log_all_output: When true, print all output to the console. Defaults to False.
max_lines_to_search_logs: Maximum number of lines to search for experiment info. Defaults to 1000.
max_time_to_search_logs: Maximum time to wait for experiment info before giving up. Defaults to 200.0 seconds.
Raises: Raises:
ValueError: If the job is unable to start, or throws an error. Most likely to happen ValueError: If the job is unable to start, or throws an error. Most likely to happen
due to running out of memory. due to running out of memory.
...@@ -190,6 +208,8 @@ def execute_job( ...@@ -190,6 +208,8 @@ def execute_job(
process = subprocess.Popen( process = subprocess.Popen(
job_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1 job_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1
) )
process_file_descriptor = process.stdout.fileno()
if persistent_dir: if persistent_dir:
os.chdir(og_dir) os.chdir(og_dir)
experiment_name = None experiment_name = None
...@@ -205,48 +225,80 @@ def execute_job( ...@@ -205,48 +225,80 @@ def execute_job(
if log_all_output: if log_all_output:
print(f"{identifier_string}: {line}") print(f"{identifier_string}: {line}")
# Read stdout until we find experiment info # Read stdout until we find exp. info, up to max_lines_to_search_logs lines, max_time_to_search_logs, or EOF.
# Do some careful handling prevent overflowing the pipe reading buffer with error 141 # Do some careful handling prevent overflowing the pipe reading buffer with error 141
for line in iter(process.stdout.readline, ""): lines_read = 0
line = line.strip() search_duration = 0.0
result_details.append(f"{identifier_string}: {line} \n") search_start_time = time()
if log_all_output: while True:
print(f"{identifier_string}: {line}") new_line_ready, _, _ = select.select([process_file_descriptor], [], [], 1.0) # Wait up to 1s for stdout
if new_line_ready:
if extract_experiment: line = process.stdout.readline()
exp_match = experiment_info_pattern.search(line) if not line: # EOF
log_match = logdir_pattern.search(line) break
err_match = err_pattern.search(line)
lines_read += 1
if err_match: line = line.strip()
raise ValueError(f"Encountered an error during trial run. {' '.join(result_details)}") result_details.append(f"{identifier_string}: {line} \n")
if exp_match: if log_all_output:
experiment_name = exp_match.group(1) print(f"{identifier_string}: {line}")
if log_match:
logdir = log_match.group(1) if extract_experiment:
exp_match = experiment_info_pattern.search(line)
if experiment_name and logdir: log_match = logdir_pattern.search(line)
# Start stderr reader after finding experiment info err_match = err_pattern.search(line)
stderr_thread = threading.Thread(
target=stream_reader, args=(process.stderr, identifier_string, result_details) if err_match:
) raise ValueError(f"Encountered an error during trial run. {' '.join(result_details)}")
stderr_thread.daemon = True
stderr_thread.start() if exp_match:
experiment_name = exp_match.group(1)
# Start stdout reader to continue reading to flush buffer if log_match:
stdout_thread = threading.Thread( logdir = log_match.group(1)
target=stream_reader, args=(process.stdout, identifier_string, result_details)
) if experiment_name and logdir:
stdout_thread.daemon = True # Start stderr reader after finding experiment info
stdout_thread.start() stderr_thread = threading.Thread(
target=stream_reader, args=(process.stderr, identifier_string, result_details)
return { )
"experiment_name": experiment_name, stderr_thread.daemon = True
"logdir": logdir, stderr_thread.start()
"proc": process,
"result": " ".join(result_details), # Start stdout reader to continue reading to flush buffer
} stdout_thread = threading.Thread(
target=stream_reader, args=(process.stdout, identifier_string, result_details)
)
stdout_thread.daemon = True
stdout_thread.start()
return {
"experiment_name": experiment_name,
"logdir": logdir,
"proc": process,
"result": " ".join(result_details),
}
if extract_experiment: # if we are looking for experiment info, check for timeouts and line limits
search_duration = time() - search_start_time
if search_duration > max_time_to_search_logs:
print(f"[ERROR]: Could not find experiment logs within {max_time_to_search_logs} seconds.")
break
if lines_read >= max_lines_to_search_logs:
print(f"[ERROR]: Could not find experiment logs within first {max_lines_to_search_logs} lines.")
break
# If we reach here, we didn't find experiment info in the output
if extract_experiment and not (experiment_name and logdir):
error_msg = (
"Could not extract experiment_name/logdir from trainer output "
f"(experiment_name={experiment_name!r}, logdir={logdir!r}).\n"
"\tMake sure your training script prints the following correctly:\n"
"\t\tExact experiment name requested from command line: <name>\n"
"\t\t[INFO] Logging experiment in directory: <logdir>\n\n"
)
print(f"[ERROR]: {error_msg}")
raise LogExtractionError("Could not extract experiment_name/logdir from training workflow output.")
process.wait() process.wait()
now = datetime.now().strftime("%H:%M:%S.%f") now = datetime.now().strftime("%H:%M:%S.%f")
completion_info = f"\n[INFO]: {identifier_string}: Job Started at {start_time}, completed at {now}\n" completion_info = f"\n[INFO]: {identifier_string}: Job Started at {start_time}, completed at {now}\n"
......
...@@ -131,7 +131,7 @@ def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agen ...@@ -131,7 +131,7 @@ def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agen
print(f"[INFO] Logging experiment in directory: {log_root_path}") print(f"[INFO] Logging experiment in directory: {log_root_path}")
# specify directory for logging runs: {time-stamp}_{run_name} # specify directory for logging runs: {time-stamp}_{run_name}
log_dir = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") log_dir = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# This way, the Ray Tune workflow can extract experiment name. # The Ray Tune workflow extracts experiment name using the logging line below, hence, do not change it (see PR #2346, comment-2819298849)
print(f"Exact experiment name requested from command line: {log_dir}") print(f"Exact experiment name requested from command line: {log_dir}")
if agent_cfg.run_name: if agent_cfg.run_name:
log_dir += f"_{agent_cfg.run_name}" log_dir += f"_{agent_cfg.run_name}"
......
...@@ -95,6 +95,7 @@ def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agen ...@@ -95,6 +95,7 @@ def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agen
run_info = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") run_info = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_root_path = os.path.abspath(os.path.join("logs", "sb3", args_cli.task)) log_root_path = os.path.abspath(os.path.join("logs", "sb3", args_cli.task))
print(f"[INFO] Logging experiment in directory: {log_root_path}") print(f"[INFO] Logging experiment in directory: {log_root_path}")
# The Ray Tune workflow extracts experiment name using the logging line below, hence, do not change it (see PR #2346, comment-2819298849)
print(f"Exact experiment name requested from command line: {run_info}") print(f"Exact experiment name requested from command line: {run_info}")
log_dir = os.path.join(log_root_path, run_info) log_dir = os.path.join(log_root_path, run_info)
# dump the configuration into log-directory # dump the configuration into log-directory
......
...@@ -140,7 +140,8 @@ def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agen ...@@ -140,7 +140,8 @@ def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agen
print(f"[INFO] Logging experiment in directory: {log_root_path}") print(f"[INFO] Logging experiment in directory: {log_root_path}")
# specify directory for logging runs: {time-stamp}_{run_name} # specify directory for logging runs: {time-stamp}_{run_name}
log_dir = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + f"_{algorithm}_{args_cli.ml_framework}" log_dir = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + f"_{algorithm}_{args_cli.ml_framework}"
print(f"Exact experiment name requested from command line {log_dir}") # The Ray Tune workflow extracts experiment name using the logging line below, hence, do not change it (see PR #2346, comment-2819298849)
print(f"Exact experiment name requested from command line: {log_dir}")
if agent_cfg["agent"]["experiment"]["experiment_name"]: if agent_cfg["agent"]["experiment"]["experiment_name"]:
log_dir += f'_{agent_cfg["agent"]["experiment"]["experiment_name"]}' log_dir += f'_{agent_cfg["agent"]["experiment"]["experiment_name"]}'
# set directory into agent config # set directory into agent config
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment